JDK 1.8 Zookeeper 3.6.1 Kafka 2.6.0
查看结果,在zookeeper路径/brokers/topics下新增了节点test1,就是刚才创建的topic主题
先启动控制台的消费者,监听topic=test1的数据
kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1然后运行生产者程序,这时消费端会输出消息信息
[root@cluster01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1 message1 message2 message3 message4 message5 message6 message7 message8 message9 message10 message11 message12 message13 message14 message15 message16 message17 message18 message19 message20带回调函数的生产者
回调函数是在生产者收到ack时调用,把上面的producer.send(record)加上回调即可
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.println("success:" + recordMetadata.offset()); } else{ e.printStackTrace(); } } });同步发送:消息发送后,会堵塞当前线程直到收到ack为止,才继续发送
//在后面加上get()即可 producer.send(record).get();运行消费者,看到输出“consumer is polling”之后,再运行生产者,结果如下:
D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\Idea\IntelliJ IDEA 2019.1.1\lib\idea_rt.jar=52238:D:\Idea\IntelliJ IDEA 2019.1.1\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk1.8.0_131\jre\lib\charsets.jar;D:\Java\jdk1.8.0_131\jre\lib\deploy.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_131\jre\lib\javaws.jar;D:\Java\jdk1.8.0_131\jre\lib\jce.jar;D:\Java\jdk1.8.0_131\jre\lib\jfr.jar;D:\Java\jdk1.8.0_131\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_131\jre\lib\jsse.jar;D:\Java\jdk1.8.0_131\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_131\jre\lib\plugin.jar;D:\Java\jdk1.8.0_131\jre\lib\resources.jar;D:\Java\jdk1.8.0_131\jre\lib\rt.jar;F:\JavaTest\ConsoleTrain\kafka\target\classes;E:\.m2\repository\org\apache\kafka\kafka-clients\2.6.0\kafka-clients-2.6.0.jar;E:\.m2\repository\com\github\luben\zstd-jni\1.4.4-7\zstd-jni-1.4.4-7.jar;E:\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;E:\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;E:\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;E:\.m2\repository\org\slf4j\slf4j-nop\1.7.25\slf4j-nop-1.7.25.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.5\jackson-databind-2.9.5.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.5\jackson-core-2.9.5.jar com.train.consumer.SimpleConsumer consumer is polling consumer is polling partition=1,offset=15,key=key2,value=message2 partition=1,offset=16,key=key6,value=message6 partition=1,offset=17,key=key12,value=message12 consumer is polling partition=0,offset=25,key=key1,value=message1 partition=0,offset=26,key=key3,value=message3 partition=0,offset=27,key=key10,value=message10 partition=0,offset=28,key=key15,value=message15 partition=0,offset=29,key=key19,value=message19 partition=3,offset=20,key=key5,value=message5 partition=3,offset=21,key=key11,value=message11 partition=3,offset=22,key=key14,value=message14 partition=3,offset=23,key=key18,value=message18 partition=2,offset=15,key=key7,value=message7 partition=2,offset=16,key=key8,value=message8 partition=2,offset=17,key=key16,value=message16 partition=4,offset=25,key=key4,value=message4 partition=4,offset=26,key=key9,value=message9 partition=4,offset=27,key=key13,value=message13 partition=4,offset=28,key=key17,value=message17 partition=4,offset=29,key=key20,value=message20 consumer is polling注意观察上图的数据:consumer消费数据,只保证同一个分区内的数据是有序的。当1个consumer去消费不同分区的数据时,分区之间的message不能保证顺序。