11-CompletableFuture

一、CompletableFuture 简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。

二、Future 与 CompletableFuture

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future 的主要缺点如下:

  • 不支持手动完成

    我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成

  • 不支持进一步的非阻塞调用

    通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能

  • 不支持链式调用

    对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。

  • 不支持多个 Future 合并

    比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。

  • 不支持异常处理

    Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

三、CompletableFuture 入门

1. 使用 CompletableFuture

场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止。

/**
 * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
 *
 * @throws InterruptedException
 * @throws ExecutionException
 */
private static void method1() throws InterruptedException, ExecutionException {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + "子线程开始干活");
            //子线程睡 5 秒
            Thread.sleep(5000);
            //在子线程中完成主线程
            future.complete("success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }, "A").start();
    //主线程调用 get 方法阻塞
    System.out.println("主线程调用 get 方法获取结果为: " + future.get());
    System.out.println("主线程完成,阻塞结束!!!!!!");
}

打印结果:

A子线程开始干活
主线程调用 get 方法获取结果为: success
主线程完成,阻塞结束!!!!!!

2. 没有返回值的异步任务

/**
    * 没有返回值的异步任务
    *
    * @throws InterruptedException
    * @throws ExecutionException
    */
   private static void method2() throws InterruptedException, ExecutionException {
       CompletableFuture<Void> future = new CompletableFuture<>();
       new Thread(() -> {
           try {
               System.out.println(Thread.currentThread().getName() + "开始作业");
               TimeUnit.SECONDS.sleep(5);
               System.out.println(Thread.currentThread().getName() + " 完成作业");
           } catch (Exception e) {
               e.printStackTrace();
           }
       }, "线程1").start();

       //主线程调用 get 方法阻塞
       future.get();
       System.out.println("主线程结束");
   }

运行结果:

线程1开始作业
线程1 完成作业

3. 有返回值的异步任务

/**
 * 有返回值的异步任务
 *
 * @throws InterruptedException
 * @throws ExecutionException
 */
private static void method3() throws InterruptedException, ExecutionException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            try {
                System.out.println(Thread.currentThread().getName() + "开始作业");
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + " 完成作业");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步结果";
        }
    });

    //主线程调用 get 方法阻塞
    future.get();
    System.out.println("主线程结束");
}

运行结果:

ForkJoinPool.commonPool-worker-19开始作业
ForkJoinPool.commonPool-worker-19 完成作业
主线程结束

4. 线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

private static int num = 0;
/**
 * 线程依赖
 * 先对一个数加 10,然后取平方
 * @throws InterruptedException
 * @throws ExecutionException
 */
private static void method4() throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "开始对值进行 +10 操作");
        num += 10;
        System.out.println(Thread.currentThread().getName() + " 完成作业");
        return num;
    }).thenApply(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer data) {
            System.out.println(Thread.currentThread().getName() + "开始对值进行 取平方 操作");
            System.out.println("integer: " + data);
            return data*data;
        }
    });

    //主线程调用 get 方法阻塞
    System.out.println("主线程阻塞获取结果,结果为:" + future.get());
    System.out.println("主线程结束");
}

运行结果:

ForkJoinPool.commonPool-worker-19开始对值进行 +10 操作
ForkJoinPool.commonPool-worker-19 完成作业
ForkJoinPool.commonPool-worker-19开始对值进行 取平方 操作
integer: 10
主线程阻塞获取结果,结果为:100
主线程结束

5. 消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

/**
 * thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
 * 
 * @throws InterruptedException
 * @throws ExecutionException
 */
private static void method5() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "开始对值进行 +10 操作");
        num += 10;
        System.out.println(Thread.currentThread().getName() + " 完成作业");
        return num;
    }).thenApply(data -> {
        System.out.println(Thread.currentThread().getName() + "开始对值进行 取平方 操作");
        System.out.println("integer: " + data);
        return data * data;
    }).thenAccept(integer -> {
        System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + integer);
    });

    //主线程调用 get 方法阻塞
    System.out.println("主线程阻塞获取结果,结果为:" + completableFuture.get());
    System.out.println("主线程结束");
}

运行结果:

ForkJoinPool.commonPool-worker-19开始对值进行 +10 操作
ForkJoinPool.commonPool-worker-19 完成作业
ForkJoinPool.commonPool-worker-19开始对值进行 取平方 操作
integer: 10
子线程全部处理完成,最后调用了 accept,结果为:100
主线程阻塞获取结果,结果为:null
主线程结束

6. 异常处理

exceptionally 异常处理,出现异常时触发

/**
 * exceptionally 异常处理,出现异常时触发
 * @throws InterruptedException
 * @throws ExecutionException
 */
private static void method6() throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        int i = 1 / 0;
        System.out.println(Thread.currentThread().getName() + "开始对值进行 +10 操作");
        num += 10;
        System.out.println(Thread.currentThread().getName() + " 完成作业");
        return num;
    }).exceptionally(throwable -> {
        System.out.println(throwable.getMessage());
        return -1;
    });

    //主线程调用 get 方法阻塞
    System.out.println("主线程阻塞获取结果,结果为:" + completableFuture.get());
    System.out.println("主线程结束");
}

