CompletableFuture

CompletableFuture概述

CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时只有一个线程会成功。CompletableFuture除了含有可以直接操作任务状态和结果的方法外,还实现了CompletionStage接口的一些方法,这些方法遵循:

● 当CompletableFuture任务完成后,同步使用任务执行线程来执行依赖任务结果的函数或者行为。

● 所有异步的方法在没有显式指定Executor参数的情形下都是复用ForkJoinPool. commonPool()线程池来执行。

● 所有CompletionStage方法的实现都是相互独立的,以便一个方法的行为不会因为重载了其他方法而受影响。

显式设置CompletableFuture结果

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CompletableFuture<String> future = new CompletableFuture<>();

        threadPoolExecutor.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete("success......");
        });
        System.out.println(future.get());
        System.out.println("main end......");

        threadPoolExecutor.shutdown();
    }
}

如上所述,这里使用CompletableFuture实现了通知等待模型,主线程调用future的get()方法等待future返回结果,一开始由于future结果没有设置,所以主线程被阻塞挂起,等异步任务休眠3s,然后调用future的complete方法模拟主线程等待的条件完成,这时候主线程就会从get()方法返回。

基于CompletableFuture实现异步计算与结果转换

runAsync

基于runAsync系列方法实现无返回值的异步计算:当你想异步执行一个任务,并且不需要任务的执行结果时可以使用该方法,比如异步打日志,异步做消息通知等:

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("over......");
        });
        // 同步等待异步执行结果
        System.out.println(future.get());
    }
}

以上代码调用返回的future的get()方法企图等待future任务执行完毕,由于runAsync方法不会有返回值,所以当任务执行完毕后,设置future的结果为null,即等任务执行完毕后返回null。

需要注意的是,在默认情况下,runAsync(Runnable runnable)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用runAsync (Runnable runnable, Executor executor)方法允许我们使用自己制定的线程池来执行异步任务。我们创建了一个自己的线程池bizPoolExecutor,在调用runAsync方法提交异步任务时,把其作为第二参数进行传递,则异步任务执行时会使用bizPoolExecutor中的线程执行,具体代码如下所示。

public class Test {
    // 自定义一个线程池
    public static ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("over......");
        },bizPoolExecutor);
        // 同步等待异步执行结果
        System.out.println(future.get());
        // 关闭线程池
        bizPoolExecutor.shutdown();
    }
}

supplyAsync

基于supplyAsync系列方法实现有返回值的异步计算:当你想异步执行一个任务,并且需要任务的执行结果时可以使用该方法,比如异步对原始数据进行加工,并需要获取到被加工后的结果等。

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("over......");
            return "success......";
        });
        // 同步等待异步执行结果
        System.out.println(future.get());
    }
}

需要注意的是,在默认情况下,supplyAsync(Suppliersupplier)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用supply-Async(Supplier supplier, Executor executor)方法允许我们使用自己制定的线程池来执行异步任务,代码如下:

public class Test {
    // 自定义一个线程池
    public static ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("over......");
            return "success......";
        }, bizPoolExecutor);
        // 同步等待异步执行结果
        System.out.println(future.get());
        // 关闭线程池
        bizPoolExecutor.shutdown();
    }
}

thenRun

基于thenRun实现异步任务A,执行完毕后,激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是拿不到任务A的执行结果的:

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            return "A";
        });

        CompletableFuture futureB = futureA.thenRun(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B over......");
        });
        // 同步等待异步执行结果
        System.out.println(futureB.get());
    }
}

在futureB上调用get()方法也会返回null,因为回调事件是没有返回值的。

默认情况下futureA对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenRunAsync (Runnable action,Executor executor)来指定设置的回调事件使用自定义线程池线程来执行,也就是futureA对应的任务与在其上设置的回调执行将不会在同一个线程中执行。

thenAccept

基于thenAccept实现异步任务A,执行完毕后,激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的:

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            return "A";
        });

        CompletableFuture futureB = futureA.thenAccept(new Consumer<String>() {
            // 这里的 result 为 futureA 返回的结果
            @Override
            public void accept(String result) {
                System.out.println("previous result: " + result);
                try {
                    // 休眠3秒
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("B over......");
            }
        });
        // 同步等待异步执行结果
        System.out.println(futureB.get());
    }
}

需要注意的是,这里可以在回调的方法accept(String result)的参数t中来获取futureA对应的任务结果,另外需要注意的是,由于accept(String result)方法没有返回值,所以在futureB上调用get()方法最终也会返回null。

在默认情况下,futureA对应的异步任务和在futureA上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenAccept-Async(Consumer<? super T> action, Executor executor)来指定设置的回调事件使用自定义线程池线程来执行,也就是futureA对应的任务与在其上设置的回调执行将不会在同一个线程中执行。

thenApply

基于thenApply实现异步任务A,执行完毕后,激活异步任务B执行。需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的,并且可以获取到异步任务B的执行结果。

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            return "A";
        });

        CompletableFuture<String> futureB = futureA.thenApply(new Function<String, String>() {
            // 这里的 result 为 futureA 返回的结果
            @Override
            public String apply(String result) {
                System.out.println("previous result: " + result);
                try {
                    // 休眠3秒
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("B over......");
                return result + " => B";
            }
        });
        // 同步等待异步执行结果
        System.out.println(futureB.get());
    }
}

需要注意的是,这里可以在回调方法apply(String result)的参数t中获取futureA对应的任务结果,另外需要注意的是,由于apply(String result)方法有返回值,所以在futureB上调用get()方法最终也会返回回调方法返回的值。

默认情况下futureA对应的异步任务和在futureA上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenApplyAsync (Function<?super T, ? extends U> fn, Executor executor)来指定设置的回调事件使用自定义线程池线程来执行,也就是futureA对应的任务与在其上设置的回调执行将不会在同一个线程中执行。

