02-JUC进阶-CompletableFuture

一、Future接口概述

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消异步任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

举例:比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙完其他事情或者先执行完,过了一会再才去获取子任务的执行结果或变更的任务状态。

二、Future接口实现类FutureTask

1. Future接口能干什么

Future是 Java5 新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

2. Future接口相关架构

Future接口架构

  • 目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

  • 代码实现:Runnable接口+Callable接口+Future接口和FutureTask实现类。

    public class FutureTaskDemo3 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<String> futureTask = new FutureTask<>(new MyThread());
            Thread thread = new Thread(futureTask, "t1");
            // 启动一个新的线程
            thread.start();
    
            System.out.println(futureTask.get());
        }
    }
    
    class MyThread implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " come in ......");
            return "Hello Callable";
        }
    }

3. Future编码实战和优缺点分析

优点

Future+线程池异步多线程任务配合,能显著提高程序的运行效率。

缺点

  • get() 阻塞—一旦调用get()方法求结果,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,如果没有计算完成容易程序堵塞。
public class FutureTaskDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "" + ThreadLocalRandom.current().nextInt(100);
        });

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        //3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)
        //System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());

        //3秒钟后才出来结果,我只想等待1秒钟,过时不候
        System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));

        System.out.println(Thread.currentThread().getName() + "\t" + " run... here");

    }
}
  • isDone() 轮询—轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常会以轮询的方式去获取结果,尽量不要阻塞。
public class FutureTaskDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "" + ThreadLocalRandom.current().nextInt(100);
        });

        new Thread(futureTask, "t1").start();

        System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");

        /**
         * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
         */
        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            }
        }
    }
}

结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

4. 完成复杂的任务

  • 回调通知(Future支持)

    • 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
    • 通过轮询的方式去判断任务是否完成这样非常占cpu并且代码也不优雅
  • 创建异步任务:Future+线程池组合

    public class FutureTaskDemo4 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 创建线程池
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            
            FutureTask<String> futureTask = new FutureTask<>(() -> {
                System.out.println(Thread.currentThread().getName() + "-----come in FutureTask");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "" + ThreadLocalRandom.current().nextInt(100);
            });
    
            // 异步提交任务
            executorService.submit(futureTask);
    
            System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");
    
            /**
             * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
             */
            while (true) {
                if (futureTask.isDone()) {
                    System.out.println(futureTask.get());
                    break;
                }
            }
    
            // 关闭线程池
            executorService.shutdown();
        }
    }
  • 多个任务前后依赖可以组合处理(Future不支持)

    • 想将多个异步任务的结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
    • 想将两个或多个异步计算合并成为一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
  • 对计算速度选最快的(Future不支持)

    • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果

结论

  • 对于简单的业务场景使用 Future 完全 ok
  • 对于复杂业务场景,使用 Future 之前提供的 API 处理起来不够优雅,这时候需要使用 CompletableFuture 以声明式的方式优雅的处理这些需求。
  • Future能干的,CompletableFuture 都能干

三、CompletableFuture对Future的改进

1. CompletableFuture为什么会出现

  • get() 方法在 Future 计算完成之前会一直处在阻塞状态下,阻塞的方式和异步编程的设计理念相违背。
  • isDone() 方法容易耗费 cpu 资源(cpu空转),
  • 对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果

jdk8 设计出 CompletableFuture,CompletableFuture 提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

2. CompletableFuture 和 CompletionStage 介绍

类架构说明

CompletableFuture

