ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池提交任务方法

2021-09-13 20:03:39  阅读:204  来源: 互联网

标签:tasks futures 任务 task 线程 提交 new null size


excute方法:  源码

submit方法通过提交参数构造FutrueTask,然后执行excute(FutrueTask)方法,返回一个future对象

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);//new FutureTask<T>(runnable, null)
    execute(ftask);
    return ftask;
}
submit(Runnable task)
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);//new FutureTask<T>(runnable, value)
    execute(ftask);
    return ftask;
}
submit(Runnable task, T result)
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
submit(Callable task)

任务批量提交

超时时间:执行invokeAll或者invokeAny的时间

会等待所有任务完成,如果异常终止(包括超时停止),取消所有任务

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            //构造FutureTask,添加进futures集合里
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);//执行任务
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {//任务已经完成
                try {
                    f.get();//这里只是起到阻塞的作用,如果线程中断,则跳到finally
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {//不处理任务执行抛出
                }
            }
        }
        //如果没走到这里,就说明抛出了异常,可能是线程中断异常或其他异常。
        done = true;
        return futures;
    } finally {
        //未正常结束,取消所有任务,通过future get时抛出CancellationException(
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
invokeAll(Collection<? extends Callable> tasks)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)//超时返回
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            //如果任务未完成,则进去阻塞到任务完成
            if (!f.isDone()) {
                //超时返回
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;//get超时返回
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;//未走到这里,标识抛出异常了,在finally里取消所有任务
        return futures;
    } finally {
        //进而取消所有任务,取消的任务不能再执行了
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)

只要有任意一个任务完成,就会返回其结果值,如果超时,则抛出 TimeoutException异常,最终都会取消所有任务

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
invokeAny(Collection<? extends Callable> tasks)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)

真正执行invokeAny的方法

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //可以优先获取到已经完成的任务
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            //任务执行时抛出的异常
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;//正在执行的任务数量

            for (;;) {
                //获取已完成的任务, 这里是通过ExecutorCompletionService的方法来获取
                Future<T> f = ecs.poll();
                //没有已经完成的任务
                if (f == null) {
                    if (ntasks > 0) {//当前未提交的任务
                        --ntasks;//未提交任务-1
                        futures.add(ecs.submit(it.next()));//提交任务
                        ++active;
                    }
                    else if (active == 0) //如果没有任务正在执行
                        break;
                    else if (timed) {//如果设置了超时时间
                        //阻塞获取
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        //获取不到,就抛出超时异常
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();//阻塞获取
                }
                if (f != null) {//有任务完成了
                    --active;//任务正在执行数量-1
                    try {
                        return f.get();//直接返回结果值
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //最终取消所有任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
doInvokeAny

 

标签:tasks,futures,任务,task,线程,提交,new,null,size
来源: https://www.cnblogs.com/shuiyingyuan/p/15262979.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有