超详细的Kafka教程-从部署到开发到原理都有讲解

tech2022-09-14  120

在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。

为什么选择Kafka

消息中间件有很多。比如ActiveMQ,RabbitMQ,RocketMQ,Kafka。那你在选型的时候一般考虑哪些因素呢?我们来比较下这几个中间件的特点。

特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,吞吐量比RocketMQ和Kafka要低了一个数量级万级,吞吐量同ActiveMQ10万级,可以支撑高吞吐量10万级别,高吞吐量。 适合日志采集,实时计算等场景topic数量对吞吐量的影响  topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topictopic从几十个到几百个的时候,吞吐量会「大幅度下降」 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源时效性ms级微秒级,这是rabbitmq的一大特点,延迟是最低的ms级延迟在ms级以内可用性高,基于主从架构实现高可用性同ActiveMQ非常高,分布式架构非常高,同样也是分布式式消息可靠性有较低的概率丢失数据 经过参数优化配置,可以做到0丢失同RocketMQ一样也可以做到消息零丢失功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

优劣总结

「ActiveMQ」

优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用

缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对    ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。

「RabbitMQ」

优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。

缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

「RocketMQ」

优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。

缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。

「Kafka」

优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。

缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。

从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。

什么是Kafka

Kafka本质还是一个存储容器,最初由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

Kafka组成架构

从上面的架构图我们获得几个词:

Producer :消息生产者,就是向kafka broker发消息的客户端;

Consumer :消息消费者,向kafka broker取消息的客户端;

Topic :可以理解为一个队列;

Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

Kafka在Linux上的操作

你可能想,我把Kafka安装在Windows下不就完事了吗,为什么还要特意在Linux下面操作呢。但实际生产Kafka等中间件肯定是部署在Linux上面的,作为开发的我们可能也很少接触怎么部署,但是学习一下总归是有好处的。

下载

下载地址:http://kafka.apache.org/downloads

我们选取这个下载

单机部署

把下载的压缩包拷贝到Linux上,解压:

修改config/server.properties

修改Zookeeper的配置。

启动Kafka

注意:如果配置的是单独的Zookeeper,在启动Kafka之前需要启动Zookeeper。如果你有使用docker的经验,你可以使用docker-compose快速搭建一个zk集群。

发现有了kafka进程

端口为9092

集群部署

如果需要部署Kafka集群,我们需要设置多个Broker。

 > cp server.properties config/server.properties config/server-1.properties  > cp server.properties config/server.properties config/server-2.properties

编辑配置文件

 config/server-1.properties:   broker.id=1   listeners=PLAINTEXT://:9093   log.dir=/tmp/kafka-logs-1  config/server-2.properties:   broker.id=2   listeners=PLAINTEXT://:9094   log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。接下来我们只需要启动两个新的节点:

 > bin/kafka-server-start.sh config/server-1.properties &  ...  > bin/kafka-server-start.sh config/server-2.properties &  ...

现在创建一个副本为3的新topic:my-lvshen-topic

 > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-lvshen-topic

运行命令describe topics查看集群中的topic信息

 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-lvshen-topic  Topic:my-lvshen-topic  PartitionCount:1  ReplicationFactor:3 Configs:  Topic: my-lvshen-topic Partition: 0  Leader: 1  Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释:第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。Leader是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

Replicas是复制分区日志的节点列表,不管这些节点是Leader还是仅仅活着。

isr是一组「同步」Replicas,是Replicas列表的子集,它活着并被指到Leader。

命令操作

创建Topic

在安装目录下输入命令

 bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --create --topic LVSHEN-TOPIC --partitions 1 --replication-factor 1

创建了一个topic:「LVSHEN_TOPIC」。

查看Topic信息

  bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --describe --topic LVSHEN-TOPIC

生产者生产数据

  bin/kafka-console-producer.sh --broker-list 192.168.42.128:9092 --topic LVSHEN-TOPIC

消费者接收数据

  bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.128:9092 --topic 'LVSHEN-TOPIC'

 

Kafka Tool

如果觉得用命令查看太过麻烦,我们可以用工具查看(前提是你的生产环境和你的本地已经打通)。这里推荐一个工具「Kafka Tool」。