CompletableFuture

  • 接口CompletionStage
    • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。
      • 有些类似Linux系统的管道分隔符传参数
    • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
    • 一个阶段的计算执行可以是一个 function, Consumer 或者 Runnable。
      • 比如: stage.thenApply(x -> square(x).thenAccept(x -> System.out.print(x)).thenRun(0)-> system.out.println())
  • 类CompletableFuture
    • 在Java8中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法,
    • 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段 (CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
    • 它实现了 Future 和 CompletionStage 接口

3. 四个核心静态方法,创建异步任务

四个静态构造方法

CompletableFuture静态方法

对于上述Executor参数说明:若没有指定,则使用默认的 ForkJoinPoolCommonPool() 作为它的线程池执行异步代码,如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

3.1 四个静态方法演示

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                20,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());


        // 无返回值,默认线程池
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "无返回值,默认线程池:ForkJoinPool");
        });
        System.out.println("future1 ==> " + future1.get());
        /*
        * 打印:
        ForkJoinPool.commonPool-worker-1无返回值,默认线程池:ForkJoinPool
        future1 ==> null
        * */
        System.out.println("----------------------------------------------------------");

        // 无返回值,自定义线程池
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "无返回值,自定义线程池");
        }, threadPoolExecutor);
        System.out.println("future2 ==> " + future2.get());
        /*
        * 打印:
        pool-1-thread-1无返回值,自定义线程池
        future2 ==> null
        * */
        System.out.println("----------------------------------------------------------");

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "有返回值,默认线程池:ForkJoinPool");
            return "有返回值,默认线程池:ForkJoinPool";
        });
        System.out.println("future3 ==> " + future3.get());
        /*
        * 打印:
        ForkJoinPool.commonPool-worker-1有返回值,默认线程池:ForkJoinPool
        future3 ==> 有返回值,默认线程池:ForkJoinPool
        * */
        System.out.println("----------------------------------------------------------");

        CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + "有返回值,自定义线程池");
            return "有返回值,自定义线程池";
        }, threadPoolExecutor);
        System.out.println("future4 ==> " + future4.get());
        /*
        * 打印:
        pool-1-thread-1有返回值,自定义线程池
        future4 ==> 有返回值,自定义线程池
        * */

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

3.2 回调方法演示

CompletableFuture 减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws InterruptedException {
        // 自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                20,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " 线程运行中...");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            if (result > 5) {
                int i = 10 / 0;
            }
            return result;
        }, threadPoolExecutor).whenComplete((v, e) -> { // 当计算完成时调用
            if (e == null) {
                System.out.println(Thread.currentThread().getName() + " 线程计算完成,结果为:" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            String msg = String.format("%s 线程计算出现异常,原因:%s, 错误信息:%s", Thread.currentThread().getName(), e.getCause(), e.getMessage());
            System.out.println(msg);
            return -1;
        });

        System.out.println(Thread.currentThread().getName() + " 线程先去完成其他任务");

        // 线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        TimeUnit.SECONDS.sleep(2);
        // 关闭自定义线程池
        threadPoolExecutor.shutdown();
    }
}
/*
无异常时打印如下:
pool-1-thread-1 线程运行中...
main 线程先去完成其他任务
pool-1-thread-1 线程计算完成,结果为:2
* */

/*
有异常时打印如下:
pool-1-thread-1 线程运行中...
main 线程先去完成其他任务
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.ArithmeticException: / by zero
	at com.atguigu.juc.cf.CompletableFutureUseDemo.lambda$main$0(CompletableFutureUseDemo.java:32)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	... 3 more
pool-1-thread-1 线程计算出现异常,原因:java.lang.ArithmeticException: / by zero, 错误信息:java.lang.ArithmeticException: / by zero
* */

CompletableFuture优点:

  • 异步任务结束时,会自动回调某个对象的方法
  • 主线程设置好回调后,不用关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

四、函数式编程

Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程

1. 函数式接口

  • Runnable:无参数、无返回值

    Runnable

  • Function:接受一个参数,并且有返回值

    Function

  • Consumer:接受一个参数,没有返回值

    Consumer

  • BiConsumer:接受两个参数,没有返回值

    BiConsumer

  • Supplier:没有参数,有返回值

    Supplier

总结:

函数式接口名称 方法名称 参数 返回值
Runnable run 无参数 无返回值
Function apply 1个参数 有返回值
Consume accept 1个参数 无返回值
Supplier get 没有参数 有返回值
Biconsumer accept 2个参数 无返回值

