GENGEN
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
  • java基础

    • JDK8 函数式编程
    • JDK8 新特性之Date-Time
    • Servlet 源码分析
    • ArrayList 源码
    • LinkedList 源码
    • HashMap 源码
    • String 源码
    • BigDecimal 源码
    • java 类的加载
    • Class 源码
    • Synchronized锁升级
    • 事务的传播机制
    • knowledge
  • JAVA WEB

    • Java Servlet
    • 权限设计
    • logback日志的链路追踪
  • DATABASE

    • MySQL EXPLAIN详解
    • MySQL 索引
    • MySQL 表锁、行锁
    • MySQL ACID与transcation
    • 分布式事务
    • MySQL MVCC机制
    • Mysql 乐观锁与悲观锁
    • 分布式锁1 数据库分布式锁
    • 分布式锁2 Redis分布式锁
    • 分布式锁3 ZK分布式锁
  • SpringCloud

    • SpringCloud服务注册中心之Eureka
    • SpringCloud服务注册中心之Zookeeper
    • SpringCloud服务调用之Ribbon
    • SpringCloud服务调用之OpenFeign
    • SpringCloud服务降级之Hystrix
    • SpringCloud服务网关之Gateway
    • SpringCloud Config分布式配置中心
    • SpringCloud服务总线之Bus
    • SpringCloud消息驱动之Stream
    • SpringCloud链路追踪之Sleuth
    • SpringCloud Alibaba Nacos
    • SpringCloud Alibaba Sentinel
  • Spring

    • SpringBoot
    • Spring-data-jpa入门
    • SpringCloud问题
    • dispatcherServlet 源码分析
    • @SpringBootApplication注解内部实现与原理
    • spring启动初始化初始化
  • 中间件

    • 分布式协调服务器Zookeeper
    • 服务治理Dubbo
    • 分布式配置管理平台Apollo
    • 消息中间件框架Kafka
    • 分布式调度平台ElasticJob
    • 可视化分析工具Kibana
    • ElacticSearch 基础
    • ElacticSearch进阶
    • ElacticSearch集成
  • 环境部署

    • 应用容器引擎Docker
    • DockerCompose服务编排
    • 负载均衡Nginx
    • Nginx的安装配置
    • K8S基础
  • 代码片段

    • listener 监听模式
    • spingboot 整合redis
    • XSS过滤
    • profile的使用
    • ConfigurationProperties注解
  • 设计模式

    • 工厂模式
    • 单例模式
    • 装饰者模式
    • 适配器模式
    • 模板方法模式
    • 观察者模式
  • 读书笔记

    • 《Spring in Action 4》 读书笔记
    • 《高性能mysql》 读书笔记
  • NoSQL

    • Redis基础
    • Redis高级
    • Redis集群
    • Redis应用
  • MQ

    • rabbitMQ基础
    • rabbitMQ高级
    • rabbitMQ集群
  • JVM

    • JVM体系架构概述
    • 堆参数调整
    • GC 分代收集算法
    • JVM 垃圾回收器
    • JVM 相关问题
  • JUC

    • JUC总览
    • volatile关键字
    • CAS
    • ABA问题
    • collections包下线程安全的集合类
    • Lock 锁
    • LockSupport
    • AQS
    • Fork/Join分支框架
    • JUC tools
    • BlockingQueue 阻塞队列
    • Executor 线程池
    • CompletableFuture
    • 死锁以及问题定位分析
  • Shell

    • shell命令
    • shell基础
  • Activiti

    • IDEA下的Activiti HelloWord
    • 流程定义的CRUD
    • 流程实例的执行
    • 流程变量
  • VUE

    • vue基础
    • vue router
    • Vuex
    • Axios 跨域
    • dialog 弹出框使用
    • vue 动态刷新页面
    • vue 封装分页组件
    • vue 动态菜单
    • vue 常用传值
  • Solidity 智能合约

    • Solidity 基础
    • Solidity ERC-20
    • Solidity 101
  • English

    • 时态

