博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java8-CompleableFuture的使用1
阅读量:4210 次
发布时间:2019-05-26

本文共 11877 字,大约阅读时间需要 39 分钟。

背景

1. 硬件的极速发展,多核心CPU司空见惯;分布式的软件架构司空见惯;

2. 功能API大多采用混聚的方式把基础服务的内容链接在一起,方便用户生活。

抛出了两个问题:

1. 如何发挥多核能力;

2. 切分大型任务,让每个子任务并行运行;

并发和并行的区别

项目 区别1 实现技术
并行 每个任务跑在单独的cpu核心上 分支合并框架,并行流
并发 不同任务共享cpu核心,基于时间片调度 CompletableFuture

Future接口

java5开始引入。将来某个时刻发生的事情进行建模。

进行一个异步计算,返回一个执行运算的结果引用,当运算结束后,这个引用可以返回给调用方。

可以使用Future把哪些潜在耗时的任务放到异步线程中,让主线程继续执行其他有价值的工作,不在白白等待。

下面是一个例子:使用Future,可以让两个任务并发的运行,然后汇聚结果;

package com.test.completable;import com.google.common.base.Stopwatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;/** * 说明:Future应用实例 * @author carter * 创建时间: 2019年11月18日 10:53 **/public class FutureTest {    static final ExecutorService pool = Executors.newFixedThreadPool(2);    public static void main(String[] args) {        Stopwatch stopwatch = Stopwatch.createStarted();        Future
longFuture = pool.submit(() -> doSomethingLongTime()); doSomething2(); try { final Long longValue = longFuture.get(3, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() " future return value :" longValue " : " stopwatch.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } pool.shutdown(); } private static void doSomething2() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() " doSomething2 :" stopwatch.stop()); } private static Long doSomethingLongTime() { Stopwatch stopwatch = Stopwatch.createStarted(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() " doSomethingLongTime : " stopwatch.stop()); return 1000L; }}

没法编写简介的并发代码。描叙能力不够;比如如下场景:

  1. 将两个异步计算的结果合并为一个,这两个异步计算之间互相独立,但是第二个有依赖第一个结果。
  2. 等待Future中所有的任务都完成;
  3. 仅等待Future集合中最快结束的任务完成,并返回它的结果;
  4. 通过编程的方式完成一个Future任务的执行;
  5. 响应Future的完成事件。

基于这个缺陷,java8中引入了CompletableFuture 类;

实现异步API

技能点:

  1. 提供异步API;
  2. 修改同步的API为异步的API,如何使用流水线把两个任务合并为一个异步计算操作;
  3. 响应式的方式处理异步操作的完成事件;
类型 区别 是否堵塞
同步API 调用方在被调用运行的过程中等待,被调用方运行结束后返回,调用方取得返回值后继续运行 堵塞
异步API 调用方和被调用方是异步的,调用方不用等待被调用方返回结果 非堵塞

