简述CompletableFuture异步任务编排
创始人
2024-02-06 06:34:58
0

前言

在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关。为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排。

先创建一个自定义的线程池,后续所有代码都会使用到:

  private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {private final AtomicInteger THREAD_NUM = new AtomicInteger(1);
​@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);
//      设置为守护线程,main线程结束就跟着一起结束,否则main函数结束jvm还在t.setDaemon(true);t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet());return t;}}, new ThreadPoolExecutor.AbortPolicy());
复制代码

同步串行

同步串行代表任务1、任务2、任务3按时间先后顺序执行,并且都是同一个线程来执行。

示例代码如下:

CompletableFuture.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR).thenApply((task1Result) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println("拿到上一个任务的返回值:" + task1Result);System.out.println(taskName + "执行结束");return taskName;}).thenAccept((task2Result) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println("拿到上一个任务的返回值:" + task2Result);System.out.println(taskName + "执行结束");});
复制代码

执行结果:

completable-future-test-Thread-2开始执行任务:task1
正在执行任务task1
task1执行结束
completable-future-test-Thread-2开始执行任务:task2
正在执行任务task2
拿到上一个任务的返回值:task1
task2执行结束
completable-future-test-Thread-2开始执行任务:task3
正在执行任务task3
拿到上一个任务的返回值:task2
task3执行结束
复制代码

1.入口函数supplyAsync()代表一个异步的有返回值的函数,之所以异步,是与主线程区别,从线程池中的拿一个线程来执行。

2.thenApply()thenAccept()没有Async,意味着是和前面的任务共用一个线程,从执行结果上我们也可以看到线程名称相同。

3.thenApply()需要接收上一个任务的返回值,并且自己也要有返回值。

4.thenAccept()需要接收上一个任务的返回值,但是它不需要返回值。

异步串行

异步串行代表任务1、任务2、任务3按时间先后顺序执行,并由不同的线程来执行。

示例代码如下:

    CompletableFuture// 有返回值.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR)// 需要上一个任务的返回值,并且自身有返回值.thenApplyAsync((task1Result) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println("拿到上一个任务的返回值:" + task1Result);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR)// 不需要上一个任务的返回值,自身也没有返回值.thenRunAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println("thenRunAsync()不需要上一个任务的返回值");System.out.println(taskName + "执行结束");}, THREAD_POOL_EXECUTOR);
复制代码

执行结果如下:

completable-future-test-Thread-2开始执行任务:task1
正在执行任务task1
task1执行结束
completable-future-test-Thread-3开始执行任务:task2
正在执行任务task2
拿到上一个任务的返回值:task1
task2执行结束
completable-future-test-Thread-4开始执行任务:task3
正在执行任务task3
thenRunAsync()不需要上一个任务的返回值
task3执行结束
复制代码

1.入口函数依然是supplyAsync(),需要传入一个有返回值的函数作为参数;如果想要没有返回值的函数传进来的话,可以使用CompletableFuture.runAsync();

2.thenApplyAsync()thenRunAsync()分别表示里面的任务都是异步执行的,和执行前面的任务不是同一个线程;

3.thenRunAsync()需要传入一个既不需要参数,也没有返回值的任务;

并行任务

并行代表任务1、任务2、任务3没有依赖关系,分别由不同的线程执行;

示例代码如下:

    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future2 = CompletableFuture.runAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");}, THREAD_POOL_EXECUTOR);CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);
复制代码

执行结果如下:

completable-future-test-Thread-4开始执行任务:task3
completable-future-test-Thread-2开始执行任务:task1
completable-future-test-Thread-3开始执行任务:task2
正在执行任务task3
task3执行结束
正在执行任务task2
正在执行任务task1
task2执行结束
task1执行结束
复制代码

一看执行结果,明显是乱序的,并且三个任务分别由三个线程执行,符合咱们的预期;注意异步的方法后面都是带有Async关键字的;

多任务结果合并计算

  • 两个任务结果的合并

任务3的执行依赖于任务1、任务2的返回值,并且任务1和任务3由同一个线程执行,任务2单独一个线程执行;

