Spring Boot 接入Kafka 消息发送和消费案例

tech2023-07-06  116

最近换了新公司,关于消息队列都是用的 Apache Kafka,今天学了个基础的使用,分享一下

pom.xml依赖

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>

配置文件

server: port: 8888 spring: kafka: consumer: bootstrap-servers: 127.0.0.1:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: 127.0.0.1:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

消息生产

package com.bob.kafka.engine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { logger.info(String.format("Producing Message -> %s", message)); kafkaTemplate.send(TOPIC, message); } }

消息消费

package com.bob.kafka.engine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class Consumer { private final Logger logger = LoggerFactory.getLogger(Producer.class); @KafkaListener(topics = "users", groupId = "group_id") public void consume(String message) { logger.info(String.format("Consumed Message -> %s", message)); } }

对外接口

package com.bob.kafka.controller; import com.bob.kafka.engine.Producer; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; public KafkaController(Producer producer) { this.producer = producer; } @PostMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }

启动类

package com.bob.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }

上面就是一个基础案例,下面测一下

curl -X POST -F ‘message=test’ http://localhost:8888/kafka/publish

最新回复(0)