前文我们分析了Kafka生产者端的源代码,了解了生产者产生消息的过程。消息由生产者发布到某个主题的某个分区上,其实最终是被存储在服务端的某个broker上。而消费者由订阅行为来决定它所要消费的主题和分区。消费者通过poll操作,不断的从服务端拉取该主题分区上产生的消息。
相信有兴趣看kafka源代码的同学,肯定对kafka的基本概念和原理有所了解。关于消费者,我们知道在服务端会有GroupCoordinator,负责每个consumer group的leader的选举,以及分发分区分配结果,而coumer的leader则会根据分区分配策略进行分区分配。这里需要注意,分区分配结果并不是由leader分发给同组的consumer,而是leader返回给GroupCoordinator,再有GroupCoordinator进行分发。
每当Broker有变化,或者Consumer Group有出入组的变化时,会触发ConsumerGroup的rebalance。也就是上述的分区分配工作。
另外消费者本地保存了它所负责主题分区的消费状态,通过手动和自动的方式提交到服务端的内部主题中。rebalance过后,消费者重新从内部主题获取对应主题分区的消费位置。
关于消费者更多的内容介绍请参考我另外一篇文章《Apache Kafka核心组件和流程-协调器(消费者和组协调器)》
上面我们回顾了Consumer的设计和流程,为我们进入源代码分析做好铺垫。接下来我们将从KafkaConsumer入手,进行代码分析。
我们先看下使用KafkaConsumer进行消费的部分代码:
private final KafkaConsumer<Integer, String> consumer;
.........
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
以上代码来自于源代码包中的例子,我们可以看到KafkaConsumer先订阅topic,然后通过poll方法进行消息拉取。
可以看到KafkaConsumer通过poll方法进行消费,这也是KafkaConsumer最主要的方法。
我们先看看KafkaConsumer内部的其他组件有哪些,见下图:
上图介绍了KafkaConsumer内部的几个重要组件:
1、前文说过消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。
2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。
3、ConsumerNetworkClient是对NetworkClient的封装,如果你是从producer看过来的,想必对NetworkClient十分了解,他对nio的组件进行封装,实现网络IO。
4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。
5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。
我们抛开订阅、rebalance这些流程,先以kafka消费流程为主,进行分析。有些组件在消费流程中是涉及不到的。消费流程主要涉及到Fetcher、SubScriptionState和ConsumerNetworkClient。特别是Fetcher,承担了重要的工作。不过我们还需要一步步来,先进入poll方法的分析。
这是消息拉取的入口方法,他会从上次消费的位置拉取消息,也可以手动指定消费位置。入参是阻塞的时长,如果有消息将会立即返回,否则会阻塞到超时,如果没有数据则返回空的数据集合。
代码如下:
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
逻辑说明:
1、通过acquireAndEnsureOpen()确保本对象是单线程进入,这是因为KafkaConsumer非线程安全。
2、检查是否订阅了topic
3、进入主循环,条件是没有超时
4、在主循环中通过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是因为上一轮次拉取做了提前拉取处理。有可能已经拉取回消息等待处理。如果没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。
5、如果上一步拉取到消息,并不会立即返回,而是再一次触发消息拉取,并且使用的是非阻塞方式,调用client.pollNoWakeup()。这样做的目的是,提前网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样做到了并行处理,提高了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提前拉取到的消息,从而省去了网络IO时间。
我们通过下图帮助理解上面4、5步的设计:
图中带颜色的方框代表在整个拉取消息的流程中,不同的处理过程,分布于不同的对象中。图中下半部分展示的是Kafka处理逻辑。可以看到在第一轮次调用了两次ConusmerNetworkClient进行IO处理,第二次IO的同时,调用者已经开始拿到返回的消息进行业务处理,这里实现了并行处理。进入第二轮次,我们发现kafkaConsumer可以直接取到上轮第二次IO回来的消息进行加工,加工后返回调用者,进行业务处理,同时下一轮次的消息拉取异步进行中。可以看到第二轮次的总时长已经没有了网络IO的时长,因为这部分工作在上一轮次已经异步进行完成。
如果不这样做,会怎么样呢?我们看图中上半部分,我们发现每个轮次都是一样的,网络IO都需要同步等待,从第二轮开始,整个消息拉取处理的时长明显增加了IO部分,会更长。
以上情况比较极端,每次提前IO都会返回数据,并且消息的业务处理时长大于网络IO。这种情况下,能最大发挥出异步IO的优势。
以上这种设计的小细节真的值得我们来学习。读源代码在了解原理的同时,我们也要多总结优秀的设计思想,对我们的工作很有帮助。
从上面的分析看到,真正消息拉取的代码是:
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);下面我们继续分析pollForFetches方法。
这个方法完成了从服务端拉取消息的动作,这个过程主要使用了Fetcher和ConsumerNetworkClient两个组件。Fetcher负责准备好拉取消息的request、处理response,并且把消息转化为对调用者友好的格式。ConsumerNetworkClient负责把请求发送出去,接收返回,也就是网络IO工作。
他的主要流程是如下四步:
1、查看是否已经存在拉取回来未加工的消息原始数据,有的话立即调用fetcher.fetchedRecords()加工,然后返回。
2、如果没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求。
3、通过ConsumerNetworkClient发送拉取请求。
4、加工拉取回的原始数据,返回。
其实正常来说2,3,4步流程就足够了。为什么会有第1步呢?那些已经存在的未加工的数据哪里来的?如果你理解了前面所讲的异步拉取设计,那么你应该知道答案。这些已经存在的未加工数据来自于上一轮次的异步IO。正是因为有了异步的IO拉取,才会有第一步的处理可能。
完整代码如下:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
timer.update(pollTimer.currentTimeMs());
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
可以看到以上过程除了IO操作外,都是通过Fetcher完成的,足以体现他的重要。接下来的章节将会重点分析Fetcher。
本篇先是回顾了Kafka消费者的设计,然后从KafkaConsumer的Poll方法入手对拉取的逻辑进行分析。Kafka很巧妙的采用异步IO方式,缩短整个流程的时长。接下来我们将会进入Fetcher的分析,看其如何准备拉取消息的请求,并完成消息的转化处理。