如图,左边会显示Brokers,Topics,Consumers,右边会显示相关的具体信息。

SpringBoot 默认方式开发Kafka Demo

这里我是采用SpringBoot开发,接下来写一个Java的demo。

Maven导入

 <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->  <dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>      <version>2.4.3.RELEASE</version>  </dependency>

配置文件

 ## kafka ##  spring.kafka.bootstrap-servers=192.168.42.128:9092  spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer  spring.kafka.consumer.group-id=test  spring.kafka.consumer.enable-auto-commit=true  spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer    #定义Topic  spring.kafka.topic=lvshen_demo_test    spring.kafka.listener.missing-topics-fatal=false

生产者类

 @Component  @Slf4j  public class KafkaProducer {        @Autowired      private KafkaTemplate kafkaTemplate;        @Value("${spring.kafka.topic}")      private String topic;        /**       * 发送kafka消息       *       * @param jsonString       */      public void send(String jsonString) {          ListenableFuture future = kafkaTemplate.send(topic, jsonString);          future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));      }    }

消费者类

 @Component  @Slf4j  public class KafkaConsumer {      @KafkaListener(topics = "${spring.kafka.topic}")      public void listen(ConsumerRecord<?, ?> record) {          log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());      }  }

测试

 @Test  public void testDemo() throws InterruptedException {     log.info("start send");     kafkaProducer.send("I am Lvshen");     log.info("end send");     // 休眠10秒,为了使监听器有足够的时间监听到topic的数据     Thread.sleep(10);  }

如上图,控制台消费接收到了数据:

 c.l.d.k.kafka.consumer.KafkaConsumer     : topic=lvshen_demo_test, offset=1, message=I am Lvshen

Kafka Tool也显示接收到了消息:

 

自定义Kafka demo开发

假如你不想使用application.properties里面kafka的配置,我们可以采用第二种开发方法。

定义配置文件

config/kafka-config.properties

 #consumer  kafka.bootstrapServers=192.168.42.128:9092  kafka.groupId=bootKafka  kafka.enableAutoCommit=true  kafka.autoCommitIntervalMs=100  kafka.sessionTimeoutMs=15000    #producer  kafka.retries=1  kafka.batchSize=16384  kafka.lingerMs=1  kafka.bufferMemory=1024000

配置类

 @Component  @ConfigurationProperties(prefix="kafka")  @PropertySource(value = {"classpath:config/kafka-config.properties"}, encoding = "utf-8")  @Getter  @Setter  @AllArgsConstructor  @NoArgsConstructor  public class KafkaConfigProperties {      private String bootstrapServers;      private String groupId;      private String enableAutoCommit;      private String autoCommitIntervalMs;      private String sessionTimeoutMs;      private String retries;      private String batchSize;      private String lingerMs;      private String bufferMemory;    }    //文件配置类  @Component("kafkaConfigurations")  @EnableKafka  public class KafkaConfiguration {      @Autowired      private KafkaConfigProperties kafkaConfigProperties;      /**       * ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者       */      @Bean      public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {          ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();          factory.setConsumerFactory(consumerFactory());          factory.getContainerProperties().setPollTimeout(1500);          factory.setMissingTopicsFatal(false);          return factory;      }        /**       * 根据consumerProps填写的参数创建消费者工厂       */      @Bean      public ConsumerFactory<Integer, String> consumerFactory() {          return new DefaultKafkaConsumerFactory<>(consumerProps());      }        /**       * 根据senderProps填写的参数创建生产者工厂       */      @Bean      public ProducerFactory<Integer, String> producerFactory() {          return new DefaultKafkaProducerFactory<>(senderProps());      }        /**       * kafkaTemplate实现了Kafka发送接收等功能       */      @Bean("kafkaTemplates")      public KafkaTemplate<Integer, String> kafkaTemplate() {          KafkaTemplate template = new KafkaTemplate<>(producerFactory());          return template;      }        /**       * 消费者配置参数       */      private Map<String, Object> consumerProps() {          Map<String, Object> props = new HashMap<>();          // 连接地址          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());          // GroupID          props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getGroupId());          // 是否自动提交          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);          // 自动提交的频率          props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfigProperties.getAutoCommitIntervalMs());          // Session超时设置          props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getSessionTimeoutMs());          // 键的反序列化方式          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);          // 值的反序列化方式          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          return props;      }        /**       * 生产者配置       */      private Map<String, Object> senderProps() {          Map<String, Object> props = new HashMap<>();          // 连接地址          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());          // 重试,0为不启用重试机制          props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());          // 控制批处理大小,单位为字节          props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaConfigProperties.getBatchSize());          // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量          props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaConfigProperties.getLingerMs());          // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录          props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaConfigProperties.getBufferMemory());          // 键的序列化方式          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);          // 值的序列化方式          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          return props;      }  }

获取监听

 @Component  @Slf4j  public class DemoListener {      /**       * 声明consumerID为demo,监听topicName为topic.quick.demo的Topic       */      @KafkaListener(id = "demo", topics = "topic.quick.demo")      public void listen(String msgData) {          log.info("demo receive : "+msgData);      }    }

测试

 @Test      public void testDemoDepth() throws InterruptedException {          log.info("start send");          kafkaTemplate.send("topic.quick.demo", "this is a test for depth kafka");          log.info("end send");          // 休眠10秒,为了使监听器有足够的时间监听到topic的数据          Thread.sleep(500000);      }

Listener监听到kafka里面的数据。

Kafka存储机制

经过上面的描述,我们发现「Partition」很重要。其实「Partition」还可以细分为「Segment」。至于什么是「Segment」,下面会有详细说明。

Partition中文件的存储方式:

每个partion(目录)相当于一个巨型文件被平均分配到多个大小相segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。(默认情况下每个文件大小为1G)。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

好了以上就是关于Kafka的简短的介绍了,如果想要深入学习,可以去官网多多了解相关知识。

Kafka为什么性能如此优越

读写快

kafka会将数据顺序写入磁盘,我们用的磁盘大部分用的是机械磁盘。机械结构的银盘,寻址是最耗时的。所以硬盘随机I/O是很耗性能的,如果是顺序I/O,那么性能会有很大的改善。

MMFile

Kafka的数据并不是实时的写入磁盘(「Memory Mapped Files」),它充分利用了现代操作系统「分页存储」来提高I/O效率。操作系统会选择适当的时机将数据写入硬盘。但这样也会不可靠,写到「mmap」中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。

Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到「mmap」之后就立即flush然后再返回Producer叫 同步 (sync);写入「mmap」之后立即返回Producer不调用flush叫 异步 (async)。

零拷贝

消费者向broker索要消息时,「kafka」使用 零拷贝(zero-copy) ,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”,直接复制到socket缓冲区。

一般的读写是这样的,会有用户态和内核态的切换,这个切换也是比较耗时的。

如果采用零拷贝,不会经过用户态。

关于零拷贝的详细描述,可以看看我的另一篇文章:【使用了零拷贝技术的Kafka,当然很快】。

往期推荐

我写出这样干净的代码,老板直夸我

云南丽江旅游攻略

使用ThreadLocal怕内存泄漏?

Java进阶之路思维导图

程序员必看书籍推荐

3万字的Java后端面试总结(附PDF)

扫码二维码,获取更多精彩。或微信搜Lvshen_9,可后台回复获取资料

1.回复"java" 获取java电子书; 2.回复"python"获取python电子书; 3.回复"算法"获取算法电子书; 4.回复"大数据"获取大数据电子书; 5.回复"spring"获取SpringBoot的学习视频。 6.回复"面试"获取一线大厂面试资料 7.回复"进阶之路"获取Java进阶之路的思维导图 8.回复"手册"获取阿里巴巴Java开发手册(嵩山终极版) 9.回复"总结"获取Java后端面试经验总结PDF版 10.回复"Redis"获取Redis命令手册,和Redis专项面试习题(PDF) 11.回复"并发导图"获取Java并发编程思维导图(xmind终极版)

另:点击【我的福利】有更多惊喜哦。

最新回复(0)