Kafka学习记录 (四)----尝试分析Producer发送消息的学习笔记(二)。

tech2022-08-07  136


文章目录

引言一)BufferPool:1.1 ByteBuff 的内存申请1.2 ByteBuff 的内存归还 二)Sender线程的run方法解析:2.1 流程概述:2.1.1. 获取元数据2.1.2. 判断哪些分区的RecordBatch有消息可发2.1.3. 标识还没有拉取到元数据的topic2.1.4. 检查与要发送数据的主机的网络是否已经建立好。2.1.5. 多个leader partition可能在同一台服务器,期望使用相同的连接2.1.6. 对超时的批次是如何处理的?2.1.7. 执行发送请求2.1.8. 发送请求


引言

内容说明:

BufferPool代码分析sender()解析

一)BufferPool:

池化并复用ByteBuff,而不是用时即创、用后即毁;

ByteBuff的相关交互是用于封装RecordBatch ,入口见下。

org.apache.kafka.clients.producer#doSend()

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { //前几个步骤见前一篇文章 /** * 步骤七: * 把消息放入accumulator(32M的一个内存) * 然后有accumulator把消息封装成为一个批次一个批次的去发送。 * 这一步是申请内存的入口 */ RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); //省略... }

append方法是实现消息批次封装到双端队列的具体逻辑,同时也可以看到了ByteBuff的出现。

org.apache.kafka.clients.producer.internals.RecordAccumulator#append

public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { //前几个步骤见前一篇文章 ByteBuffer buffer = free.allocate(size, maxTimeToBlock); //.... if (appendResult != null) { //释放内存 //线程二到这儿,其实他自己已经把数据写到批次了。所以 //他的内存就没有什么用了,就把内存个释放了(还给内存池了。) free.deallocate(buffer); return appendResult; } /** * 步骤六: * 根据内存大小封装批次 * */ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); //省略... }

1.1 ByteBuff 的内存申请

org.apache.kafka.clients.producer.internals.BufferPoo、

// ----------相关属性 private final long totalMemory; private final int poolableSize; private final ReentrantLock lock; //池子就是一个队列,队列里面放的就是一个块一块的内存 //就是跟一个连接池是一个道理。 private final Deque<ByteBuffer> free; private final Deque<Condition> waiters; private long availableMemory; private final Metrics metrics; private final Time time; private final Sensor waitTime; /** * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) */ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { //如果你想要申请的内存的大小,如果超过32M if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); //加锁的代码 this.lock.lock(); try { // check if we have a free buffer of the right size pooled //poolableSize 代表的是一个批次的大小,默认情况一下,一个批次的大小是16K。 //如果我们这次申请的批次的大小等于 我们设定好的一个批次的大小 //并且我们的内存池不为空,那么直接从内存池里面获取一个块内存就可以使用了。 //跟我们连接池是一个道理。 //poolable Size默认批次的大小 //申请的这个内存的大小是否等于默认的批次的内存大小 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block //内存的个数 * 批次的大小 = free的大小 //内存池内存的大小 int freeListSize = this.free.size() * this.poolableSize; // size: 我们这次要申请的内存 //this.availableMemory + freeListSize 目前可用的总内存 //this.availableMemory + freeListSize 目前可用的总内存 大于你要申请的内存。 if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); //进行内存扣减 this.availableMemory -= size; lock.unlock(); //直接分配内存 return ByteBuffer.allocate(size);//16k } else { //还有一种情况就是,我们整个内存池 还剩10k的内存,但是我们这次申请的内存是32k //批次可能就是16k,但是我们的一条消息,就是32K -> max(15,32) = 当前批次 = 32K // we are out of memory and will have to block //统计分配的内存 int accumulated = 0; ByteBuffer buffer = null; // Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); //等待 被人释放内存 this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one /** * * 总的分配的思路,可能一下子分配不了这么大的内存,但是可以先有点分配一点。 * */ //如果分配的内存的大小 还是没有要申请的内存大小大。 //内存池就会一直分配的内存,一点一点的去分配。 //等着别人会释放内存。 //accumulated 5K+16K=21K 16K // size 32K while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { //在等待,等待别人释放内存。 //如果这儿的代码是等待wait操作 //那么我们可以猜想一下,当有人释放内存的时候 //肯定不是得唤醒这儿的代码 //目前代码一直在等待着 //假设,突然有人 往内存池里面还了内存。 //那么这儿的代码就可以被唤醒了。代码就继续往下执行。 // (1) 时间到了 //(2)被人唤醒了。 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) { this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory //再次方式看一下,内存池里面有没有数据了。 //如果内存池里面有数据 //并且你申请的内存的大小就是一个批次的大小 //32K 16K if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list //这儿就可以直接获取到内存。 buffer = this.free.pollFirst(); accumulated = size; } else { // we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); // 可以给你分配的内存 int got = (int) Math.min(size - accumulated, this.availableMemory); //做内存扣减 13K this.availableMemory -= got; //累加已经分配了多少内存。 accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // signal any additional waiters if there is more memory left // over for them if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally { if (lock.isHeldByCurrentThread()) lock.unlock(); } }

1.2 ByteBuff 的内存归还

/** * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the * memory as free. * * @param buffer The buffer to return * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity * since the buffer may re-allocate itself during in-place compression */ public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { //如果你还回来的内存的大小 就等于一个批次的大小, //我们的参数设置的内存是16K,你计算出来一个批次的大小也是16,申请的内存也是16k //16K 32K if (size == this.poolableSize && size == buffer.capacity()) { //内存里面的东西清空 buffer.clear(); //把内存放入到内存池 this.free.add(buffer); } else { //但是如果 我们释放的内存的大小 //不是一个批次的大小,那就把归为可用内存 //等着垃圾回收即可 this.availableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) //释放了内存(或者是还了内存以后) //都会唤醒等待内存的线程。 //接下来是不是还是要唤醒正在等待分配内存的线程。 moreMem.signal(); } finally { lock.unlock(); } }

二)Sender线程的run方法解析:

2.1 流程概述:

org.apache.kafka.clients.producer.internals.Sender#run(long now)

在上一篇文章中,我们罗列了run方法的内部流程大致为以下八个方法:

2.1.1. 获取元数据
Cluster cluster = metadata.fetch();
2.1.2. 判断哪些分区的RecordBatch有消息可发
//now 是外层传递过来的时间 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

org.apache.kafka.clients.producer.internals.RecordAccumulator#ready

/** * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated * partition batches. * <p> * A destination node is ready to send data if: * <ol> * <li>There is at least one partition that is not backing off its send * <li><b>and</b> those partitions are not muted (to prevent reordering if * {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION} * is set to one)</li> * <li><b>and <i>any</i></b> of the following are true</li> * <ul> * <li>The record set is full</li> * <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li> * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions * are immediately considered ready).</li> * <li>The accumulator has been closed</li> * </ul> * </ol> */ public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<String> unknownLeaderTopics = new HashSet<>(); //如果exhausted的值等于true,waiters里面有数据,说明内存池里面的内存不够用了。在上面分析内存分配的时候 //我们看到kafka boolean exhausted = this.free.queued() > 0; //遍历所有的分区 for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { //获取到分区 TopicPartition part = entry.getKey(); //获取到分区对应的队列 Deque<RecordBatch> deque = entry.getValue(); //根据分区 可以获取到这个分区的leader partition在哪一台kafka的主机上面。 Node leader = cluster.leaderFor(part); synchronized (deque) { //如果没有找到对应主机。 unknownLeaderTopics if (leader == null && !deque.isEmpty()) { // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !muted.contains(part)) { //首先从队列的队头获取到批次 RecordBatch batch = deque.peekFirst(); //如果这个catch不null,我们判断一下是否可以发送这个批次。 if (batch != null) { /** * * 解析来就判断这个批次是否符合发送出去的条件 * */ /** * batch.attempts:重试的次数 * batch.lastAttemptMs:上一次重试的时间 * retryBackoffMs:重试的时间间隔 * * backingOff:重新发送数据的时间到了 * * 但是,如果我们用的是场景驱动的方式,那很明显我们是第一次发送消息 * 肯定还没有到重试到地步。 */ boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; /** * nowMs: 当前时间 * batch.lastAttemptMs: 上一次重试的时间。 * waitedTimeMs=这个批次已经等了多久了。 */ long waitedTimeMs = nowMs - batch.lastAttemptMs; /** * timeToWaitMs =lingerMs * lingerMs * 这个值默认是0,如果这个值默认是0 的话,那代表着来一条消息 * 就发送一条消息,那很明显是不合适的。 * 所以我们发送数据的时候,大家一定要记得去配置这个参数。 * 假设我们配置的是100ms * timeToWaitMs = linerMs = 100ms * 消息最多存多久就必须要发送出去了。 */ long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; /** * timeToWaitMs: 最多能等待多久 * waitedTimeMs: 已经等待了多久 * timeLeftMs: 还要在等待多久 */ long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); /** *如果队列大于一,说明这个队列里面至少有一个批次肯定是写满了 * 如果批次写满了肯定是可以发送数据了。 *当然也有可能就是这个队列里面只有一个批次,然后刚好这个批次 * 写满了,也可以发送数据。 * * full:是否有写满的批次 */ boolean full = deque.size() > 1 || batch.records.isFull(); /** * waitedTimeMs:已经等待了多久 * timeToWaitMs:最多需要等待多久 * expired: 时间到了,到了发送消息的时候了 * 如果expired=true 代表就是时间到了,到了发送消息的时候了 */ boolean expired = waitedTimeMs >= timeToWaitMs; /** * 1)full: 如果一个批次写满了(无论时间有没有到) * 2)expired:时间到了(批次没写满也得发送) * 3)exhausted:内存不够(消息发送出去以后,就会释放内存) */ boolean sendable = full || expired || exhausted || closed || flushInProgress(); //可以发送消息 if (sendable && !backingOff) { //把可以发送【批次】的Partition的leader partition所在的主机加入到 //readyNodes readyNodes.add(leader); } else { // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
2.1.3. 标识还没有拉取到元数据的topic
if (!result.unknownLeaderTopics.isEmpty()) { // The set of topics with unknown leader contains topics with leader election pending as well as // topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); }
2.1.4. 检查与要发送数据的主机的网络是否已经建立好。
if (!this.client.ready(node, now)) { //如果返回的是false !false 代码就进来 //移除result 里面要发送消息的主机。 //所以我们会看到这儿所有的主机都会被移除 iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); }

org.apache.kafka.clients.NetworkClient#ready(Node node, long now)

这里面有个很费解的地方,在上述步骤2中去寻找topic对应的partitionLeaderNode了。但是在初次的代码流程里面,网络连接是未建立的,呢么下来又会把这些partitionLeaderNode在移除掉,不太明白是为什么。

/** * Begin connecting to the given node, return true if we are already connected and ready to send to that node. * * @param node The node to check * @param now The current timestamp * @return True if we are ready to send to the given node */ @Override public boolean ready(Node node, long now) { //如果当前检查的节点为null,就报异常。 if (node.isEmpty()) throw new IllegalArgumentException("Cannot connect to empty node " + node); //判断要发送消息的主机,是否具备发送消息的条件 //第一次进来,不具备。false if (isReady(node, now)) return true; //判断是否可以尝试去建立网络 if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one //初始化连接 //绑定了 连接到事件而已 initiateConnect(node, now); return false; }

其中,网络的建立与nio事件的绑定的具体过程,在kafka网络篇中出现,这里暂不展开。

2.1.5. 多个leader partition可能在同一台服务器,期望使用相同的连接
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained //如果batches 空的话,这而的代码也就不执行了。 for (List<RecordBatch> batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }
2.1.6. 对超时的批次是如何处理的?
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);
2.1.7. 执行发送请求
List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } //TODO 发送请求的操作 for (ClientRequest request : requests) //绑定 op_write client.send(request, now);
2.1.8. 发送请求
this.client.poll(pollTimeout, now);

这个方法包含着初次网络的建立。

org.apache.kafka.clients.NetworkClient#poll

/** * Do actual reads and writes to sockets. * * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately, * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and * metadata timeout * @param now The current time in milliseconds * @return The list of responses received */ @Override public List<ClientResponse> poll(long timeout, long now) { // ... 省略 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); // ... 省略 }

org.apache.kafka.common.network.Selector#poll

@Override public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); //从Selector上找到有多少个key注册了 int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); //因为我们用场景驱动的方式 //我们刚刚确实是注册了一个key if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { //立马就要对这个Selector上面的key要进行处理。 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO 对stagedReceives里面的数据要进行处理 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }

pollSelectionKeys

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { //获取到所有key Iterator<SelectionKey> iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 //里面不断的读取数据,读取数据的代码我们之前就已经分析过 //里面还涉及到粘包和拆包的一些问题(见后文)。 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) { //获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 //TODO:服务端 //里面我们发现如果消息被发送出去了,然后移除OP_WRITE Send send = channel.write(); //已经完成响应消息的发送 if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }

通过自定义协议 ,处理黏包与拆包

channel.read() 的实现方法为

public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } //一直在读取数据。 receive(receive); //是否读完一个完整的响应消息 if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }

receive = > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel

public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; //size是一个4字节大小的内存空间 //如果size还有剩余的内存空间。 if (size.hasRemaining()) { //先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小) int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; //一直要读取到当这个size没有剩余空间 //说明已经读取到了一个4字节的int类型的数了。 if (!size.hasRemaining()) { size.rewind(); //4 -> 10 int receiveSize = size.getInt(); if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); //分配一个内存空间,这个内存空间的大小 //就是刚刚读出来的那个4字节的int的大小。 //10 this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { //去读取数据 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; } /** * checks if there are any staged receives and adds to completedReceives */ private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { //获取到每个连接对应的 请求队列 Deque<NetworkReceive> deque = entry.getValue(); //获取到响应 //对于我们服务端来说,这儿接收到的是请求 NetworkReceive networkReceive = deque.poll(); //把响应存入到completedReceives 数据结构里面 this.completedReceives.add(networkReceive); this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); if (deque.isEmpty()) iter.remove(); } } } }
最新回复(0)