kafka 重启consumer 重复消费问题

tech2024-12-14  23

kafka 重启consumer 重复消费问题

原文链接:https://blog.csdn.net/z1941563559/java/article/details/88753938

问题描述:kafka的某些topic在消费完毕后一段时间,重启唯一消费者,offset会重置为最小offset重新消费,一直导致kafka消费的重复消费问题。 问题产生原因:是offset信息过期导致的。我一直以为消费者保持在线,最新位移信息是不会过期的。但即使消费者在线,位移信息也会如约过期。配置的数据保留时间log.retention.hours=168小时比位移保留时间offsets.retention.minutes=1440即24小时要长。offset信息过期后,重启消费者。由于找不到offset信息,会根据配置auto.offset.reset=earliest从最小位移开始消费,导致之前已经消费的数据再次被消费。  

 

 

解决方案:

原文 :https://issues.apache.org/jira/browse/KAFKA-3806

调整log.retention.hours和offsets.retention.minutes的默认值

在特殊情况下,将log.retention.hours(168小时= 7天)和offsets.retention.minutes(1440分钟= 1天)的默认值结合使用可能很危险。偏移保留应始终大于对数保留。

我们已经观察到以下情况和问题:

生产者更新已于两天前禁止了该主题的数据生产,该主题未删除。消费者使用了所有数据并正确分配了对Kafka的偏移量。消费者不再对该主题进行抵消提交,因为没有更多传入数据,也没有任何可确认的内容。(我们已禁用自动提交功能,我不确定启用自动提交的行为方式。)一天后:Kafka根据offsets.retention.minutes清除了太旧的偏移量。两天后:长期运行的使用者在更新后重新启动,由于该主题已被offsets.retention.minutes删除,因此未找到该主题的任何已提交偏移量,因此从一开始就开始使用它。由于log.retention.hours较长,消息仍在Kafka中,大约5天的消息被再次读取。

解决此问题的已知解决方法:

显式配置log.retention.hours和offsets.retention.minutes,不要使用默认值。

提案:

将offsets.retention.minutes的默认值延长到至少比log.retention.hours大两倍。在Kafka启动期间检查这些值,如果offsets.retention.minutes小于log.retention.hours,则记录警告。在迁移指南中添加一条注释,以了解ZooKeeper和Kafka中存储偏移量之间的区别(http://kafka.apache.org/documentation.html#upgrade)。

 

 

修改:

默认参数 offsets.retention.minutes & log.retention.minutes 的默认值问题。

默认参数前者是7天,后者是24小时。会导致数据虽然保存但offset失效导致客户端数据重复消费的问题。

0.10.0.0官方的参数说明:http://kafka.apache.org/0100/documentation.html#log

offsets.retention.minutes

Log retention window in minutes for offsets topic

Kafka Server端保存的offset的过期时间。默认值1440(1440分钟也就是24小时),应该调整为与log.retention.hours一致,即10080。

 

log.retention.hours & log.retention.minutes

这两个参数都是用来设置删除日志的,无论哪个属性已经溢出,都会进行文件的删除。

log.retention.hours:  The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property 参数int类型,默认值:168 (168小时也就是7天)。

log.retention.minutes:  The number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is used 参数int类型,默认值:null。

最新回复(0)