Java操作Kafka创建Topic、Producer、Consumer

tech2022-07-14  162

环境

JDK 1.8 Zookeeper 3.6.1 Kafka 2.6.0

引入依赖

<!-- Kafka Java client; version matches the Kafka 2.6.0 listed in the environment section -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- SLF4J "nop" binding for the logging API used by the client -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
</dependency>
<!-- NOTE(review): not referenced directly by the snippets in this article; presumably needed by the client at runtime - confirm -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
</dependency>

创建主题

public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.25.132:9092"); //kafka服务地址 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); AdminClient client = KafkaAdminClient.create(props);//创建操作客户端 //创建名称为test1的topic,有5个分区 NewTopic topic = new NewTopic("test1", 5, (short) 1); client.createTopics(Arrays.asList(topic)); client.close();//关闭 }

查看结果:在zookeeper路径/brokers/topics下新增了节点test1,即刚才创建的topic

生产者

public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); //异步发送20条消息 for (int i = 1; i <= 20; i++){ ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i); producer.send(record); } producer.close(); }

先启动控制台的消费者,监听topic=test1的数据

kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1

然后运行生产者程序,这时消费端会输出消息信息

[root@cluster01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1 message1 message2 message3 message4 message5 message6 message7 message8 message9 message10 message11 message12 message13 message14 message15 message16 message17 message18 message19 message20

带回调函数的生产者

回调函数会在生产者收到ack(或发送失败)时被调用,把上面的producer.send(record)加上回调即可

// Send with a completion callback: print the stack trace on failure,
// otherwise print the offset taken from the returned record metadata.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
        return;
    }
    System.out.println("success:" + metadata.offset());
});

同步发送:消息发送后,会阻塞当前线程,直到收到ack之后才继续发送下一条

//在后面加上get()即可 producer.send(record).get();

消费者

public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的属于同一个消费者组 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交offset props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费test1主题 consumer.subscribe(Arrays.asList("test1")); while (true){ System.out.println("consumer is polling"); //5秒等待 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("offset=%d,key=%s,value=%s", record.offset(), record.key(), record.value())); } //同步提交,失败会重试 consumer.commitSync(); //异步提交,失败不会重试 //consumer.commitAsync(); } }

运行消费者,看到输出“consumer is polling”之后,再运行生产者,结果如下:

D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\Idea\IntelliJ IDEA 2019.1.1\lib\idea_rt.jar=52238:D:\Idea\IntelliJ IDEA 2019.1.1\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk1.8.0_131\jre\lib\charsets.jar;D:\Java\jdk1.8.0_131\jre\lib\deploy.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_131\jre\lib\javaws.jar;D:\Java\jdk1.8.0_131\jre\lib\jce.jar;D:\Java\jdk1.8.0_131\jre\lib\jfr.jar;D:\Java\jdk1.8.0_131\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_131\jre\lib\jsse.jar;D:\Java\jdk1.8.0_131\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_131\jre\lib\plugin.jar;D:\Java\jdk1.8.0_131\jre\lib\resources.jar;D:\Java\jdk1.8.0_131\jre\lib\rt.jar;F:\JavaTest\ConsoleTrain\kafka\target\classes;E:\.m2\repository\org\apache\kafka\kafka-clients\2.6.0\kafka-clients-2.6.0.jar;E:\.m2\repository\com\github\luben\zstd-jni\1.4.4-7\zstd-jni-1.4.4-7.jar;E:\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;E:\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;E:\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;E:\.m2\repository\org\slf4j\slf4j-nop\1.7.25\slf4j-nop-1.7.25.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.5\jackson-databind-2.9.5.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.5\jackson-core-2.9.5.jar com.train.consumer.SimpleConsumer consumer is polling consumer is polling 
partition=1,offset=15,key=key2,value=message2 partition=1,offset=16,key=key6,value=message6 partition=1,offset=17,key=key12,value=message12 consumer is polling partition=0,offset=25,key=key1,value=message1 partition=0,offset=26,key=key3,value=message3 partition=0,offset=27,key=key10,value=message10 partition=0,offset=28,key=key15,value=message15 partition=0,offset=29,key=key19,value=message19 partition=3,offset=20,key=key5,value=message5 partition=3,offset=21,key=key11,value=message11 partition=3,offset=22,key=key14,value=message14 partition=3,offset=23,key=key18,value=message18 partition=2,offset=15,key=key7,value=message7 partition=2,offset=16,key=key8,value=message8 partition=2,offset=17,key=key16,value=message16 partition=4,offset=25,key=key4,value=message4 partition=4,offset=26,key=key9,value=message9 partition=4,offset=27,key=key13,value=message13 partition=4,offset=28,key=key17,value=message17 partition=4,offset=29,key=key20,value=message20 consumer is polling

注意观察上面的输出:consumer消费数据时,只保证同一个分区内的数据是有序的。当1个consumer消费多个分区的数据时,不同分区之间的message不能保证顺序。

最新回复(0)