CompletableFuture分析和实战

tech2025-03-17  6

研究了下CompletableFuture,感觉这个异步类很好用

一. Future

JDK 5引入了Future模式。Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算。

Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

Future的接口很简单,只有五个方法。

public interface Future<V> { //取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束 boolean cancel(boolean mayInterruptIfRunning); //任务是否已经取消,任务正常完成前将其取消,则返回 true boolean isCancelled(); //任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true boolean isDone(); //等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException V get() throws InterruptedException, ExecutionException; //同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计 算超时,将抛出TimeoutException 一般情况下,我们会结合Callable和Future一起使用,通过ExecutorService的submit方法执行Callable,并返回Future。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

Future模式的缺点

Future虽然可以实现获取异步执行结果的需求,但是它没有提供通知的机制,我们无法得知Future什么时候完成。

要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。

CompletableFuture

Netty、Guava分别扩展了Java 的 Future 接口,方便异步编程。

Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

whenComplete、whenCompleteAsync

public void test1(){ CompletableFutureTest completableFutureTest = new CompletableFutureTest(); CompletableFuture<String> future = completableFutureTest.completeTest(); System.out.println("1"+Thread.currentThread()); future.whenCompleteAsync((t,u)->{ System.out.println("2"+Thread.currentThread()+t); }); System.out.println("4"+Thread.currentThread()); System.out.println("主线程任务执行完毕"); } public CompletableFuture<String> completeTest(){ CompletableFuture<String> future = new CompletableFuture(); System.out.println("-1"+Thread.currentThread()); new Thread(){ @Override public void run() { System.out.println("0"+Thread.currentThread()); String result = null; try{ Thread.sleep(3000); result="suucess"; } catch(Exception ex){ future.completeExceptionally(ex); } future.complete(result); } }.start(); return future; }

线程中使用CompletableFuture 结果:

-1Thread[main,5,main] 1Thread[main,5,main] 0Thread[Thread-0,5,main] 4Thread[main,5,main] 主线程任务执行完毕 2Thread[ForkJoinPool.commonPool-worker-1,5,main]suucess

结果分析:新起的线程里使用了future.complete方法,并通过该方法进行了线程的传递。包括线程本身和返回值,如果是同步whenComplete方法,则使用旧线程执行,如果是异步方法则在whenCompleteAsync方法里执行的是新线程

runAsync、runAsync方法

public static void test2(){ System.out.println("main"+Thread.currentThread()); try { CompletableFutureTest.runAsyncTest2(); System.out.println("mainNext"+Thread.currentThread()); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void runAsyncTest2() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("runAsync:" + Thread.currentThread()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } System.out.println("run end ..."); return "123"; }); System.out.println(future.get()); }

结果:

mainThread[main,5,main] runAsync:Thread[ForkJoinPool.commonPool-worker-1,5,main] run end … 123 mainNextThread[main,5,main]

结果分析://同步方法runAsync返回类型必须是Void,因为同步方法runAsync不支持返回值,而supplyAsync支持返回值,当然也可以不返回 CompletableFuture方法无论是同步还是异步,都是调用了新线程去执行

runAsync、supplyAsync、whenComplete、whenCompleteAsync方法

public static void test3(){ CompletableFutureTest.runWhenComplete(); } public static void runWhenComplete(){ /*同步执行体 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } System.out.println("run end ..."); }); future.whenComplete(new BiConsumer<Void, Throwable>() { //future.whenCompleteAsync(new BiConsumer<Void, Throwable>() { @Override public void accept(Void t, Throwable action) { System.out.println("whenComplete:"+Thread.currentThread()); System.out.println("执行完成!"); } }); future.exceptionally(new Function<Throwable, Void>() { @Override public Void apply(Throwable t) { System.out.println("exceptionally:"+Thread.currentThread()); System.out.println("执行失败!"+t.getMessage()); return null; } });*/ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread()); System.out.println("supplyAsync:"+Thread.currentThread()); return "1234"; }); //future.whenComplete(new BiConsumer<String, Throwable>() { future.whenCompleteAsync(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable action) { System.out.println("whenComplete:"+Thread.currentThread()); try { System.out.println("执行完成!"+future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }); }

结果分析:whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。 whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。 并且supplyAsync返回的异步future在whenComplete执行的为同一个线程,在whenCompleteAsync中也是同一个线程,因为没做线程切换操作,默认会使用forkjoin线程

注意:future.get()在等待执行结果时,程序会一直block,如果此时调用complete(T t)会立即执行。但是complete(T t)只能调用一次,后续的重复调用会失效。 如果future已经执行完毕能够返回结果,此时再调用complete(T t)则会无效。

thenApply方法

