你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析

tech2024-06-02  79

前文我们分析了Kafka生产者端的源代码,了解了生产者产生消息的过程。消息由生产者发布到某个主题的某个分区上,其实最终是被存储在服务端的某个broker上。而消费者由订阅行为来决定它所要消费的主题和分区。消费者通过poll操作,不断的从服务端拉取该主题分区上产生的消息。

相信有兴趣看kafka源代码的同学,肯定对kafka的基本概念和原理有所了解。关于消费者,我们知道在服务端会有GroupCoordinator,负责每个consumer group的leader的选举,以及分发分区分配结果,而coumer的leader则会根据分区分配策略进行分区分配。这里需要注意,分区分配结果并不是由leader分发给同组的consumer,而是leader返回给GroupCoordinator,再有GroupCoordinator进行分发。

每当Broker有变化,或者Consumer Group有出入组的变化时,会触发ConsumerGroup的rebalance。也就是上述的分区分配工作。

另外消费者本地保存了它所负责主题分区的消费状态,通过手动和自动的方式提交到服务端的内部主题中。rebalance过后,消费者重新从内部主题获取对应主题分区的消费位置。

关于消费者更多的内容介绍请参考我另外一篇文章《Apache Kafka核心组件和流程-协调器(消费者和组协调器)》

上面我们回顾了Consumer的设计和流程,为我们进入源代码分析做好铺垫。接下来我们将从KafkaConsumer入手,进行代码分析。

KafkaConsumer

我们先看下使用KafkaConsumer进行消费的部分代码:

 

private final KafkaConsumer<Integer, String> consumer;

 

.........

 

consumer.subscribe(Collections.singletonList(this.topic));

 

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));

 

for (ConsumerRecord<Integer, String> record : records) {

 

System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());

 

}

以上代码来自于源代码包中的例子,我们可以看到KafkaConsumer先订阅topic,然后通过poll方法进行消息拉取。

可以看到KafkaConsumer通过poll方法进行消费,这也是KafkaConsumer最主要的方法。

我们先看看KafkaConsumer内部的其他组件有哪些,见下图:

上图介绍了KafkaConsumer内部的几个重要组件:

1、前文说过消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。

2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。

3、ConsumerNetworkClient是对NetworkClient的封装,如果你是从producer看过来的,想必对NetworkClient十分了解,他对nio的组件进行封装,实现网络IO。

4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。

5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

我们抛开订阅、rebalance这些流程,先以kafka消费流程为主,进行分析。有些组件在消费流程中是涉及不到的。消费流程主要涉及到Fetcher、SubScriptionState和ConsumerNetworkClient。特别是Fetcher,承担了重要的工作。不过我们还需要一步步来,先进入poll方法的分析。

poll()方法

这是消息拉取的入口方法,他会从上次消费的位置拉取消息,也可以手动指定消费位置。入参是阻塞的时长,如果有消息将会立即返回,否则会阻塞到超时,如果没有数据则返回空的数据集合。

代码如下:

 

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {

acquireAndEnsureOpen();

try {

if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {

throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

}

 

// poll for new data until the timeout expires

do {

client.maybeTriggerWakeup();

 

if (includeMetadataInTimeout) {

if (!updateAssignmentMetadataIfNeeded(timer)) {

return ConsumerRecords.empty();

}

} else {

while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {

log.warn("Still waiting for metadata");

}

}

 

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

if (!records.isEmpty()) {

// before returning the fetched records, we can send off the next round of fetches

// and avoid block waiting for their responses to enable pipelining while the user

// is handling the fetched records.

//

// NOTE: since the consumed position has already been updated, we must not allow

// wakeups or any other errors to be triggered prior to returning the fetched records.

if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {

client.pollNoWakeup();

}

 

return this.interceptors.onConsume(new ConsumerRecords<>(records));

}

} while (timer.notExpired());

 

return ConsumerRecords.empty();

} finally {

release();

}

}

逻辑说明:

1、通过acquireAndEnsureOpen()确保本对象是单线程进入,这是因为KafkaConsumer非线程安全。

2、检查是否订阅了topic

3、进入主循环,条件是没有超时

4、在主循环中通过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是因为上一轮次拉取做了提前拉取处理。有可能已经拉取回消息等待处理。如果没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。

