Kafka入门

tech2022-09-15  68

参考图书kafka权威指南

文章目录

Kafka介绍主题和分区 Maven依赖生产者简单的例子配置属性分区自定义分区自定义分区实现 消费者简单的例子配置属性从指定分区读数据提交偏移量使用commitSync()提交偏移量使用commitAsync()异步提交偏移量提交特定偏移量 从指定偏移量开始读取根据时间指定偏移量开始读取 序列化

Kafka介绍

kafka是一种消息队列,通过消息发布和订阅模式实现。为了方便,消息的发布在之后会称为消息生产者,消息订阅会称为消息消费者。

消费者订阅主题,生产者发布消息到主题下,消费者收到生产者发布的消息。

主题和分区

主题又称为Topic,每个主题下面有若干个分区。消费者订阅一个或多个分区,通过偏移量读取消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NFxUXZBK-1599092696304)(C:\Users\pxy\AppData\Roaming\Typora\typora-user-images\image-20200831144755572.png)]

Maven依赖

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.13</artifactId> <version>2.6.0</version> </dependency>

生产者

简单的例子

Properties props = new Properties(); props.put("bootstrap.servers","host:port"); //设置key和value序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); int i = 1; // 发送业务消息 // 读取文件 读取内存数据库 读socket端口 while (true) { Thread.sleep(1000); producer.send(new ProducerRecord<String, String>("sun", "key:" + i, "value:" + i)); System.out.println("key:" + i + " " + "value:" + i); i++; }

注:使用时将第二行的host:port改成kafka在的IP地址和端口号

使用produce.send来发送消息。ProducerRecord构造器属性(主题名,key,value), key、value相当于Map的键值对。

配置属性

上面的简单的例子中,只配置了key,value的序列化方法

1.ack:多少分区收到消息为发送成功。

​ ack=0:生产者写入就生成,无需要等待服务器的响应。

​ ack=1:集群的首领节点收到消息,生产者就会受到来自服务器的响应。

​ ack=all:所有节点收到消息时,生产者才会受到来自服务器的响应。

2.buffer.memory:缓冲区的大小。

​ max.block.ms:当消息发送速度大于发送到服务器的速度,会导致空间不足,这个时候调用send就会被阻塞或者抛出异常,这个参数为抛出异常前阻塞多 久。

3.compression.type:压缩算法,占用一部分CPU压缩发送的信息,降低网络带宽和提高性能。压缩算法:snappy、gzip和lz4,其中gzip压缩占用的CPU最大,但对网络带宽的占用和性能的提升也是最大的。

4.retries:消息发送失败重试次数,重试次数到了就抛出异常。

​ retry.backoff.ms:重试时间间隔,默认是100ms,建议测试一下服务器奔溃后恢复时间再设置间隔,避免还没回复就重试次数用尽。

5.batch.size:当消息需要发送给同一个分区时,会放在同一个批次里,按字节数计算,当这个批次占满后会发送。但并不一定需要全部满后才会发送,所以不会影响发送的延迟。设置太大会占用过多空间,设置太小会频繁发送增加不必要的开销。

6.linger.ms:发送延迟,当延迟到时或者发送批次被填满后发送消息。

7.client.id:该参数可以是任意字符串,服务器会用他来识别消息的来源,还可以用在日志和配额指标里。

8.max.in.flight.requests.per.connection:生产者在接受到服务器响应之前可以发送多少消息,值越大吞吐量和占用内存越大。当值为1时就是按照顺序写入。

9.timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms:

​ request.timeout.ms:生产者在发送消息是等待服务器返回响应时间。

​ metadata.fetch.timeout.ms:生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。

​ 要是超时会根据retries进行重试或抛出异常

​ timeous.ms:指定了broker等待同步副本返回消息确认的时间,预asks的配置相匹配,如果在指定时间内没有收到同步副本的确认,那么broker就会返回一 个错误。