public static void thenApplytest(){ System.out.println("main:"+Thread.currentThread()); CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() { @Override public Long get() { System.out.println("get:"+Thread.currentThread()); long result = new Random().nextInt(100); System.out.println("result1="+result); return result; } }).thenApply(new Function<Long, Long>() { //}).thenApplyAsync(new Function<Long, Long>() { @Override public Long apply(Long t) { System.out.println("apply:"+Thread.currentThread()); long result = t*5; System.out.println("result2="+result); return result; } }); long result = 0; try { result = future.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }

thenApply: main:Thread[main,5,main] get:Thread[ForkJoinPool.commonPool-worker-1,5,main] result1=71 apply:Thread[main,5,main] result2=355 355 thenApplyAsync:

main:Thread[main,5,main] get:Thread[ForkJoinPool.commonPool-worker-1,5,main] result1=78 apply:Thread[ForkJoinPool.commonPool-worker-1,5,main] result2=390 390

结果分析: thenApply 方法,当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。 异步执行supplyAsync方法体里进行thenApply方法,get方法是异步新建线程,但是apply方法则是主线程执行的,而thenApplyAsync则还是异步新建线程执行

后面的线程方面结果皆如此,所以不写线程的东西了 handle

public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); public static void handle() throws Exception{ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int i= 10/0; return new Random().nextInt(10); } }).handle(new BiFunction<Integer, Throwable, Integer>() { @Override public Integer apply(Integer param, Throwable throwable) { int result = -1; if(throwable==null){ result = param * 2; }else{ System.out.println(throwable.getMessage()); } return result; } }); System.out.println(future.get()); }

接收任务的处理结果,并消费处理,无返回结果。

从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。

thenAccept 接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); public static void thenAccept() throws Exception{ CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return new Random().nextInt(10); } }).thenAccept(integer -> { System.out.println(integer); }); future.get(); }

thenRun

public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); public static void thenRun() throws Exception{ CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return new Random().nextInt(10); } }).thenRun(() -> { System.out.println("thenRun ..."); }); future.get(); }

该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法 thenCombine thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); public static void thenCombine() throws Exception { System.out.println("main:"+Thread.currentThread()); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("get1:"+Thread.currentThread()); return "hello"; } }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("get2:"+Thread.currentThread()); return "hello"; } }); CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() { @Override public String apply(String t, String u) { System.out.println("apply:"+Thread.currentThread()); return t+" "+u; } }); System.out.println(result.get()); }

结果:

//thenCombine main:Thread[main,5,main] get1:Thread[ForkJoinPool.commonPool-worker-1,5,main] get2:Thread[ForkJoinPool.commonPool-worker-1,5,main] apply:Thread[main,5,main] hello hello

//thenCombineAsync 结果会出现变化,不确使用哪个线程执行thenCombineAsync,甚至get1,get2时常也会同一个线程执行 main:Thread[main,5,main] get2:Thread[ForkJoinPool.commonPool-worker-2,5,main] get1:Thread[ForkJoinPool.commonPool-worker-1,5,main] apply:Thread[ForkJoinPool.commonPool-worker-2,5,main] hello hello

thenCombine使用的是主线程执行,thenCombineAsync使用的是异步线程

thenAcceptBoth 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor); private static void thenAcceptBoth() throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() { @Override public void accept(Integer t, Integer u) { System.out.println("f1="+t+";f2="+u+";"); } }); }

applyToEither 方法 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor); public static void applyToEither() throws Exception { System.out.println("main:"+Thread.currentThread()); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("get1:"+Thread.currentThread()); int t = new Random().nextInt(3); /* try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("f1="+t); return t; } }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("get2:"+Thread.currentThread()); int t = new Random().nextInt(3); /* try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("f2="+t); return t; } }); CompletableFuture<Integer> result = f1.applyToEitherAsync(f2, new Function<Integer, Integer>() { @Override public Integer apply(Integer t) { System.out.println("apply:"+Thread.currentThread()); System.out.println(t); return t * 2; } }); System.out.println(result.get()); }

注意:accept方法无论是异步还是同步,最后applyToEither里这个执行者都是主线程

acceptEither 方法 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor); private static void acceptEither() throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.acceptEither(f2, new Consumer<Integer>() { @Override public void accept(Integer t) { System.out.println(t); } }); }

runAfterEither 方法 两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor); private static void runAfterEither() throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.runAfterEither(f2, new Runnable() { @Override public void run() { System.out.println("上面有一个已经完成了。"); } }); }

runAfterBoth 两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); private static void runAfterBoth() throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int t = new Random().nextInt(3); try { TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.runAfterBoth(f2, new Runnable() { @Override public void run() { System.out.println("上面两个任务都执行完成了。"); } }); }

thenCompose 方法 thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ; public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ; public static void thenCompose() throws Exception { System.out.println("main:"+Thread.currentThread()); CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("get:"+Thread.currentThread()); int t = new Random().nextInt(3); System.out.println("t1="+t); return t; } }).thenCompose(new Function<Integer, CompletionStage<Integer>>() { @Override public CompletionStage<Integer> apply(Integer param) { System.out.println("apply:"+Thread.currentThread()); return CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("get2:"+Thread.currentThread()); int t = param *2; System.out.println("t2="+t); return t; } }); } }); System.out.println("thenCompose result : "+f.get()); }

结果:

thenCompose: main:Thread[main,5,main] get:Thread[ForkJoinPool.commonPool-worker-1,5,main] t1=2 apply:Thread[ForkJoinPool.commonPool-worker-1,5,main] get2:Thread[ForkJoinPool.commonPool-worker-2,5,main] t2=4 thenCompose result : 4

thenComposeAsync:get1,get2有可能是同一个线程,随机,apply则永远是和get1线程同一个 main:Thread[main,5,main] get:Thread[ForkJoinPool.commonPool-worker-1,5,main] t1=1 apply:Thread[ForkJoinPool.commonPool-worker-1,5,main] get2:Thread[ForkJoinPool.commonPool-worker-2,5,main] t2=2 thenCompose result : 2

最新回复(0)