Netty异步Future源码解读

tech2025-01-10  9

作者: LemonNan 本文地址: https://juejin.im/post/6844904021887565831

说在前面

本文的 Netty源码使用的是 4.1.31.Final 版本,不同版本会有一些差异.

JDK Future

在说Netty的异步Future之前,先简单介绍一下JDK自带的Future机制.

首先先上一段代码

public class JDKFuture { static ExecutorService executors = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16)); public static void main(String[] args) throws Exception{ int cnt = 1; Future[] jdkFuture=new Future[cnt]; Object jdkFutureResult; for(int i = 0;i < cnt; i++){ jdkFuture[i] = executors.submit(new JDKCallable(i)); } System.out.println(String.format("%s 在 %s 即将获取任务执行结果", Thread.currentThread(), new Date())); jdkFutureResult = jdkFuture[0].get(); System.out.println(String.format("%s 在 %s 任务结果获取完毕 %s", Thread.currentThread(), new Date(), jdkFutureResult)); executors.shutdown(); } static class JDKCallable implements Callable{ int index; JDKCallable(int ind){ this.index = ind; } public Object call() throws Exception { try { System.out.println(String.format("线程 [%s] 提交任务[%s]", Thread.currentThread(), this.index)); // 耗时2秒,模拟耗时操作 Thread.sleep(2000); System.out.println(String.format("线程 [%s] 执行任务[%s]执行完毕", Thread.currentThread(), this.index)); }catch(InterruptedException e){ e.printStackTrace(); } return String.format("任务%s执行结果",this.index); } } }

输出结果为:

线程 [Thread[pool-1-thread-1,5,main]] 提交任务[0] Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即将获取任务执行结果 线程 [Thread[pool-1-thread-1,5,main]] 执行任务[0]执行完毕 Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任务结果获取完毕 任务0执行结果

可以看到主线程在使用 future.get() 的时候,因为子线程还未处理完返回结果而导致主线程活生生的等了2秒钟(耗时操作),这也是JDK自带的Future机制不够完善的地方.因为jdk自身的future机制不够完善,所以Netty自实现了一套Future机制.

Netty 异步Future/Promise

Netty的Future是异步的,那他是怎么实现的呢?接下来就从源码开始探究.

先看一下 Netty 的 Future 和 Promise 这两个接口

Future

/** * The result of an asynchronous operation * 异步操作的结果 * 对状态的判断、添加listener、获取结果 */ public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); V getNow(); @Override boolean cancel(boolean mayInterruptIfRunning); }

Promise

Promise是一个特殊的Future,它可写,可写意味着可以修改里面的结果.

/** * Special {@link Future} which is writable. * 一个可写的特殊的Future * 继承 Future, 继承的方法就不列出 */ public interface Promise<V> extends Future<V> { /** * Marks this future as a success and notifies all * listeners. * If it is success or failed already it will throw an {@link IllegalStateException}. * 将这个 future 标记为 success 并且通知所有的 listeners * 如果已经成功或者失败将会抛出异常 */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. * 尝试设置结果,成功返回true, 失败 false, 上面的方法设置失败会抛出异常 */ boolean trySuccess(V result); // 这2个跟上面的差不多 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable(); }

源码解读

看到这里都同学都默认是用netty写过程序的~,还没写过的话可以看看官方文档或者我的另一篇Netty使用.

接下来就开始源码的解读.

那么从哪里开始呢?

总所周知!,我们使用Netty开发的时候,写出数据用的是 writeAndFlush(msg), 至于 write(msg) 嘛, 不就是少了个 flush (没错,是我比较懒).

开始

在大家知道 channel().write 和 ctx.write 的区别后, 我们就从 channel().write 开始讲起.

不行,我感觉还是要说一下一些补充的,不然心里不舒服.

Netty中有一个pipeline,也就是事件调用链,开发的时候在调用链里面加入自己处理事件的handle,但是在这条 pipeline 中, Netty给我们加上了 Head 和 tail 这两个handle,方便Netty框架处理事件.

先看 DefaultChannelPipeline 的初始化,在初始化代码里给我们添加了2个handle, head 和 tail, 这2个东西很有用,为什么这么说呢?详情看后面解答

protected DefaultChannelPipeline(Channel channel) { this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel"); this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null); this.voidPromise = new VoidChannelPromise(channel, true); // ChannelInboundHandler this.tail = new DefaultChannelPipeline.TailContext(this); // ChannelInboundHandler && ChannelOutboundHandler this.head = new DefaultChannelPipeline.HeadContext(this); this.head.next = this.tail; this.tail.prev = this.head; }

Real 开始

没错,还是从 channel().write(msg) 开始说起(为什么我要用还是).

跟踪代码 channel().write(), 首先会调用到 DefaultChannelPipeline的 writeAndFlush 方法.

1.DefaultChannelPipeline#writeAndFlush

public final ChannelFuture writeAndFlush(Object msg) { return this.tail.writeAndFlush(msg); }

this.tail 就是上面构造函数里面初始化的 tailHandle, 而 write 是出栈事件, 会从 tailHandle 开始往前传递,最后传递到 headHandle(怎么感觉好像提前剧透了).

public ChannelFuture writeAndFlush(Object msg) { // 这里new了一个 promise, 然后这个promise将会一直传递,一直传递..... return this.writeAndFlush(msg, this.newPromise()); }

接下来来到了AbstractChannelHandlerContext 的 writeAndFlush.

/** * 执行 write and flush 操作 * @param msg * @param promise */ private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { // 这个方法在 ChannelHandler#handlerAdded 调用后,才会返回 true if (invokeHandler()) { // write 继续传递 invokeWrite0(msg, promise); // flush data invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void write(Object msg, boolean flush, ChannelPromise promise) { // 查找下一个 OutboundHandle, 因为是要输出 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); // 下一个 OutboundHandle 所在的线程 EventExecutor executor = next.executor(); // 如果在是同一个线程(由于Netty的channel在一个ThreadPool中只绑定一个Thread, 不同线程的话也意味着是不同线程池) if (executor.inEventLoop()) { // 在同一个线程池(这里意味着同一个线程)中, if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 在不同线程池(不同线程池那自然就是不同线程),需要创建一个任务,提交到下一个线程池 final AbstractWriteTask task; if (flush) { // 提交给下一个线程池 && flush task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } // 因为是 write 事件, so 接下来提交任务到下一个 OutboundHandle(出栈) 所在的线程, 由它执行 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. // 任务提交失败,取消任务 task.cancel(); } } }

2.HeadContext#write、flush

接下来本篇文章最重要的地方了, HeadContext !

HeadContext的write和flush方法 实际上都是调用 unsafe的方法实现.

write

// 如果是 writeAndFlush ,调用 write后会调用flush @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 这个调用 AbstrachUnsafe.write unsafe.write(msg, promise); } // 这是 unsafe 的 write 方法 @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // outboundBuffer = null 表明 channel已经关闭并且需要将 future 结果设置为 false if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // 将 msg添加进 buffer 中 outboundBuffer.addMessage(msg, size, promise); }

flush

如果是WriteAndFlush, 则在调用write后,会调用Head的flush方法,同 write是调用AbstractUnsafe的flush

/** * write 之后再调用这个 flush */ @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // buffer 标记为可以被 flush outboundBuffer.addFlush(); // 接下来就是真正的 flush flush0(); }

ChannelOutboundBuffer 是个啥呢?

ChannelOutboundBuffer 简单来说就是存储当前channel写出的数据, 并且在调用flush的时候将他们都写出去.

跟着源码一直走,在 flush0 之后,最终会调用到 AbstractNioMessageChannel#doWrite 方法.(上面还有doRead方法,是接收数据的时候调用的)

@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { // Object msg = in.current(); if (msg == null) { // Wrote all messages. // 判断写事件 if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { // 循环写出数据 boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { // 真正的写出数据 // 最终会调用 javaChannel().send(nioData, mi); // 很眼熟吧,这个是java nio的方法,注册的时候也是javaChannel().register() if (doWriteMessage(msg, in)) { done = true; break; } } // 成功写出,从 buffer 中移除刚才写出的数据 if (done) { in.remove(); } else { // Did not write all messages. // 写出失败 if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (Exception e) { // 出错后是否继续写出后面的数据 if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } }

3.Promise

到上面位置,数据是写出去了,那promise的相关作用呢?没看出来啊?

说实话,这个藏得挺深,居然! 放在了 buffer.remove() 里!

public boolean remove() { // 刚写出去数据的Entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; // 这个就是writeAndFlush 的时候 new 的 DefaultPromise() ChannelPromise promise = e.promise; int size = e.pendingSize; // buffer 中移除 removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); // !!! 划重点 !!! // 这里设置了 promise 的结果, 调用了 trySuccess, 通知所有 listener // !!! 划重点 !!! safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry // 重置Entry的信息,方便重用. // 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相对应, newInstance 是获取一个缓存的 Entry e.recycle(); return true; }

promise 通知所有 listener 是在写数据成功,并且在 buffer.remove() 调用的时候在里面 safeSuccess(promise) , 最终调用 Promise 的 trySuccess() 从而触发 notifyListeners() 通知所有 listeners.

4.NotifyListener

这个是在 Promise#trySuccess的时候调用的,通知所有listeners操作已经完成.

/** * 通知监听者,任务已经完成 */ private void notifyListeners() { // 获取future所属线程(池) EventExecutor executor = executor(); // 执行通知是当前线程 则直接回调信息 // currentThread == this.executor if (executor.inEventLoop()) { // 获取 ThreadLocal 变量 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // listen 的层级数 final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 通知所有的 listener notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 如果 executor 不是当前线程, 则交给 future 所属 executor 去执行 // 意思是添加通知的 executor 可能是前面的 executor , 然后到后面的 executor 也就是当前线程才执行通知 // 此时将通知交回给之前的 executor // 执行通知的不是当前线程, 封装成一个任务, 由之前提交的线程完成通知(回调) safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }

总结

Netty 的 Future 异步机制是在操作完成后,将通知封装成Task,由Promise所属线程(Executors)执行.

最新回复(0)