执行结果:

java.lang.ArithmeticException: / by zero
主线程阻塞获取结果,结果为:-1
主线程结束

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

/**
 * handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常
 */
private static void method7() {
    System.out.println("主线程开始");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("加 10 任务开始");
        num += 10;
        return num;
    }).handle((i, ex) -> {
        System.out.println("进入 handle 方法");
        if (ex != null) {
            System.out.print("发生了异常,内容为:");
            System.out.println(ex.getMessage());
            return -1;
        } else {
            System.out.println("正常完成,内容为: " + i);
            return i;
        }
    });
}

运行结果:

主线程开始
加 10 任务开始
进入 handle 方法
正常完成,内容为: 10

发生异常时:

主线程开始
加 10 任务开始
进入 handle 方法
发生了异常,内容为:java.lang.ArithmeticException: / by zero

7. 结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果.

/**
 * thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果
 * @throws ExecutionException
 * @throws InterruptedException
 */
private static void method8() throws ExecutionException, InterruptedException {
    System.out.println("主线程开始......");
    //第一步加 10
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "线程执行num加10");
        num += 10;
        return num;
    });

    // 合并
    CompletableFuture<Integer> future2 = future1.thenCompose(data -> CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "线程执行合并");
        return data + 1;
    }));

    System.out.println("主线程阻塞获取结果: " + future1.get());
    System.out.println("主线程阻塞获取结果: " + future2.get());
}

执行结果:

主线程开始......
ForkJoinPool.commonPool-worker-19线程执行num加10
ForkJoinPool.commonPool-worker-5线程执行合并
主线程阻塞获取结果: 10
主线程阻塞获取结果: 11

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

/**
 * thenCombine 合并两个没有依赖关系的 CompletableFutures 任务
 * @throws ExecutionException
 * @throws InterruptedException
 */
private static void method9() throws ExecutionException, InterruptedException {
    System.out.println("主线程开始......");
    //第一个线程加 10
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "线程执行num加10");
        num += 10;
        return num;
    });

    // 第二个线程 *10
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "线程执行num * 10");
        num *= 10;
        return num;
    });

    // 合并两个CompletableFuture
    CompletableFuture<Object> combine = future1.thenCombine(future2, (a, b) -> {
        List<Integer> list = new ArrayList<>();
        list.add(a);
        list.add(b);
        return list;
    });

    System.out.println("主线程阻塞获取future1结果: " + future1.get());
    System.out.println("主线程阻塞获取future2结果: " + future2.get());
    System.out.println("主线程阻塞获取combine结果: " + combine.get());
}

执行结果:

主线程开始......
ForkJoinPool.commonPool-worker-19线程执行num加10
ForkJoinPool.commonPool-worker-5线程执行num * 10
主线程阻塞获取future1结果: 10
主线程阻塞获取future2结果: 100
主线程阻塞获取combine结果: [10, 100]

合并多个任务的结果 allOf 与 anyOf

allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情

/**
 * allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情
 */
private static void method10() {
    System.out.println("主线程开始");
    List<CompletableFuture> list = new ArrayList<>();
    CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("加 10 任务开始");
        num += 10;
        return num;
    });
    list.add(job1);

    CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("乘以 10 任务开始");
        num = num * 10;
        return num;
    });
    list.add(job2);

    CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("减以 10 任务开始");
        num = num * 10;
        return num;
    });
    list.add(job3);

    CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
        System.out.println("除以 10 任务开始");
        num = num * 10;
        return num;
    });
    list.add(job4);

    //多任务合并
    List<Integer> collect =
            list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
    System.out.println("所有任务执行结束,结果为: " + collect);
}

运行结果:

主线程开始
乘以 10 任务开始
加 10 任务开始
减以 10 任务开始
除以 10 任务开始
所有任务执行结束,结果为: [10, 0, 100, 1000]

anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个future 结束

/**
 * anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个future 结束
 * @throws ExecutionException
 * @throws InterruptedException
 */
private static void method11() throws ExecutionException, InterruptedException {
    System.out.println("主线程开始");
    CompletableFuture<Integer>[] futures = new CompletableFuture[4];
    CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("加 10 任务开始");
        num += 10;
        return num;
    });
    futures[0] = job1;

    CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("乘以 10 任务开始");
        num = num * 10;
        return num;
    });
    futures[1] = job2;

    CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("减以 10 任务开始");
        num = num * 10;
        return num;
    });
    futures[2] = job3;

    CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
        System.out.println("除以 10 任务开始");
        num = num * 10;
        return num;
    });
    futures[3] = job4;

    CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
    System.out.println(future.get());
    System.out.println("其中一个任务执行结束,结果为: " + future.get());
}

执行结果:

主线程开始
乘以 10 任务开始
加 10 任务开始
减以 10 任务开始
10
除以 10 任务开始
其中一个任务执行结束,结果为: 10

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com