kafka处理大消息的相关配置

tech2024-01-25  82

kafka的设计初衷是迅速处理短消息,比如几k或者几十k,当然更小也不合适,一般认为处理10k大小的消息吞吐量性能最好。 但有些情况下,没有办法限制消息的体积或者分割消息,就必须更改kafka的相关配置。

kafka消息的最大值

首先,kafka的一条消息肯定是有一个最大值的,在kafka的数据存储格式中,有一个4个字节的整数值来描述这条消息的大小,也就是说一条消息最大是2147483647字节,接近2个g。

producer端相关参数

max.request.size

Int类型,默认值是1048576,也即是1M,一条消息的最大值,超过后这条消息不会被发送

buffer.memory

Long类型,默认值是33554432,32M,生产者用来缓存等待发送到服务器的消息的内存总字节数

batch.size

int类型,默认值16384,16k,当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。我的理解是如果某几条消息可以被打包成一批,就会被限制在这个大小内,但是如果某条消息自身就比这个值大,这条消息还是会发送,除非这条消息的大小超过了max.request.size

request.timeout.ms

int类型,默认值30000,也就是30s,该配置控制客户端等待请求响应的最长时间。如果消息过大的话,显然要增大这个时间。

例如

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("max.request.size", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();

consumer端相关参数

fetch.max.bytes

int类型,默认值52428800,50M,服务器为获取请求应返回的最大数据量。使用者将批量获取记录,并且如果获取的第一个非空分区中的第一个记录批次大于此值,则仍将返回记录批次以确保使用者可以取得进展。因此,这不是绝对最大值。代理可接受的最大记录批处理大小是通过“ message.max.bytes”(代理配置)或“ max.message.bytes”(主题配置)定义的。请注意,使用者并行执行多个提取。

max.poll.interval.ms

int,300000,也就是300s,根据实际情况增加或减小。使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。如果报一下错误,可以考虑增大这个参数了。

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=hsta.cdcGroup] Asynchronous auto-commit of offsets {hs_ta_channel_Metadata-0=OffsetAndMetadata{offset=22, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

max.poll.records

int,500,在单次调用poll()中返回的最大记录数。我认为是如果单条消息体积大的话,这个值应该减小,可以根据实际情况先观察。

request.timeout.ms

int, 305000, 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽则客户端将重新发送请求。

session.timeout.ms

int, 10000,用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许的范围内。

broker端配置

message.max.bytes

int,1000012,接近1M, 服务器可以接收的消息的最大大小

replica.fetch.max.bytes

int,1048576,限制副本拉取消息的大小,在 0.8.2 以前的版本中,如果 replica.fetch.max.bytes < message.max.bytes,就会造成 follower 副本复制不了消息

max.message.bytes(topic级别配置)

int,1000012,和message.max.bytes作用一样,不过是对单个topic生效,好处是不需要重启kafka服务

开启消息压缩

首先要区分kafka的消息压缩和kafka的compact是两种概念,compact是日志清理策略的一种,和delete策略的概念并列。

消息压缩compress

好处:broker的磁盘空间占用减小,网络带宽占用会减小。

设置:

生产者 compression.type,有效的值有 none,gzip,snappy, 或 lz4。其中性能方面lz4>>snappy>gzip

broker可以配置compression.type(主题级别也可以配置),标准的压缩格式有’gzip’, ‘snappy’, lz4。还可以设置’uncompressed’,就是不压缩;设置为’producer’这意味着保留生产者设置的原始压缩编解码。默认值是producer。

消费者端不用配置。

另外,开启压缩之前要评估一下资源情况,cpu资源也是很宝贵的。

总结

为了保证极限情况下消息能被正常发送和接收,要维持下面的数量关系

max.request.size(producer端) < message.max.bytes(broker端) < fetch.max.bytes(consumer端) 或者是 max.request.size(producer端) < max.message.bytes(topic级别) < fetch.max.bytes(consumer端)

视情况调整堆内存:

kafka-server-start.sh:export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"

如果有总结错的,欢迎指正。

最新回复(0)