在 一文秒懂 Java ExecutorService 章节,我们讲解了如何使用 ExecutorService 框架轻松地处理多个线程中的任务。在那个章节中,我们虽然也介绍了如何使用 shutdown()
方法关闭 ExecutorService,但是,我们还漏了一项最重要的东西没讲,就是需要 ExecutorService 等待线程完成执行的一些场景。
本章节,我们将演示如何等待已经运行的线程完成其执行并正常关闭 ExecutorService。
Executor 关闭后的那些事
当使用执行器 ( Executor ) 执行多线程时,我们可以通过调用 shutdown()
或 shutdownNow()
方法将其关闭。
但,这有一个缺点,就是它不会等到所有线程都停止执行。也就是说,可能某个线程执行到一半半的时候,就被 Executor 停止了。
有时候,这是很可怕的事情。为了让 Executor 等待所有的线程执行完毕后再退出,Java 提供了 awaitTermination()
方法,该方法会等待现有线程完成其执行。
也就是说,这个方法会阻塞线程,直到所有任务完成执行或达到指定的超时。
public void awaitTerminationAfterShutdown(ExecutorService threadPool) { threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { threadPool.shutdownNow(); } } catch (InterruptedException ex) { threadPool.shutdownNow(); Thread.currentThread().interrupt(); } }
使用 CountDownLatch
还记得那篇 一文秒懂 Java CountDownLatch 文章吗? 在这篇文章中,我们介绍了 CountDownLatch
类的用法。
因为,它提供了解决上述问题的另一种方法,准确的说,就是使用 CountDownLatch 来指示任务是否全部完成。
在那篇介绍文章中,我们知道 CountDownLatch 类提供了一个计数器,这个计数器一般设置为线程的数量,当线程完成时,就把值减 1 ,这样,当值为 0 的时候,就表示所有线程读执行完成了。
我们可以在执行器中使用它,使用在调用 await()
方法的所有线程被通知之前可以递减的次数来初始化计数器。
例如,如果我们需要当前线程等待另外 N
个线程完成执行,我们可以使用 N
初始化锁 ( latch )
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { WORKER_THREAD_POOL.submit(() -> { try { // ... latch.countDown(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } // wait for the latch to be decremented by the two remaining threads latch.await();
使用 invokeAll() 方法
我们可以用来运行线程的第一种方式是 invokeAll()
方法。该方法在所有任务完成或超时到期后,会返回一个 Future 对象列表。
此外,我们必须注意返回的 Future
对象的顺序与提供的 Callable
对象的列表相同。
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10); List<Callable<String>> callables = Arrays.asList( new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000)); long startProcessingTime = System.currentTimeMillis(); List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables); awaitTerminationAfterShutdown(WORKER_THREAD_POOL); long totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue(totalProcessingTime >= 3000); String firstThreadResponse = futures.get(0).get(); assertTrue("fast thread".equals(firstThreadResponse)); String secondThreadResponse = futures.get(1).get(); assertTrue("slow thread".equals(secondThreadResponse));
使用 ExecutorCompletionService
运行多个线程的另一种方式是使用 ExecutorCompletionService
。它使用我们提供的 ExecutorService 来执行任务。
这种方式与 invokeAll()
的一个区别是返回表示执行任务的 Futures 的顺序:
-
ExecutorCompletionService 使用队列按结束顺序存储结果,也就是说,它返回的 Future 是按照执行完成时间来排序的。
-
invokeAll()
则返回一个列表,该列表顺序与提供给执行器的任务顺序相同。
CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL); List<Callable<String>> callables = Arrays.asList( new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000)); for (Callable<String> callable : callables) { service.submit(callable); }
返回的结果我们可以使用 take()
方法来访问
long startProcessingTime = System.currentTimeMillis(); Future<String> future = service.take(); String firstThreadResponse = future.get(); long totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse)); assertTrue(totalProcessingTime >= 100 && totalProcessingTime < 1000); LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds"); future = service.take(); String secondThreadResponse = future.get(); totalProcessingTime = System.currentTimeMillis() - startProcessingTime; assertTrue( "Last response should be from the slow thread", "slow thread".equals(secondThreadResponse)); assertTrue( totalProcessingTime >= 3000 && totalProcessingTime < 4000); LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds"); awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
结束语
根据使用的场景不同,我们可以有多种选择来等待线程完成执行:
-
当我们需要一种机制来通知一个或多个线程其他线程执行的一组操作已经完成时,CountDownLatch 非常有用。
-
当我们需要尽快访问任务结果时,ExecutorCompletionService 非常有用
-
当我们想等待所有正在运行的任务完成时,则可以使用另外的其它方法