等待 ExecutorService 中的所有任務完成

讓我們看一下等待提交給 Executor 的任務完成的各種選項

  1. ExecutorService invokeAll()

    執行給定的任務,返回一個 Futures 列表,在完成所有操作後保持其狀態和結果。

例:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllDemo{
    public InvokeAllDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for (int i = 0; i < 10; i++){
            MyCallable myCallable = new MyCallable((long)i);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
        } catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllDemo demo = new InvokeAllDemo();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){
            // Add your business logic
            return id;
        }
    }
}
  1. CountDownLatch

    允許一個或多個執行緒等待直到在其他執行緒中執行的一組操作完成的同步輔助。

    一個 CountDownLatch 初始化為給定數。由於 countDown() 方法的呼叫,await 方法阻塞直到當前計數達到零,之後所有等待的執行緒被釋放,並且等待的任何後續呼叫立即返回。這是一次性現象 - 計數無法重置。如果你需要重置計數的版本,請考慮使用 CyclicBarrier

  2. 執行器中的 ForkJoinPoolnewWorkStealingPool()

  3. 迭代提交到 ExecutorService 後建立的所有 Future 物件

  4. ExecutorService 的 oracle 文件頁面推薦的關閉方式 :

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
          // Wait a while for existing tasks to terminate
          if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
          }
        } catch (InterruptedException ie) {
          // (Re-)Cancel if current thread also interrupted
          pool.shutdownNow();
          // Preserve interrupt status
          Thread.currentThread().interrupt();
        }
    

    shutdown():啟動有序關閉,其中執行先前提交的任務,但不接受任何新任務。

    shutdownNow():Attempts 停止所有正在執行的任務,停止等待任務的處理,並返回等待執行的任務列表。

    在上面的示例中,如果你的任務需要更多時間來完成,則可以將條件更改為條件

    更換

    if (!pool.awaitTermination(60, TimeUnit.SECONDS))
    

    while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
      Thread.sleep(60000);
    

    }