示例代码如下:

    CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR).thenCombine(CompletableFuture// 任务2.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR),// 任务3(task1Result, task2Result) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;});
复制代码

执行结果如下:

completable-future-test-Thread-3开始执行任务:task2
completable-future-test-Thread-2开始执行任务:task1
正在执行任务task1
正在执行任务task2
task2执行结束
task1执行结束
completable-future-test-Thread-2开始执行任务:task3
task1结果:task1 task2结果:task2
正在执行任务task3
task3执行结束
复制代码

CompletableFuture提供了thenCombine()来合并另一个CompletableFuture的执行结果,所以thenCombine()需要两个参数,第一个参数是另一个CompletableFuture,第二个参数会收集前两个任务的返回值,类似下面这样:

(result1,result2)->{// 执行业务逻辑return result3;
}
复制代码

如果小伙伴们想要实现任务3也是单独的线程执行的话,可以使用thenCombineAsync()这个方法。代码如下:

    CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR)
​.thenCombineAsync(CompletableFuture// 任务2.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return 2;}, THREAD_POOL_EXECUTOR),// 任务3(task1Result, task2Result) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return 2L;}, THREAD_POOL_EXECUTOR);
复制代码

如果任务3中不需要返回结果,可以使用thenAcceptBoth()thenAcceptBothAsync(),使用方式与thenCombineAsync()类似;

  • 多任务结果合并

示例代码如下:

    CompletableFuture future1 = CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future2 = CompletableFuture// 任务2.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future3 = CompletableFuture// 任务3.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};CompletableFuture.allOf(futures)// 任务4.whenCompleteAsync((v, e) -> {List values = new ArrayList<>();for (CompletableFuture future : futures) {try {values.add(future.get());} catch (Exception ex) {}}Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task4";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("前面任务的处理结果:" + values);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");}, THREAD_POOL_EXECUTOR);
复制代码 

执行结果如下:

completable-future-test-Thread-3开始执行任务:task2
completable-future-test-Thread-4开始执行任务:task3
completable-future-test-Thread-2开始执行任务:task1
正在执行任务task2
正在执行任务task3
正在执行任务task1
task2执行结束
task3执行结束
task1执行结束
completable-future-test-Thread-2开始执行任务:task4
前面任务的处理结果:[task1, task2, task3]
正在执行任务task4
task4执行结束
复制代码

之所以最后任务4的线程是completable-future-test-Thread-2,那是因为线程池的核心线程数设置为3,线程数设置高一点就会创建新的线程处理;

从上述代码示例中,我们可以收获到另一个知识点:allOf(),它的作用是要求所有的任务全部完成才能执行后面的任务。

任一任务完成

在一批任务中,只要有一个任务完成,那么就可以向后继续执行其他任务。

为了代码演示无异议,后续代码中,我们把线程数提升到4。

示例代码如下:

    CompletableFuture future1 = CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future2 = CompletableFuture// 任务2.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future3 = CompletableFuture// 任务3.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture.anyOf(future1, future2, future3).thenApplyAsync((taskResult) -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task4";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("前面任务的处理结果:" + taskResult);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);
复制代码

执行结果如下:

completable-future-test-Thread-2开始执行任务:task1
completable-future-test-Thread-4开始执行任务:task3
completable-future-test-Thread-3开始执行任务:task2
正在执行任务task3
正在执行任务task2
正在执行任务task1
task1执行结束
task3执行结束
task2执行结束
completable-future-test-Thread-5开始执行任务:task4
前面任务的处理结果:task1
正在执行任务task4
task4执行结束
复制代码

可以看到,任务1第一个结束,所以任务4中接收到的执行结果就是任务1的返回值。

快速失败

在一批任务当中,只要有任意一个任务执行产生异常了,那么就直接结束;否则就要等待所有任务成功执行完毕。

示例代码如下:

    CompletableFuture future1 = CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR);CompletableFuture future2 = CompletableFuture// 任务2.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task2";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");