whenComplete

基于whenComplete设置回调函数,当异步任务执行完毕后进行回调,不会阻塞调用线程:

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            // 1.2 放回计算结果
            return "A";
        });
        // 2. 添加回调函数
        futureA.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String result, Throwable throwable) {
                System.out.println("previous result: " + result);
                try {
                    // 休眠3秒
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("B over......");
            }
        });
        System.out.println("main ......");
        // 3. 挂起当前线程,等待异步任务执行完毕
        Thread.currentThread().join();
    }
}

这里代码1开启了一个异步任务,任务内先休眠3s,然后代码1.2返回计算结果;代码2则在返回的futureA上调用whenComplete设置一个回调函数,然后main线程就返回了。在整个异步任务的执行过程中,main函数所在线程是不会被阻塞的,等异步任务执行完毕后会回调设置的回调函数。这里代码3挂起了main函数所在线程,是因为具体执行异步任务的是ForkJoin的commonPool线程池,其中线程都是Deamon线程,所以,当唯一的用户线程main线程退出后整个JVM进程就退出了,会导致异步任务得不到执行。

如上所述,当我们使用CompletableFuture实现异步编程时,大多数时候是不需要显式创建线程池,并投递任务到线程池内的。我们只需要简单地调用CompletableFuture的runAsync或者supplyAsync等方法把异步任务作为参数即可,其内部会使用ForkJoinPool线程池来进行异步执行的支持,这大大简化了我们异步编程的负担,实现了声明式编程(告诉程序我要执行异步任务,但是具体怎么实现我不需要管),当然如果你想使用自己的线程池来执行任务,也是可以非常方便地进行设置的。

多个CompletableFuture进行组合运算

CompletableFuture功能强大的原因之一是其可以让两个或者多个Completable-Future进行运算来产生结果,下面我们来看其提供的几组函数:

thenCompose

基于thenCompose实现当一个CompletableFuture执行完毕后,执行另外一个CompletableFuture:

public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result = futureA("123").thenCompose(str -> futureB(str));
        System.out.println(result.get());
    }

    public static CompletableFuture<String> futureA(String str) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠1秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            // 返回计算结果
            return "A " + str;
        });
    }

    public static CompletableFuture<String> futureB(String str) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠2秒
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B over......");
            // 返回计算结果
            return "B " + str;
        });
    }
}

thenCombine

基于thenCombine实现当两个并发运行的CompletableFuture任务都完成后,使用两者的结果作为参数再执行一个异步任务

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result = futureA("123").thenCombine(futureB("456"), (one, two) -> {
            return one + " " + two;
        });
        System.out.println(result.get());
    }

    public static CompletableFuture<String> futureA(String str) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠1秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            // 返回计算结果
            return "A " + str;
        });
    }

    public static CompletableFuture<String> futureB(String str) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠2秒
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B over......");
            // 返回计算结果
            return "B " + str;
        });
    }
}

allOf

基于allOf等待多个并发运行的CompletableFuture任务执行完毕:

public class Test {
    // 自定义一个线程池
    public static ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            // 1.2 返回计算结果
            return "A";
        });
        // 2.创建异步任务
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
                System.out.println("B over......");
            // 2.2 返回计算结果
            return "B";
        });

        List<CompletableFuture<String>> futureList = new ArrayList<>();
        futureList.add(futureA);
        futureList.add(futureB);

        CompletableFuture result = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));

        // 3. 等待所有 future 完成
        System.out.println(result.get());
    }
}

用allOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中所有任务执行完毕才返回。

anyOf

基于anyOf等多个并发运行的CompletableFuture任务中有一个执行完毕就返回:

public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建异步任务
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠1秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A over......");
            // 1.2 返回计算结果
            return "A";
        });
        // 2.创建异步任务
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                // 休眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
                System.out.println("B over......");
            // 2.2 返回计算结果
            return "B";
        });

        List<CompletableFuture<String>> futureList = new ArrayList<>();
        futureList.add(futureA);
        futureList.add(futureB);

        CompletableFuture result = CompletableFuture.anyOf(futureList.toArray(new CompletableFuture[futureList.size()]));

        // 3. 等待某一个 future 完成
        System.out.println(result.get());
    }
}

调用anyOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中有一个任务执行完毕才返回。

Stream结合CompletableFuture

正常同步调用代码如下所示

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 生成ip列表
        List<String> ipList = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ipList.add("192.168.0." + i);
        }
        // 2. 发起广播调用
        Long start = System.currentTimeMillis();
        List<String> result = new ArrayList<>();
        for (String ip : ipList) {
            result.add(rpcCall(ip, ip));
        }
        result.stream().forEach(s -> System.out.println(s));
        System.out.println("cost: " + (System.currentTimeMillis() - start));
    }
    public static String rpcCall(String ip, String param) {
        System.out.println(ip + ":" + param);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return param;
    }
}

下面我们将Stream和CompletableFuture结合使用

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 生成ip列表
        List<String> ipList = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ipList.add("192.168.0." + i);
        }
        // 2. 并发调用
        Long start = System.currentTimeMillis();
        List<CompletableFuture<String>> futureList = ipList.stream()
                .map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, ip))) // 同步转异步
                .collect(Collectors.toList());// 收集结果
        // 3. 等待所有异步任务执行完毕
        List<String> result = futureList.stream().map(future -> future.join()).collect(Collectors.toList());
        result.stream().forEach(s -> System.out.println(s));
        System.out.println("cost: " + (System.currentTimeMillis() - start));
    }
    public static String rpcCall(String ip, String param) {
        System.out.println(ip + ":" + param);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return param;
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!