10.max.request.size:控制生产者发送请求的大小。他可以指发送单个消息的最大值,也可以指单个请求里所有消息总的大小。

11.receive.buffer.bytes和send.buffer.bytes:指TCP socket接收和发送数据包的缓冲区大小。路过他们被设置为-1,就使用操作系统的默认值。如果生产者和消费者与broker处于不同的数据中介,那么可以适当增加这些值,因为跨数据中心的网络一般都比较高的延迟和比较低的带宽。

分区

通过前文简单的例子,知道构建ProducerRecord对象的时候需要三个参数,分别是Topic,key和value。Topic决定将消息发送到哪个主题,key决定将消息发送到哪个分区。一般在默认没有设置的情况下,Kafka会对Key进行散列存储。但有些时候并不能这样做,有些时候一个分区就需要存放一样东西的key,比如电话号码,这种时候就可以使用自定义分区。

自定义分区

首先使用

./kafka-topics.sh --bootstrap-server localhost:9092 --topic 主题名字 --describe

查看主题下有多少分区

如果只有一个分区那自定义分区就没什么意义,因为他都是发到一个分区中去的。这种时候就要手动修改分区数量

./kafka-topics.sh --bootstrap-server localhost:9092 --topic 主题名字 --alter --partitions 分区数量
自定义分区实现

自定义分区类,实现Partitioner类,重写partition方法。

当key为111时放到同一个,也就是最后一个分区中,其他的散列存储到其他分区

public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //等到topic中的分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int size = partitions.size(); //如果是111放到同一个分区中 if("111".equals(key)){ return size-1; } return ((String)key).substring(0, 3).hashCode() % (size - 1); } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }

修改生产者,进行测试

public void testPrdouct03() throws InterruptedException { String ss[]=new String[]{"123","111","101"}; Properties props = new Properties(); props.put("bootstrap.servers","58.247.129.42:9092"); //设置分区器 props.put("partitioner.class","org.example.config.MyPartitioner"); //设置key和value序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key序列化器 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value序列化器 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 发送业务消息 for(String s:ss){ try { RecordMetadata sun = producer.send(new ProducerRecord<String, String>("sun", s, "value:" + s)).get(); System.out.println(s+" has been send to partition "+sun.partition()); } catch (ExecutionException e) { e.printStackTrace(); } } }

消费者

简单的例子

Properties props = new Properties(); props.put("bootstrap.servers","host:port"); props.put("group.id", "1111"); //定义消费组 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("sun"));//订阅主题 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }

注:使用时将第二行的host:port改成kafka在的IP地址和端口号

使用consumer.poll等待延迟获取,里面是时间。

配置属性

fetch.min.bytes:服务器收到消费者请求消息的请求,当消息字节数加起来大于这个值的时候才会返回消息。fetch.max.wait.ms:消息最长等待时间,比如设置成100ms,那么到100ms时就算没有达到最小字节数也会返回消息。max.partition.fetch.bytes:默认是1MB,当消费者在poll消息的时候,最大不会过这个值。session.timeout.ms:默认3s,消费者在认定死亡之前可以与服务器断开连接的时间。如果在这时间内,没有发送心跳给协调器,协调器就会触发再均衡,把他的分区分配给群组里的其他消费者。 heartbeat.interval.ms:像服务器发送心跳的时间,如果session.timeout.ms设置成3s,这个就要设置成1s比较合理。 auto.offset.reset:[latest,earliest],默认值为latest。指当分区偏移量失效时,默认是从最新的开始读取,而另一个是从起始位置开始读。enable.auto.commit:提交偏移量的方式,默认为true。 auto.commit.interval.ms:消费者自动提交偏移量的时间间隔,默认为5秒。 partition.assignment.strategy:分区分配给消费者的策略,Kafka自带的2种org.apache.kafka.clients.consumer.RangeAssignor和org.apache.kafka.clients.consumer.RoundRobinAssignor。分别为Range策略,指分配主题的连续空间给消费者;Round策略,分配平均的空间给消费者。也可以自定义分配方式。client.id:表示从客户端发过来的消息,通常被用在日志、度量指标和配额里。max.poll.records:单次调用poll方法能够返回的最大数量。receive.buffer.bytes和send.buffer.bytes:指TCP socket接收和发送数据包的缓冲区大小。路过他们被设置为-1,就使用操作系统的默认值。如果生产者和消费者与broker处于不同的数据中介,那么可以适当增加这些值,因为跨数据中心的网络一般都比较高的延迟和比较低的带宽。

从指定分区读数据

将subscribe用assign代替

//consumer.subscribe(Arrays.asList("sun")); consumer.assign(Arrays.asList(new TopicPartition("sun",1)));

提交偏移量

经过测试不知道是IDEA编译器的问题还是新版本的问题,提交的偏移量都是在程序再次启动的时候才从那个点开始重新读

大部分开发者提交偏移量的方法是将auto.commit.interval.ms的时间间隔缩短,这种方法可以减少偏移量丢失的可能性,同时在再均衡的时候减少重复数据的数量。但有些时候还是要手动提交偏移量的,而Kafka也提供了手动提交偏移量的方法。

使用commitSync()提交偏移量

while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } }

在每次接收到数据后提交偏移量。

使用commitAsync()异步提交偏移量

手动提交有一个不足就是消费者在对提交偏移量,等待回复之前会一直阻塞,会降低吞吐量。Kafka也提供了异步提交请求的方法commitAsync();

while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitAsync(); }

同步提交会在提交成功之前或提交失败的时候一直重试,而异步提交不会,但这也是异步提交的问题所在,比如当异步提交偏移量2000的时候,可能因为网络问题没有提交到,而这时候已经偏移量3000,而之后2000才提交上去,这个时候会造成重复读取

之所以说到这个问题是因为异步提交也是支持回调的。

consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { } });

提交特定偏移量

HashMap<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>(); while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"no metadata")); } consumer.commitAsync(currentOffset); }

从指定偏移量开始读取

从头部和尾部开始读取可以使用seekToBegining和seekToEnd

要从指定的偏移量读取使用seek

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("sun")); Set<TopicPartition> assignment = new HashSet<>(); while(assignment.size()==0){ consumer.poll(Duration.ofSeconds(1)); assignment=consumer.assignment(); } consumer.seek(new TopicPartition("sun",0),800); while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(10)); for(ConsumerRecord<String,String> record:records){ System.out.printf("consumer02,offset = %d,partition=%s, key = %s, value = %s%n", record.offset(),record.partition(), record.key(), record.value()); } }

在每次设置分区偏移量之前,都先要获取一次分区,获取分区时不需要把时间设置的特别短,太短获取不到分区。

根据时间指定偏移量开始读取

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig()); kafkaConsumer.subscribe(Arrays.asList(topic)); Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(100L); assignment = kafkaConsumer.assignment(); } Map<TopicPartition, Long> map = new HashMap<>(); for (TopicPartition tp : assignment) { map.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(map); for (TopicPartition topicPartition : offsets.keySet()) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); if (offsetAndTimestamp != null) { kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); } } while (isRunning.get()) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000L); System.out.println("本次拉取的消息数量:" + consumerRecords.count()); System.out.println("消息集合是否为空:" + consumerRecords.isEmpty()); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println("消费到的消息key:" + consumerRecord.key() + ",value:" + consumerRecord.value() + ",offset:" + consumerRecord.offset()); } }

序列化

Kafka自带且常用的的序列化方式

org.apache.kafka.common.serialization.ByteArraySerializer org.apache.kafka.common.serialization.ByteArrayDeserializer org.apache.kafka.common.serialization.StringDeserializer org.apache.kafka.common.serialization.StringSerializer

kafka权威指南这本书推荐使用Avro进行序列化对象

最新回复(0)