5、如果上一步拉取到消息,并不会立即返回,而是再一次触发消息拉取,并且使用的是非阻塞方式,调用client.pollNoWakeup()。这样做的目的是,提前网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样做到了并行处理,提高了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提前拉取到的消息,从而省去了网络IO时间。

我们通过下图帮助理解上面4、5步的设计:

图中带颜色的方框代表在整个拉取消息的流程中,不同的处理过程,分布于不同的对象中。图中下半部分展示的是Kafka处理逻辑。可以看到在第一轮次调用了两次ConusmerNetworkClient进行IO处理,第二次IO的同时,调用者已经开始拿到返回的消息进行业务处理,这里实现了并行处理。进入第二轮次,我们发现kafkaConsumer可以直接取到上轮第二次IO回来的消息进行加工,加工后返回调用者,进行业务处理,同时下一轮次的消息拉取异步进行中。可以看到第二轮次的总时长已经没有了网络IO的时长,因为这部分工作在上一轮次已经异步进行完成。

如果不这样做,会怎么样呢?我们看图中上半部分,我们发现每个轮次都是一样的,网络IO都需要同步等待,从第二轮开始,整个消息拉取处理的时长明显增加了IO部分,会更长。

以上情况比较极端,每次提前IO都会返回数据,并且消息的业务处理时长大于网络IO。这种情况下,能最大发挥出异步IO的优势。

以上这种设计的小细节真的值得我们来学习。读源代码在了解原理的同时,我们也要多总结优秀的设计思想,对我们的工作很有帮助。

从上面的分析看到,真正消息拉取的代码是:

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

下面我们继续分析pollForFetches方法。

pollForFetches()方法

这个方法完成了从服务端拉取消息的动作,这个过程主要使用了Fetcher和ConsumerNetworkClient两个组件。Fetcher负责准备好拉取消息的request、处理response,并且把消息转化为对调用者友好的格式。ConsumerNetworkClient负责把请求发送出去,接收返回,也就是网络IO工作。

他的主要流程是如下四步:

1、查看是否已经存在拉取回来未加工的消息原始数据,有的话立即调用fetcher.fetchedRecords()加工,然后返回。

2、如果没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求。

3、通过ConsumerNetworkClient发送拉取请求。

4、加工拉取回的原始数据,返回。

其实正常来说2,3,4步流程就足够了。为什么会有第1步呢?那些已经存在的未加工的数据哪里来的?如果你理解了前面所讲的异步拉取设计,那么你应该知道答案。这些已经存在的未加工数据来自于上一轮次的异步IO。正是因为有了异步的IO拉取,才会有第一步的处理可能。

完整代码如下:

 

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {

long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

 

// if data is available already, return it immediately

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

if (!records.isEmpty()) {

return records;

}

 

// send any new fetches (won't resend pending fetches)

fetcher.sendFetches();

 

// We do not want to be stuck blocking in poll if we are missing some positions

// since the offset lookup may be backing off after a failure

 

// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call

// updateAssignmentMetadataIfNeeded before this method.

if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {

pollTimeout = retryBackoffMs;

}

 

Timer pollTimer = time.timer(pollTimeout);

client.poll(pollTimer, () -> {

// since a fetch might be completed by the background thread, we need this poll condition

// to ensure that we do not block unnecessarily in poll()

return !fetcher.hasCompletedFetches();

});

timer.update(pollTimer.currentTimeMs());

 

// after the long poll, we should check whether the group needs to rebalance

// prior to returning data so that the group can stabilize faster

if (coordinator.rejoinNeededOrPending()) {

return Collections.emptyMap();

}

 

return fetcher.fetchedRecords();

}

可以看到以上过程除了IO操作外,都是通过Fetcher完成的,足以体现他的重要。接下来的章节将会重点分析Fetcher。

小结

本篇先是回顾了Kafka消费者的设计,然后从KafkaConsumer的Poll方法入手对拉取的逻辑进行分析。Kafka很巧妙的采用异步IO方式,缩短整个流程的时长。接下来我们将会进入Fetcher的分析,看其如何准备拉取消息的请求,并完成消息的转化处理。

最新回复(0)