参考图书kafka权威指南
kafka是一种消息队列,通过消息发布和订阅模式实现。为了方便,消息的发布在之后会称为消息生产者,消息订阅会称为消息消费者。
消费者订阅主题,生产者发布消息到主题下,消费者收到生产者发布的消息。
主题又称为Topic,每个主题下面有若干个分区。消费者订阅一个或多个分区,通过偏移量读取消息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NFxUXZBK-1599092696304)(C:\Users\pxy\AppData\Roaming\Typora\typora-user-images\image-20200831144755572.png)]
注:使用时将第二行的host:port改成kafka在的IP地址和端口号
使用produce.send来发送消息。ProducerRecord构造器属性(主题名,key,value), key、value相当于Map的键值对。
上面的简单的例子中,只配置了key,value的序列化方法
1.ack:多少分区收到消息为发送成功。
ack=0:生产者写入就生成,无需要等待服务器的响应。
ack=1:集群的首领节点收到消息,生产者就会受到来自服务器的响应。
ack=all:所有节点收到消息时,生产者才会受到来自服务器的响应。
2.buffer.memory:缓冲区的大小。
max.block.ms:当消息发送速度大于发送到服务器的速度,会导致空间不足,这个时候调用send就会被阻塞或者抛出异常,这个参数为抛出异常前阻塞多 久。
3.compression.type:压缩算法,占用一部分CPU压缩发送的信息,降低网络带宽和提高性能。压缩算法:snappy、gzip和lz4,其中gzip压缩占用的CPU最大,但对网络带宽的占用和性能的提升也是最大的。
4.retries:消息发送失败重试次数,重试次数到了就抛出异常。
retry.backoff.ms:重试时间间隔,默认是100ms,建议测试一下服务器奔溃后恢复时间再设置间隔,避免还没回复就重试次数用尽。
5.batch.size:当消息需要发送给同一个分区时,会放在同一个批次里,按字节数计算,当这个批次占满后会发送。但并不一定需要全部满后才会发送,所以不会影响发送的延迟。设置太大会占用过多空间,设置太小会频繁发送增加不必要的开销。
6.linger.ms:发送延迟,当延迟到时或者发送批次被填满后发送消息。
7.client.id:该参数可以是任意字符串,服务器会用他来识别消息的来源,还可以用在日志和配额指标里。
8.max.in.flight.requests.per.connection:生产者在接受到服务器响应之前可以发送多少消息,值越大吞吐量和占用内存越大。当值为1时就是按照顺序写入。
9.timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms:
request.timeout.ms:生产者在发送消息是等待服务器返回响应时间。
metadata.fetch.timeout.ms:生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。
要是超时会根据retries进行重试或抛出异常
timeous.ms:指定了broker等待同步副本返回消息确认的时间,预asks的配置相匹配,如果在指定时间内没有收到同步副本的确认,那么broker就会返回一 个错误。
10.max.request.size:控制生产者发送请求的大小。他可以指发送单个消息的最大值,也可以指单个请求里所有消息总的大小。
11.receive.buffer.bytes和send.buffer.bytes:指TCP socket接收和发送数据包的缓冲区大小。路过他们被设置为-1,就使用操作系统的默认值。如果生产者和消费者与broker处于不同的数据中介,那么可以适当增加这些值,因为跨数据中心的网络一般都比较高的延迟和比较低的带宽。
通过前文简单的例子,知道构建ProducerRecord对象的时候需要三个参数,分别是Topic,key和value。Topic决定将消息发送到哪个主题,key决定将消息发送到哪个分区。一般在默认没有设置的情况下,Kafka会对Key进行散列存储。但有些时候并不能这样做,有些时候一个分区就需要存放一样东西的key,比如电话号码,这种时候就可以使用自定义分区。
首先使用
./kafka-topics.sh --bootstrap-server localhost:9092 --topic 主题名字 --describe查看主题下有多少分区
如果只有一个分区那自定义分区就没什么意义,因为他都是发到一个分区中去的。这种时候就要手动修改分区数量
./kafka-topics.sh --bootstrap-server localhost:9092 --topic 主题名字 --alter --partitions 分区数量自定义分区类,实现Partitioner类,重写partition方法。
当key为111时放到同一个,也就是最后一个分区中,其他的散列存储到其他分区
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //等到topic中的分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int size = partitions.size(); //如果是111放到同一个分区中 if("111".equals(key)){ return size-1; } return ((String)key).substring(0, 3).hashCode() % (size - 1); } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }修改生产者,进行测试
public void testPrdouct03() throws InterruptedException { String ss[]=new String[]{"123","111","101"}; Properties props = new Properties(); props.put("bootstrap.servers","58.247.129.42:9092"); //设置分区器 props.put("partitioner.class","org.example.config.MyPartitioner"); //设置key和value序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key序列化器 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value序列化器 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 发送业务消息 for(String s:ss){ try { RecordMetadata sun = producer.send(new ProducerRecord<String, String>("sun", s, "value:" + s)).get(); System.out.println(s+" has been send to partition "+sun.partition()); } catch (ExecutionException e) { e.printStackTrace(); } } }注:使用时将第二行的host:port改成kafka在的IP地址和端口号
使用consumer.poll等待延迟获取,里面是时间。
将subscribe用assign代替
//consumer.subscribe(Arrays.asList("sun")); consumer.assign(Arrays.asList(new TopicPartition("sun",1)));经过测试不知道是IDEA编译器的问题还是新版本的问题,提交的偏移量都是在程序再次启动的时候才从那个点开始重新读
大部分开发者提交偏移量的方法是将auto.commit.interval.ms的时间间隔缩短,这种方法可以减少偏移量丢失的可能性,同时在再均衡的时候减少重复数据的数量。但有些时候还是要手动提交偏移量的,而Kafka也提供了手动提交偏移量的方法。
在每次接收到数据后提交偏移量。
手动提交有一个不足就是消费者在对提交偏移量,等待回复之前会一直阻塞,会降低吞吐量。Kafka也提供了异步提交请求的方法commitAsync();
while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitAsync(); }同步提交会在提交成功之前或提交失败的时候一直重试,而异步提交不会,但这也是异步提交的问题所在,比如当异步提交偏移量2000的时候,可能因为网络问题没有提交到,而这时候已经偏移量3000,而之后2000才提交上去,这个时候会造成重复读取
之所以说到这个问题是因为异步提交也是支持回调的。
consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { } });从头部和尾部开始读取可以使用seekToBegining和seekToEnd
要从指定的偏移量读取使用seek
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("sun")); Set<TopicPartition> assignment = new HashSet<>(); while(assignment.size()==0){ consumer.poll(Duration.ofSeconds(1)); assignment=consumer.assignment(); } consumer.seek(new TopicPartition("sun",0),800); while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(10)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d,partition=%s, key = %s, value = %s%n", record.offset(),record.partition(), record.key(), record.value()); } }在每次设置分区偏移量之前,都先要获取一次分区,获取分区时不需要把时间设置的特别短,太短获取不到分区。
Kafka自带且常用的的序列化方式
org.apache.kafka.common.serialization.ByteArraySerializer org.apache.kafka.common.serialization.ByteArrayDeserializer org.apache.kafka.common.serialization.StringDeserializer org.apache.kafka.common.serialization.StringSerializerkafka权威指南这本书推荐使用Avro进行序列化对象