学了最基础的 Kafka 消息的发送和消费,现在我们进一步学习,当遇见错误信息时,该如何处理,下面是 Spring for kafka 的一句话,告诉我们 Spring Boot 做错误处理有多简单
If you are using Spring Boot, you simply need to add the error handler as a @Bean and boot will add it to the auto-configured factory.
配置文件
server:
port: 8888
spring:
kafka:
consumer:
bootstrap-servers: 127.0.0.1
:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 127.0.0.1
:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
消息发送
import com
.bob
.kafka
.model
.Foo
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.kafka
.core
.KafkaTemplate
;
import org
.springframework
.stereotype
.Component
;
@Component
public class ErrorHandleProducer {
private static final Logger logger
= LoggerFactory
.getLogger(ErrorHandleProducer
.class);
private static final String TOPIC
= "err_handle";
@Autowired
private KafkaTemplate
<String, Object> kafkaTemplate
;
public void sendMessage(Foo foo
) {
logger
.info(String
.format("Producing Message -> %s", foo
.toString()));
this.kafkaTemplate
.send(TOPIC
, foo
);
}
}
消息消费
包含了正常的消息消费,和错误处理
import com
.bob
.kafka
.model
.Foo
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.kafka
.annotation
.KafkaListener
;
import org
.springframework
.stereotype
.Component
;
@Component
public class ErrorHandleConsumer {
private final Logger logger
= LoggerFactory
.getLogger(ErrorHandleConsumer
.class);
@KafkaListener(id
= "fooGroup", topics
= "err_handle")
public void listen(Foo in
) {
logger
.info("Consumed Message: " + in
.getFoo());
if (!in
.getFoo().startsWith("foo")) {
throw new RuntimeException("failed");
}
}
@KafkaListener(id
= "dltGroup", topics
= "err_handle.DLT")
public void dltListen(Foo in
) {
logger
.info("Consumed Message from DLT:" + in
.toString());
}
}
关于 dltListen 函数的相关解释 dead-letter
对外接口
import com
.bob
.kafka
.engine
.ErrorHandleProducer
;
import com
.bob
.kafka
.engine
.Producer
;
import com
.bob
.kafka
.model
.Foo
;
import org
.springframework
.kafka
.core
.KafkaTemplate
;
import org
.springframework
.web
.bind
.annotation
.PathVariable
;
import org
.springframework
.web
.bind
.annotation
.PostMapping
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
@RestController
public class KafkaErrorHandleController {
private ErrorHandleProducer produce
;
public KafkaErrorHandleController(ErrorHandleProducer produce
) {
this.produce
= produce
;
}
@PostMapping(path
= "/send/{message}")
public void send(@PathVariable String message
) {
produce
.sendMessage(new Foo(message
));
}
}
测试
发送错误请求
控制台信息
发送正确请求
控制台信息
Kafka 序列化错误
异常信息
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of com.bob.kafka.model.Foo (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator) at [Source: (String)"{“foo”:“fail”}"; line: 1, column: 2]
解决方案 : 给实体类写个无参构造函数就可以了
参考资料
Spring for Kafka Spring Boot在反序列化过程中:cannot deserialize from Object value