很常见的一个问题:
目前市面上来说一般都是这个:
也就是我们常说的直连模式。
最大的好处莫过于我们可以很方便的来控制任务的并行度。具体是如何做到的呢?我们来分析一下源码就一目了然了。
直接定位到主角DirectKafkaInputDStream 我们看下层级结构:
他是InputDstream的一个具体实现。
我们先看下 compute这个方法:
1:返回的是Option[KafkaRDD[K, V]] 注意是个Option
2: def clamp 这个方法的作用是 // limits the maximum number of messages per partition 用来限制每个分区最多有多少条消息,返回的是个Map[TopicPartition, Long],这个clamp方法限定了这个对应的KafkaRDD的终止的offset,有了当前的offset(也就是其实的offset)和终止的offset然后也就确定了这个KafkaRDD的offset 的范围了。
3:如何来控制每个分区的数据的条数的呢?def maxMessagesPerPartition ,这里利用的是rateController 的背压机制来进行控制,kafka这里是做了一层的包装:其实调用的创建了一个RateEstimator,目前的rateEstimator仅实现了pid,这个RateEstimator也有很多的参数可以设置,不过没有更改过默认的设置,哈哈
Some(new DirectKafkaRateController(id, RateEstimator.create(ssc.conf, context.graph.batchDuration))) override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { val untilOffsets = clamp(latestOffsets()) val offsetRanges = untilOffsets.map { case (tp, uo) => val fo = currentOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo) } val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", true) val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache) // Report the record number and metadata of this batch interval to InputInfoTracker. val description = offsetRanges.filter { offsetRange => // Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange => s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the user val metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) currentOffsets = untilOffsets commitAll() Some(rdd) }4:class OffsetRange 这个狠关键的一个角色:它其实就是
val topic: String, val partition: Int, val fromOffset: Long, val untilOffset: Long 的一个封装 ,对应的topic 以及 paritition 以及对应的起始和终止的offset。5: new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache) 这里传入的有个一getPreferredHosts,这个方法里会去遍历partition 对应的host等信息,找到对应的分区的各种信息,属于哪个topic ,leader 所在的host等信息,这些信息是封装在了 TopicPartition
6:到这里已经生成了KafkaRDD了,关于分区的信息其实都已经被包装进去了。我们来看下,KafkaRDD 中的 def getPartitions方法。
其实就是new了若干个KafkaRDDPartition,每个分区都会对应的生成一个。所以说RDD的分区数目和kafka的分区都是一一对应的
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)