package com.test.completable;import com.google.common.base.Stopwatch;import com.google.common.base.Ticker;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;/** * 说明:异步调用计算价格的方法 * @author carter * 创建时间: 2019年11月18日 13:32 **/public class Test {    public static void main(String[] args) {        Shop shop = new Shop("BestShop");        Stopwatch stopwatch = Stopwatch.createStarted();        Stopwatch stopwatch2 = Stopwatch.createStarted();        Future
doubleFuture = shop.getPriceFuture("pizza"); System.out.println("getPriceFuture return after: " stopwatch.stop()); doSomethingElse(); try{ final Double price = doubleFuture.get(); System.out.println("price is " price " return after: " stopwatch2.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static void doSomethingElse() { Stopwatch stopwatch = Stopwatch.createStarted(); DelayUtil.delay(); System.out.println("doSomethingElse " stopwatch.stop()); }}

错误处理

如果计算价格的方法产生了错误,提示错误的异常会被现在在试图计算商品价格的当前线程的范围内,最终计算的异步线程会被杀死,这会导致get方法返回结果的客户端永久的被等待。

如何避免异常被掩盖, completeExceptionally会把CompletableFuture内发生的问题抛出去。

private static void test2() {        Shop shop = new Shop("BestShop");        Stopwatch stopwatch = Stopwatch.createStarted();        Stopwatch stopwatch2 = Stopwatch.createStarted();        Future
doubleFuture = shop.getPriceFutureException("pizza"); System.out.println("getPriceFuture return after: " stopwatch.stop()); doSomethingElse(); try{ final Double price = doubleFuture.get(); System.out.println("price is " price " return after: " stopwatch2.stop()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }

方法改造:

//异步方式查询产品价格,异常抛出去    public Future
getPriceFutureException(String product){ final CompletableFuture
doubleCompletableFuture = new CompletableFuture<>(); new Thread(()->{try { doubleCompletableFuture.complete(alculatePriceException(product)); }catch (Exception ex){ doubleCompletableFuture.completeExceptionally(ex); } }).start(); return doubleCompletableFuture; }

无堵塞

即让多个线程去异步并行或者并发的执行任务,计算完之后汇聚结果;

private static void test3(String productName) {        Stopwatch stopwatch = Stopwatch.createStarted();        final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3 done in " stopwatch.stop()); } private static void test3_parallelStream(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .parallel() .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_parallelStream done in " stopwatch.stop()); } private static void test3_completableFuture(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_completableFuture done in " stopwatch.stop()); } private static void test3_completableFuture_pool(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)),pool)) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test3_completableFuture done in " stopwatch.stop()); }

代码中有一个简单的计算场景,我想查询4家商店的iphone11售价;

华强北,益田苹果店,香港九龙城,京东商城;

每一家的查询大概耗时1s;

任务处理方式 耗时 优缺点说明
顺序执行 4秒多 简单,好理解
并行流 1秒多 无法定制流内置的线程池,使用简单,改造简单
CompletableFuture 默认线程池 2秒多 默认线程池
CompletableFuture 指定线程池 1秒多 指定了线程池,可定制性更好,相比于并行流

多个异步任务的流水线操作

场景: 先计算价格,在拿到折扣,最后计算折扣价格;

private static void test4(String productName) {        Stopwatch stopwatch = Stopwatch.createStarted();        final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(shop->shop.getPrice_discount(productName)) .map(Quote::parse) .map(DisCount::applyDiscount) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test4 done in " stopwatch.stop()); } private static void test4_completableFuture(String productName) { Stopwatch stopwatch = Stopwatch.createStarted(); final List
stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")) .map(shop->CompletableFuture.supplyAsync(()->shop.getPrice_discount(productName),pool)) .map(future->future.thenApply( Quote::parse)) .map(future->future.thenCompose(quote -> CompletableFuture.supplyAsync(()->DisCount.applyDiscount(quote),pool))) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(stringList); System.out.println("test4_completableFuture done in " stopwatch.stop()); }

以上是有依赖关系的两个任务的聚合,即任务2,依赖任务1的结果。使用的是thenCompose方法;

接下来如果有两个任务可以异步执行,最后需要依赖着两个任务的结果计算得到最终结果,采用的是thenCombine;

//两个不同的任务,最后需要汇聚结果,采用combine    private static void test5(String productName) {        Stopwatch stopwatch = Stopwatch.createStarted();        Shop shop = new Shop("香港九龙");      Double pricefinal =  CompletableFuture.supplyAsync(()->shop.getPrice(productName))                .thenCombine(CompletableFuture.supplyAsync(shop::getRate),(price, rate)->price * rate).join();        System.out.println("test4 done in  "   stopwatch.stop());    }

completion事件

让任务尽快结束,无需等待;有多个服务来源,你请求多个,谁先返回,就先响应;

结果依次返回:

//等待所有的任务执行完毕; CompletableFuture.allOf()    public void findPriceStream(String productName){        List
shops = Arrays.asList(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城")); final CompletableFuture[] completableFutureArray = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool))) .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(completableFutureArray).join(); }

多个来源获取最快的结果:

//有两个获取天气的途径,哪个快最后结果就取哪一个    public static void getWeather(){        final Object join = CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> a_weather()), CompletableFuture.supplyAsync(() -> b_weather())).join();        System.out.println(join);    }    private static String b_weather() {        DelayUtil.delay(3);        return "bWeather";    }    private static String a_weather() {        DelayUtil.delay(5);        return "aWeather";    }

源码分析

可完备化的将来;CompletableFuture ;

先看签名:

public class CompletableFuture
implements Future
, CompletionStage
{}

实现了Futrue,CompletionStage接口;这两个接口简单说明一下:

接口 关键特性
Future 直接翻译为未来,标识把一个任务异步执行,需要的的时候,通过get方法获取,也可以取消cancel,此外还提供了状态查询方法,isDone, isCancled,实现类是FutureTask
CompletionStage 直接翻译是完成的阶段,提供了函数式编程方法

可以分为如下几类方法

里面的实现细节后面单独成文章再讲。

小结

1. 执行一些比较耗时的操作,尤其是依赖一个或者多个远程服务的操作,可以使用异步任务改善程序的性能,加快程序的响应速度;

2. 使用CompletableFuture你可以轻松的实现异步API;

3. CompletableFuture提供了异常管理机制,让主线程有机会接管子任务抛出的异常;

4. 把同步API封装到CompletableFuture中,可以异步得到它的结果;

5. 如果异步任务之间互相独立,而他们之间的某一些结果是另外一些的输入,可以把这些任务进行compose;

6. 可以为CompletableFuture中的任务注册一个回调函数,当任务执行完毕之后再进行一些其它操作;

7. 你可以决定什么时候结束程序的运行,是所有的CompletableFuture任务所有对象执行完毕,或者只要其中任何一个完成即可。

原创不易,转载请注明出处,欢迎多沟通交流

你可能感兴趣的文章
欢迎加入【亿能测试快讯】邮件列表!
查看>>
为什么我们的自动化测试“要”这么难
查看>>
LoadRunner性能脚本开发实战训练
查看>>
测试之途,前途?钱途?图何?
查看>>
adb常用命令
查看>>
通过LR监控Linux服务器性能
查看>>
通过FTP服务的winsockes录制脚本
查看>>
LRwinsocket协议测试AAA服务器
查看>>
Net远程管理实验
查看>>
反病毒专家谈虚拟机技术 面临两大技术难题
查看>>
几种典型的反病毒技术:特征码技术、覆盖法技术等
查看>>
Software Security Testing软件安全测试
查看>>
SQL注入漏洞全接触--进阶篇
查看>>
SQL注入漏洞全接触--高级篇
查看>>
SQL注入法攻击一日通
查看>>
论文浅尝 | 通过共享表示和结构化预测进行事件和事件时序关系的联合抽取
查看>>
论文浅尝 | 融合多粒度信息和外部语言知识的中文关系抽取
查看>>
论文浅尝 | GMNN: Graph Markov Neural Networks
查看>>
廖雪峰Python教程 学习笔记3 hello.py
查看>>
从内核看epoll的实现(基于5.9.9)
查看>>