​throw new RuntimeException("任务2异常!");}, THREAD_POOL_EXECUTOR);CompletableFuture future3 = CompletableFuture// 任务3.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task3";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");throw new RuntimeException("任务3异常!");}, THREAD_POOL_EXECUTOR);CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures);// 创建一个任务来监听异常CompletableFuture anyException = new CompletableFuture<>();for (CompletableFuture completableFuture : futures) {completableFuture.exceptionally((t) -> {// 任何一个任务异常都会让anyException任务完成anyException.completeExceptionally(t);return null;});}// 要么allCompletableFuture全部成功,要么一个出现异常就结束任务CompletableFuture.anyOf(allCompletableFuture, anyException).whenComplete((value, exception) -> {if (Objects.nonNull(exception)) {System.out.println("产生异常,提前结束!");exception.printStackTrace();return;}System.out.println("所有任务正常完成!");});
复制代码

执行结果如下:

completable-future-test-Thread-2开始执行任务:task1
completable-future-test-Thread-3开始执行任务:task2
completable-future-test-Thread-4开始执行任务:task3
正在执行任务task2
正在执行任务task3
正在执行任务task1
task2执行结束
task1执行结束
task3执行结束
产生异常,提前结束!
java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务2异常!at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: 任务2异常!at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)... 3 more
​
复制代码

CompletableFuture没有现成的api实现快速失败的功能,所以我们只能结合allOf()anyOf()来逻辑来自定义方法完成快速失败的逻辑;

1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。

2.把所有的CompletableFuture和额外的CompletableFuture放在anyOf()方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。

注意

  • 异常处理

最后需要注意的是,所有的CompletableFuture任务一定要加上异常处理:

    CompletableFuture// 任务1.supplyAsync(() -> {Thread currentThread = Thread.currentThread();String ThreadName = currentThread.getName();String taskName = "task1";System.out.println(ThreadName + "开始执行任务:" + taskName);System.out.println("正在执行任务" + taskName);System.out.println(taskName + "执行结束");return taskName;}, THREAD_POOL_EXECUTOR).whenComplete((v,e)->{if(Objects.nonNull(e)){// todo// 处理异常}if(Objects.nonNull(v)){// todo}});
复制代码

还可以通过另外两个方法处理:exceptionally()或者handle()

  • 自定义线程池

CompletableFuture默认的线程池是ForkJoinThreadPool,建议大家在使用的时候尽可能地使用自定义线程池,这样方便后续的代码优化以及相关的日志查看。

相关内容

热门资讯

“国潮范”闪耀世运,巴蜀魅力璀...   蜀绣飞针,绣出芙蓉叠影  竹编经纬,勾勒山水清韵  青铜不语,叩响古蜀秘境  川剧变脸,刹那烟火...
(抗战胜利80周年)访台湾雾峰...   在台湾台中市,有一处具百余年历史的建筑群——雾峰林家宅园。这里不仅是知名望族居所,也记录着雾峰林...
赏古乐、做扎染……这个暑假沉浸...   眼下正值暑期,各地依托非遗场馆和资源,开展内容丰富的传统文化体验、普及活动,让人们在沉浸式体验中...
小“票根”如何成为激活消费的“...   原标题:小“票根”成为“金钥匙”(新视窗·新供给引领新消费新需求)  在江苏南京溧水区天生桥景区...
【世界说】美国学者:于美国35...   中国日报网8月6日电 澳大利亚“对话”新闻网(The Conversation)4日刊发文章称,...
铭记历史 缅怀先烈 | 红色底...   央视网消息:铭记历史、缅怀英烈,今天(8月6日)的《抗日根据地·今昔巨变》系列报道,我们聚焦鄂豫...
决胜“十四五” 打好收官战|路...   交通,是经济发展的强劲引擎,也是联通万家的民生通途。  “十四五”期间,我国“6轴7廊8通道”国...
丈夫因为没吃到鸡蛋不停抱怨 最...   四川一女子做了一大桌菜,丈夫因鸡蛋被侄子吃了,没吃到鸡蛋喋喋不休抱怨,最终女子崩溃掀桌。(编辑 ...
女子商场掀门帘的瞬间 小偷从背...   8月6日(发布),两女子在商场门口趁顾客掀帘子从背后2秒就偷走手机,目前失主已报案。(编辑:杨杨...
聚焦暑期安全:警惕不法分子利用...   原标题:租借学生微信号、盗取儿童电话卡、以“兼职”名义诱导拨打诈骗电话  警惕!不法分子利用中小...