Kafka生产者-拦截器

tech2024-03-12  76

拦截器(Interceptor)是Kafka0.10版本以后引入的新功能,主要用于实现客户端的定制化控制逻辑。 使用拦截器可以在消息发送前做自定义的逻辑处理工作。 使用场景:

按照特定规则过滤指定消息修改消息内容统计类需求

自定义拦截器Java实现

package com.example.kafkaproducer; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * @author Admin */ public class DefineInterceptor implements ProducerInterceptor<String, String> { /** * 初始化数据成功发送统计器 */ private volatile long sendSuccess = 0; /** * 初始化数据失败发送统计器 */ private volatile long sendFailure = 0; /** * 将每条发送的数据增加test-前缀 * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { String modifiedValue = "test-" + record.value(); return new ProducerRecord<>( record.topic(), record.partition(), record.key(), modifiedValue ); } /** * 统计数据发送成功率 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception == null) { sendSuccess++; } else { sendFailure++; } } @Override public void close() { if (sendSuccess != 0 && sendFailure !=0) { System.out.println("发送成功率:" + String.format("%f", (double)(sendSuccess / (sendSuccess + sendFailure)) * 100 + "%")); } else { System.out.println("未有数据发出..."); } } @Override public void configure(Map<String, ?> configs) { } }

自定义拦截器使用

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, DefineInterceptor.class.getName());
最新回复(0)