2. 链式编程

public class Chain {
    public static void main(String[] args) {
        //-------------------老式写法------------
//        Student student = new Student();
//        student.setId(1);
//        student.setMajor("cs");
//        student.setName("小卡");
        new Student().setId(1).setName("大卡").setMajor("cs");
    }
    
}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{
    private int id;
    private String name;
    private String major;
}

五、join 和 get 对比

join 与 get 功能几乎一样,区别在于编码时是否需要抛出异常

  • get() 方法需要抛出异常
  • join() 方法不需要抛出异常
public class Chain {
    public static void main(String[] args) throws ExecutionException, InterruptedException { //抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.get());
    }

}

public class Chain {
    public static void main(String[] args)  {// 不用抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.join());
    }
}

六、CompletableFuture 案例讲解

1. 需求说明

电商网站比价需求分析:

  1. 需求说明:

    1. 同一款产品,同时搜索出同款产品在各大电商平台的售价
    2. 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
  2. 输出返回:

    1. 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List

例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43

  1. 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表

    1. step by step,按部就班,查完淘宝查京东,查完京东查天猫….
    2. all in,万箭齐发,一口气多线程异步任务同时查询

2. 不使用多线程实现需求

从不同的网站逐个查询,直到所有网站查询完成,返回结果。

public class NetMallCase {

    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("taobao"),
            new NetMall("dangdangwang"),
            new NetMall("tmall")
    );

    /**
     * 不使用多线程,获取图书在不同平台的价格
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByStep(List<NetMall> list, String productName) {
        List<String> result = list.stream().map(netMall -> {
            String str = String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName));
            return str;
        }).collect(Collectors.toList());
        return result;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> resuts = getPriceByStep(list, "mysql");
        for (String data : resuts) {
            System.out.println(data);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("不使用多线程,costTime: " + (endTime - startTime) + " 毫秒");
    }

}

class NetMallData {
    @Getter
    private String mallName;

    public NetMallData(String mallName) {
        this.mallName = mallName;
    }

    public double calcPrice(String productName) {
        //检索需要1秒钟
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

执行结果:

mysql in jd price is 109.95
mysql in pdd price is 109.86
mysql in taobao price is 109.79
mysql in dangdangwang price is 110.08
mysql in tmall price is 110.55
不使用多线程,costTime: 5070 毫秒

多个查询逐个完成,执行效率低,执行时间长。

3. 使用 CompletableFuture 实现需求

public class NetMallCase {

    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("taobao"),
            new NetMall("dangdangwang"),
            new NetMall("tmall")
    );

    /**
     * 不使用多线程,获取图书在不同平台的价格
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByStep(List<NetMall> list, String productName) {
        List<String> result = list.stream().map(netMall -> {
            String str = String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName));
            return str;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 使用多线程异步的从不同平台获取图书价格
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByAsync(List<NetMall> list, String productName) {
        List<String> results = list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return results;
    }

    public static void main(String[] args) {
        // 使用异步多线程获取
        long startTime2 = System.currentTimeMillis();
        List<String> resuts2 = getPriceByAsync(list, "mysql");
        for (String data : resuts2) {
            System.out.println(data);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("使用异步多线程,costTime: " + (endTime2 - startTime2) + " 毫秒");
    }

}

class NetMallData {
    @Getter
    private String mallName;

    public NetMallData(String mallName) {
        this.mallName = mallName;
    }

    public double calcPrice(String productName) {
        //检索需要1秒钟
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

执行结果:

mysql in jd price is 110.91
mysql in pdd price is 109.59
mysql in taobao price is 109.30
mysql in dangdangwang price is 110.85
mysql in tmall price is 110.67
使用异步多线程,costTime: 1056 毫秒

使用异步多线程,执行效率大大提升。

七、CompletableFuture 常用API

1.获得结果和触发计算

  • 获取结果

    • public T get() 阻塞等待,直到得到返回结果
    • public T get(long timeout,TimeUnit unit) 阻塞等待指定的时长,超时则报错:TimeOutException
    • public T join() 类似于get(),区别在于不需要抛出异常
    • public T getNow(T valueIfAbsent) 立即获取结果不阻塞
      • 计算完,返回计算完成后的结果
      • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
  • 主动触发计算

    • public boolean complete(T value) 是否立即打断get()方法返回括号值
      • (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值 complete;如果没打断,返回false 和原来的abc)
    /**
     * 获得结果和触发计算
     *
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static void m1() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), 	Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, threadPoolExecutor);
    
        //System.out.println(future.get()); // 1
        //System.out.println(future.get(2L,TimeUnit.SECONDS)); // 1
        //暂停几秒钟线程
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //System.out.println(future.getNow(9999));
    
        System.out.println(future.complete(-44) + "\t" + future.get());
        
        // 关闭线程池
        threadPoolExecutor.shutdown();
    }

    打印结果:

    true	-44

2. 对计算结果进行处理

  • thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

    private static void thenApplyDemo() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
            if (result > 5) {
                int i = 10 / 0;
            }
            return 1;
        }, threadPoolExecutor).thenApply(data -> data + 2).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("执行完成,结果:" + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            String msg = String.format("执行出现异常,原因:%s, 异常信息:%s", e.getCause(), e.getMessage());
            System.out.println(msg);
            return -1;
        });
        
        System.out.println("执行结束,结果:" + completableFuture.join());
    
        // 关闭自定义线程池
        threadPoolExecutor.shutdown();
    
        // 主线程停留3秒
        TimeUnit.SECONDS.sleep(3);
    }

    正常执行结束打印:

    执行完成,结果:3
    执行结束,结果:3

    异常执行结束打印:

    java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ArithmeticException: / by zero
    	at com.atguigu.juc.cf.CompletableFutureDemo.lambda$thenApplyDemo$0(CompletableFutureDemo.java:27)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	... 3 more
    执行出现异常,原因:java.lang.ArithmeticException: / by zero, 异常信息:java.lang.ArithmeticException: / by zero
    执行结束,结果:-1
  • handle 类似于 thenApply,但是有异常的话仍然可以往下走一步。

    private static void handleDemo() throws InterruptedException {
        // 自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                4,
                5,
                50,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        CompletableFuture.supplyAsync(() -> {
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
            if (result > 5) {
                int i = 10 / 0;
            }
            return 10;
        }, threadPool).handle((v, e) -> {
            if (e == null) {
                System.out.println("handle1执行完成,结果:" + v);
            } else {
                System.out.println("handle1执行异常");
            }
            return v + 1;
        }).handle((v, e) -> {
            if (e == null) {
                System.out.println("handle2执行完成,结果:" + v);
            } else {
                System.out.println("handle2执行异常");
            }
            return "abc";
        }).handle((v, e) -> {
            if (e == null) {
                System.out.println("handle3执行完成,结果:" + v);
            } else {
                System.out.println("handle3执行异常");
            }
            return "xyz";
        }).whenCompleteAsync((v, e) -> {
            if (e == null) {
                System.out.println(Thread.currentThread().getName() + " whenComplete执行完成,结果:" + v);
            }
        }, threadPool).exceptionally(e -> {
            e.printStackTrace();
            String msg = String.format("执行出现异常,原因:%s, 异常信息:%s", e.getCause(), e.getMessage());
            System.out.println(msg);
            return "error";
        });
    
        TimeUnit.SECONDS.sleep(5);
    
        // 关闭线程池
        threadPool.shutdown();
    
    }

    正常执行结束打印:

    handle1执行完成,结果:10
    handle2执行完成,结果:11
    whenComplete执行完成,结果:abc

    异常执行结束打印:

    handle1执行异常
    handle2执行异常
    handle3执行完成,结果:abc
    pool-1-thread-2 whenComplete执行完成,结果:xyz

    注意:supplyAsync 方法执行异常时, 程序依然往下走,经过了 handle1handle2handle3 ,而没有经过 exceptionally ,当 handle3 出现异常时(在whenCompleteAsync 上方的方法),才会经过 exceptionally 方法

thenApply 在实际工作中最常用

3. 对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function

  • thenAccept

    • thenAccept(Consumer action)
    • 任务A执行完执行B,B需要A的结果,但是任务B无返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
  • thenRun

    • thenRun(Runnable runnable)
    • 任务A执行完执行B,并且B不需要A的结果
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
  • thenApply

    • thenApply(Function fn)
    • 任务A执行完执行B,B需要A的结果,同时任务B有返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());

示例代码:

/**
 * 对计算结果进行消费
 */