CompletableFuture 异步回调

  • 该类实现了Future和CompletionStage两个接口,该类作为一个异步任务,可以在自己异步执行完成以后触发其它的异步任务,从而达到异步回调的效果。
    • Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果
    • CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等

代码演示

模拟一段进饭店点菜场景,点完饭,开启线程饭店做饭,客户线程打游戏。等饭店线程做好了,进入主线程。

  • 第一种,炒菜和 打饭在一个线程中
/**
*第一种supplyAsync开启异步,并且cf1.join 可以获取异步结果
*/
public class CompletableDemo {

    static void sleep(Long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void printTimeAndThread(String s) {
        String result = new StringBuffer("").append(System.currentTimeMillis()).append(" | \t").
                append(Thread.currentThread().getName()).append(" | \t")
                .append(s).toString();
        System.out.println(result);
    }

    public static void main(String[] args) {
        printTimeAndThread("小白进入饭店,点饭菜");
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("厨师炒菜");
            sleep(200L);

            printTimeAndThread("服务员打饭");
            return "番茄鸡蛋 + 米饭";
        });
        printTimeAndThread("小白打王者");
        printTimeAndThread(String.format("%s,小白开吃", cf1.join()));
    }

}
  • 输出结果
1629603444962 | 	main | 	小白进入饭店,点饭菜
1629603444997 | 	main | 	小白打王者 //主线程
1629603444997 | 	ForkJoinPool.commonPool-worker-1 | 	厨师炒菜 //子线程
1629603445202 | 	ForkJoinPool.commonPool-worker-1 | 	服务员打饭
1629603445212 | 	main | 	番茄鸡蛋 + 米饭,小白开吃
  • 第二种场景,厨师炒完菜,开启另一个线程,服务员打饭
/**
* supplyAsync开启异步后,打王者和厨师炒菜同时进行,
* thenCompose服务员打饭必须等待厨师炒菜完成以后拿到结果,属于两个结果的合并
* 厨师炒菜和服务员打饭是两个线程,但是服务员打饭线程开启的条件是厨师线程完成
* 如果thenCompose换成 thenApply,就开启一个线程
*/
public class CompletableDemo {

    public static void main(String[] args) {
        printTimeAndThread("小白进入饭店,点饭菜");
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("厨师炒菜");
            sleep(200L);
            return "番茄鸡蛋";
        }).thenCompose(di -> CompletableFuture.supplyAsync(() -> {
            printTimeAndThread("服务员打饭");
            sleep(200L);
            return di + " + 米饭";
        }));
        printTimeAndThread("小白打王者");
        printTimeAndThread(String.format("%s,小白开吃", cf1.join()));
    }

}
  • 输出结果
1629603323169 | 	main | 	小白进入饭店,点饭菜
1629603323201 | 	ForkJoinPool.commonPool-worker-1 | 	厨师炒菜 //线程1
1629603323201 | 	main | 	小白打王者 //主线程
1629603323410 | 	ForkJoinPool.commonPool-worker-1 | 	服务员打饭 //线程2
1629603323625 | 	main | 	番茄鸡蛋 + 米饭,小白开吃

模拟小白进入之前,米饭也没做好

/**
* 跟上一个场景相比,把thenCompose换成了thenCombine
* 这时候 厨师炒菜线程和服务员蒸饭线程是同时开启
* 最后得到两个线程的结果(dish rice)合并
*/
public static void main(String[] args) {
    printTimeAndThread("小白进入饭店,点饭菜");
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        printTimeAndThread("厨师炒菜");
        sleep(200L);
        return "番茄鸡蛋";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        printTimeAndThread("服务员蒸饭");
        sleep(200L);
        return " + 米饭";
    }), (dish, rice) -> {
        return String.format("%s,%s 好了", dish, rice);
    });
    printTimeAndThread("小白打王者");
    printTimeAndThread(String.format("%s,小白开吃", cf1.join()));
}
  • 执行结果
