Future模式解读

tech2025-03-05  12

Future和Callable

Future模式在框架中基本都会用到,是一个很好用的异步工具。 平常通过实现Runnable接口的run方法启动的线程无法获得异步线程的返回值,但是Future模式可以帮助我们获得异步线程的返回值,还可以抛出返回结果的异常。所以Future模式比Runnable方式更加强大。

Future表示未来的意思,负责在未来某个时间给你返回一个值。 Callable可以看成是Runnable,负责定义程序需要异步执行的业务逻辑。 可以看到,Callable定义的任务最终封装在FutureTask下,而这个FutureTask是一个Runnable。

直接上代码:

/*实现Callable接口,允许有返回值*/ private static class UseCallable implements Callable<Integer>{ private int sum; @Override public Integer call() throws Exception { System.out.println("Callable子线程开始计算!"); // Thread.sleep(1000); for(int i=0 ;i<5000;i++){ if(Thread.currentThread().isInterrupted()) { System.out.println("Callable子线程计算任务中断!"); return null; } sum=sum+i; System.out.println("sum="+sum); } System.out.println("Callable子线程计算结束!结果为: "+sum); return sum; } } public static void main(String[] args) throws InterruptedException, ExecutionException { UseCallable useCallable = new UseCallable(); //包装 FutureTask<Integer> futureTask = new FutureTask<>(useCallable); Random r = new Random(); new Thread(futureTask).start(); Thread.sleep(1); if(r.nextInt(100)>50){ System.out.println("Get UseCallable result = "+futureTask.get()); }else{ System.out.println("Cancel................. "); futureTask.cancel(true); } }

继续,看看FutureTask的get方法的实现逻辑:

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }

接着进入awaitDone方法,这段代码的核心逻辑在for(;😉,我们注意到最后一行,挂起当前线程。即在结果返回前会挂起当前线程。

private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this);//挂起线程 } }

按照这个逻辑,我们看看成功返回结果后,会不会唤起这个线程。找到set方法

protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v;//将返回值赋予outcome UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

继续进入finishCompletion方法,可以看到唤醒了线程。

private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }

唤醒线程后就更简单了。从线程阻塞的位置开始,走到report方法:

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x;//返回正常结果 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

总结:Future模式能实现异步返回结果的逻辑就很清晰了。首先获取返回结果get()是阻塞的,当Callable得到了结果后,唤醒阻塞线程,继续执行下面的业务逻辑。

最新回复(0)