public static void m3() {
    CompletableFuture.supplyAsync(() -> {
        return 1;
    }).thenApply(f -> {
        return f + 2;
    }).thenApply(f -> {
        return f + 3;
    }).thenAccept(r -> System.out.println(r));

    // 任务A执行完执行B,并且B不需要A的结果, 且B无返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());

    // 任务A执行完执行B,B需要A的结果,但是任务B无返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());

    // 任务A执行完执行B,B需要A的结果,同时任务B有返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
}

4. CompleteFuture和线程池说明(非常重要)

上面的几个方法都有普通版本和后面加Async的版本, 例如:thenRunthenRunAsyncthenApplythenApplyAsyncwhenCompletewhenCompleteAsync ……

thenRunthenRunAsync为例,有什么区别?

  • thenRun 没有传入自定义线程池,使用默认线程池 ForkJoinPool

  • thenRunAsync可传入一个自定义线程池

    • 如果你执行第一个任务的时候,传入了一个自定义线程池,调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池。
    • 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoin线程池
  • 特殊情况,任务处理太快,系统优化切换原则,直接使用main线程处理

  • thenRun 代码示例

    private static void m7() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPoolExecutor).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    
        Thread.sleep(2000);
        threadPoolExecutor.shutdown();
    }

    运行结果:

    1号任务	pool-1-thread-1
    2号任务	pool-1-thread-1
    3号任务	pool-1-thread-1
    4号任务	pool-1-thread-1
  • thenRunAsync 代码示例

    private static void m7() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPoolExecutor).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    
        Thread.sleep(2000);
        threadPoolExecutor.shutdown();
    }

    运行结果:

    1号任务	pool-1-thread-1
    2号任务	ForkJoinPool.commonPool-worker-25
    3号任务	ForkJoinPool.commonPool-worker-25
    4号任务	ForkJoinPool.commonPool-worker-25
  • 特殊情况, 直接使用main线程处理

    private static void m7() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
    //        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPoolExecutor).thenRunAsync(()->{
    //        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
    //        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
    //        try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    
        Thread.sleep(2000);
        threadPoolExecutor.shutdown();
    }

    运行结果:

    1号任务	pool-1-thread-1
    2号任务	ForkJoinPool.commonPool-worker-25
    3号任务	main
    4号任务	main

5. 对计算速度进行选用

  • applyToEither 优先选择运行速度快的任务的结果
/**
 * 对计算速度进行选用
 */
public static void m4() {
    System.out.println(CompletableFuture.supplyAsync(() -> {
        //暂停几秒钟线程
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2;
    }), r -> {
        return r;
    }).join());

    //暂停几秒钟线程
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

运行结果:

1

6. 对计算结果进行合并

  • thenCombine 合并

    • 两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 来处理
    • 先完成的先等着,等待其它分支任务
    public static void m5() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }), (a, b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());
    
    
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
    }

    打印结果:

    ForkJoinPool.commonPool-worker-25	---come in 1
    ForkJoinPool.commonPool-worker-25	---come in 2
    main	---come in 3
    ForkJoinPool.commonPool-worker-18	---come in 4
    main	---come in 5
    -----主线程结束,END
    60

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com