等待 ExecutorService 中的所有任务完成

让我们看一下等待提交给 Executor 的任务完成的各种选项

  1. ExecutorService invokeAll()

    执行给定的任务,返回一个 Futures 列表,在完成所有操作后保持其状态和结果。

例:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllDemo{
    public InvokeAllDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for (int i = 0; i < 10; i++){
            MyCallable myCallable = new MyCallable((long)i);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
        } catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllDemo demo = new InvokeAllDemo();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){
            // Add your business logic
            return id;
        }
    }
}
  1. CountDownLatch

    允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

    一个 CountDownLatch 初始化为给定数。由于 countDown() 方法的调用,await 方法阻塞直到当前计数达到零,之后所有等待的线程被释放,并且等待的任何后续调用立即返回。这是一次性现象 - 计数无法重置。如果你需要重置计数的版本,请考虑使用 CyclicBarrier

  2. 执行器中的 ForkJoinPoolnewWorkStealingPool()

  3. 迭代提交到 ExecutorService 后创建的所有 Future 对象

  4. ExecutorService 的 oracle 文档页面推荐的关闭方式 :

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
          // Wait a while for existing tasks to terminate
          if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
          }
        } catch (InterruptedException ie) {
          // (Re-)Cancel if current thread also interrupted
          pool.shutdownNow();
          // Preserve interrupt status
          Thread.currentThread().interrupt();
        }
    

    shutdown():启动有序关闭,其中执行先前提交的任务,但不接受任何新任务。

    shutdownNow():Attempts 停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

    在上面的示例中,如果你的任务需要更多时间来完成,则可以将条件更改为条件

    更换

    if (!pool.awaitTermination(60, TimeUnit.SECONDS))
    

    while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
      Thread.sleep(60000);
    

    }