1629604126405 | 	main | 	小白进入饭店,点饭菜
1629604126439 | 	ForkJoinPool.commonPool-worker-1 | 	厨师炒菜
1629604126439 | 	ForkJoinPool.commonPool-worker-2 | 	服务员蒸饭
1629604126440 | 	main | 	小白打王者
1629604126661 | 	main | 	番茄鸡蛋, + 米饭 好了,小白开吃

Tips

  • 第二个,第三个代码其实都能用第一个这种解决,但是不容易看出来线程之间的关系,即它们之间的并发、依赖、互斥。代码量大的话,就容易出错
  • 方法
    • supplyAsync 开启 (一个异步)
    • thenCompose 连接多个任务(结果由第最后一个返回)
    • thenCombine 合并多个任务 (结果由合并函数BiFunction返回)

模拟小白回家,可以坐600路和700路,哪个先来坐哪个

/**
* applyToEither 二选一,两个线程一起运行,哪个先返回就把结果返回
*/
 public static void main(String[] args) {
        printTimeAndThread("小白等车");
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            sleep(new Double(Math.random()).longValue());
            return "600路来了";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            sleep(new Double(Math.random()).longValue());
            return "700路来了";
        }), firstComeBus -> firstComeBus);

        printTimeAndThread(String.format("%s,小白坐上车", cf1.join()));
    }

模拟一个分支出问题了,需要异常处理

/**
exceptionally 处理异常
*/
public static void main(String[] args) {
    printTimeAndThread("小白等车");
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        sleep(100L);
        return "600路来了";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        sleep(50L);
        if (1 + 1 == 2) {
            printTimeAndThread("撞树了");
            throw new RuntimeException("撞树了");
        }
        return "700路来了";
    }), firstComeBus -> firstComeBus).exceptionally(e -> {
        printTimeAndThread("叫出租车");
        return "出租车,到了";
    });

    printTimeAndThread(String.format("%s,小白坐上车", cf1.join()));
}

最后模拟一个场景,三个线程,m1 和 m2有依赖关系,需要m1执行完m2执行,然后线程池操作

@Data
class SourceMet {
    private String m1;

    private String m2;

    private String m3;
    
}


class Method1 {

    public SourceMet call(SourceMet m1) {
        m1.setM1("a结果获取了");
        return m1;
    }
}

/**
 * 依赖 Method1
 */
class Method2 {
    public SourceMet call(SourceMet m) {
        if (!Objects.isNull(m.getM1())){
            m.setM2("b结果获取了");
        }
        return m;
    }
}


class Method3 {
    public SourceMet call(SourceMet m) {
        m.setM3("c结果获取了");
        return m;
    }
}

public class CompletableDemo {

    static void sleep(Long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void printTimeAndThread(String s) {
        String result = new StringBuffer("").append(System.currentTimeMillis()).append(" | \t").
                append(Thread.currentThread().getName()).append(" | \t")
                .append(s).toString();
        System.out.println(result);
    }

    /**
     * 三个线程同时执行,
     * Method2依赖Method1的结果
     *
     * @param args
     */
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        SourceMet met = new SourceMet();

        Method1 m1 = new Method1();
        Method2 m2 = new Method2();
        Method3 m3 = new Method3();
        executor.execute(() -> {
            m3.call(met);
        });
        CompletableFuture cf1 = CompletableFuture.supplyAsync(new Supplier<SourceMet>() {
            @Override
            public SourceMet get() {
                return m1.call(met);
            }
        }, executor).thenCompose(a -> CompletableFuture.supplyAsync(new Supplier<SourceMet>() {
            @Override
            public SourceMet get() {
                return m2.call(met);
            }
        }, executor));

        System.out.println(met.toString());
        
    }
}
Last Updated:
Contributors: wal365@126.com
Prev
Executor 线程池
Next
死锁以及问题定位分析