最近换了新公司,关于消息队列都是用的 Apache Kafka,今天学了个基础的使用,分享一下
pom.xml依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-test
</artifactId>
<scope>test
</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage
</groupId>
<artifactId>junit-vintage-engine
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka
</groupId>
<artifactId>spring-kafka
</artifactId>
<version>2.5.5.RELEASE
</version>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-web
</artifactId>
</dependency>
</dependencies>
配置文件
server:
port: 8888
spring:
kafka:
consumer:
bootstrap-servers: 127.0.0.1
:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 127.0.0.1
:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
消息生产
package com
.bob
.kafka
.engine
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.kafka
.core
.KafkaTemplate
;
import org
.springframework
.stereotype
.Service
;
@Service
public class Producer {
private static final Logger logger
= LoggerFactory
.getLogger(Producer
.class);
private static final String TOPIC
= "users";
@Autowired
private KafkaTemplate
<String, String> kafkaTemplate
;
public void sendMessage(String message
) {
logger
.info(String
.format("Producing Message -> %s", message
));
kafkaTemplate
.send(TOPIC
, message
);
}
}
消息消费
package com
.bob
.kafka
.engine
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.kafka
.annotation
.KafkaListener
;
import org
.springframework
.stereotype
.Service
;
@Service
public class Consumer {
private final Logger logger
= LoggerFactory
.getLogger(Producer
.class);
@KafkaListener(topics
= "users", groupId
= "group_id")
public void consume(String message
) {
logger
.info(String
.format("Consumed Message -> %s", message
));
}
}
对外接口
package com
.bob
.kafka
.controller
;
import com
.bob
.kafka
.engine
.Producer
;
import org
.springframework
.web
.bind
.annotation
.PostMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestParam
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
@RestController
@RequestMapping(value
= "/kafka")
public class KafkaController {
private final Producer producer
;
public KafkaController(Producer producer
) {
this.producer
= producer
;
}
@PostMapping(value
= "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message
) {
this.producer
.sendMessage(message
);
}
}
启动类
package com
.bob
.kafka
;
import org
.springframework
.boot
.SpringApplication
;
import org
.springframework
.boot
.autoconfigure
.SpringBootApplication
;
@SpringBootApplication
public class KafkaApplication {
public static void main(String
[] args
) {
SpringApplication
.run(KafkaApplication
.class, args
);
}
}
上面就是一个基础案例,下面测一下
curl -X POST -F ‘message=test’ http://localhost:8888/kafka/publish