Kafka消费者测试程序(Java)

tech2023-09-03  90

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>kafka-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-consumer</name> <description>Kafka Consumer Test</description> <properties> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <kafka.version>0.11.0.3</kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.version}</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

Java程序

package com.example.kafkaconsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; /** * @author Admin */ public class Consumer { private static final String BOOTSTRAP_SERVERS = "node01:9092,node02:9092,node03:9092"; private static final String TOPIC = "test"; private static final String GROUP_ID = "group.demo"; public static void main(String[] args) { Properties properties = new Properties(); // 设置Kafka集群地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // 设置Key反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置Value反序列化器 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置组ID properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Collections.singletonList(TOPIC)); while (true) { ConsumerRecords<String, String> consumerRecord = kafkaConsumer.poll(1000L); for (ConsumerRecord<String, String> re : consumerRecord) { System.out.println(re.value()); } } } }
最新回复(0)