Kafka Error HandlerSpring Boot 接入Kafka关于消费端的错误处理

tech2024-11-13  36

学了最基础的 Kafka 消息的发送和消费,现在我们进一步学习,当遇见错误信息时,该如何处理,下面是 Spring for kafka 的一句话,告诉我们 Spring Boot 做错误处理有多简单

If you are using Spring Boot, you simply need to add the error handler as a @Bean and boot will add it to the auto-configured factory.

配置文件

server: port: 8888 spring: kafka: consumer: bootstrap-servers: 127.0.0.1:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: 127.0.0.1:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

消息发送

import com.bob.kafka.model.Foo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * @author F */ @Component public class ErrorHandleProducer { private static final Logger logger = LoggerFactory.getLogger(ErrorHandleProducer.class); private static final String TOPIC = "err_handle"; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void sendMessage(Foo foo) { logger.info(String.format("Producing Message -> %s", foo.toString())); this.kafkaTemplate.send(TOPIC, foo); } }

消息消费

包含了正常的消息消费,和错误处理

import com.bob.kafka.model.Foo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author F */ @Component public class ErrorHandleConsumer { private final Logger logger = LoggerFactory.getLogger(ErrorHandleConsumer.class); @KafkaListener(id = "fooGroup", topics = "err_handle") public void listen(Foo in) { logger.info("Consumed Message: " + in.getFoo()); if (!in.getFoo().startsWith("foo")) { throw new RuntimeException("failed"); } } @KafkaListener(id = "dltGroup", topics = "err_handle.DLT") public void dltListen(Foo in) { logger.info("Consumed Message from DLT:" + in.toString()); } }

关于 dltListen 函数的相关解释 dead-letter

对外接口

import com.bob.kafka.engine.ErrorHandleProducer; import com.bob.kafka.engine.Producer; import com.bob.kafka.model.Foo; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * @author F */ @RestController public class KafkaErrorHandleController { private ErrorHandleProducer produce; public KafkaErrorHandleController(ErrorHandleProducer produce) { this.produce = produce; } @PostMapping(path = "/send/{message}") public void send(@PathVariable String message) { produce.sendMessage(new Foo(message)); } }

测试

发送错误请求

控制台信息

发送正确请求

控制台信息

Kafka 序列化错误

异常信息

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of com.bob.kafka.model.Foo (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator) at [Source: (String)"{“foo”:“fail”}"; line: 1, column: 2]

解决方案 : 给实体类写个无参构造函数就可以了

参考资料

Spring for Kafka Spring Boot在反序列化过程中:cannot deserialize from Object value

最新回复(0)