Kafka生产者测试程序(Java)

tech2023-08-12  120

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>kafka-producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-producer</name> <description>Kafka Producer Test</description> <properties> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <kafka.version>0.11.0.3</kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.version}</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

Java程序

package com.example.kafkaproducer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * @author Admin */ public class Producer { private static final String BOOTSTARPSERVER = "node01:9092,node02:9092,node03:9092"; private static final String TOPIC = "test"; public static void main(String[] args) { Properties properties = new Properties(); // 设置Kafka集群地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTARPSERVER); // 设置Key序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置Value序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 10); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "kafka-demo", String.valueOf(Math.random())); try { while (true) { try { kafkaProducer.send(producerRecord); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } } } finally { kafkaProducer.close(); } } }
最新回复(0)