http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html
http://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html
1.用法及说明 在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。 包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。 两个核心类:KafkaUtils、KafkaCluster
2.导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> 3.代码实现 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object Kafka1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount2") val ssc = new StreamingContext(conf, Seconds(3)) val params = Map[String, String]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", "group.id" -> "1602" ) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, params, Set("first1602") ) .flatMap{ case (_,v) => v.split("\\W+") } .map((_,1)) .reduceByKey(_+_) .print() ssc.start() ssc.awaitTermination() } } 4.测试kafka是否打通 bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first1602 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first1602 5.实现代码缺点:数据会丢失
1.用法及说明 在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。 包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。 两个核心类:KafkaUtils、KafkaCluster
2.导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> 3.思考一下spark streaming关闭了 kafka发数据,再次打开spark streaming,怎样才能接受到数据 代码实现 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object Kafka2 { def createSSC()={ val conf = new SparkConf().setMaster("local[*]").setAppName("kafka2") val ssc = new StreamingContext(conf, Seconds(3)) //把offset的跟踪在checkpoint中 ssc.checkpoint("ck1") val params = Map[String, String]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", "group.id" -> "1602" ) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, params, Set("first1602") ) .flatMap{ case (_,v) => v.split("\\W+") } .map((_,1)) .reduceByKey(_+_) .print() ssc } def main(args: Array[String]): Unit = { /* 从checkpoint中恢复一个streamingContext, 如果checkpoint不存在,则调用后面的函数去创建一个StreamingContext */ val ssc = StreamingContext .getActiveOrCreate("ck1", createSSC) ssc.start() ssc.awaitTermination() } }缺点:小文件太多
1.用法及说明 在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。 包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。 两个核心类:KafkaUtils、KafkaCluster
2.导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> 3.从kafka消费数据2中产生大量小文件,怎么样解决? 解决方案:启动实时项目的实时读取偏移量+保存偏移量 4.代码实现 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.KafkaCluster.Err import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} object Kafka3 { val groupId = "1602" val params = Map[String, String]( "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", "group.id" -> "1602") val topics = Set("first1602") //KafkaUtils KafkaCluster val cluster = new KafkaCluster(params) def readOffsets()={ //最终返回的map var resultMap = Map[TopicAndPartition, Long]() val topicAndPartitionSetEither: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(topics) topicAndPartitionSetEither match { //2.获取topic和分区的信息 case Right(topicAndPartitionSet) => //3.获取到分区信息和其他的offset val topicAndPartitionToLongEither: Either[Err, Map[TopicAndPartition, Long]] = cluster.getConsumerOffsets(groupId, topicAndPartitionSet) topicAndPartitionToLongEither match { //没有每个topic的每个分区都已经存储过偏移量,表示曾经消费过,而且也维护过偏移量 case Right(map)=> resultMap ++= map //表示这个topic的这个分区是第一次消费 case _ => topicAndPartitionSet.foreach(topicAndPartition => { resultMap += topicAndPartition -> 1L }) } case _ => //表示不存在任何topic } resultMap } def saveOffsets(stream: InputDStream[String]) = { //保存offset一定从kafka消费到的直接那个stream保存 //每个批次执行一次传递过去的函数 stream.foreachRDD(rdd =>{ var map = Map[TopicAndPartition,Long]() //如果这个rdd是来自kafka,可以强转成HasOffsetRanges //这类型就包含了,这次消费的offsets的信息 val hasOffsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges] //如果有两个topic 三个分区 那么就有6个OffsetRange //所有分区的偏移量 val ranges: Array[OffsetRange] = hasOffsetRanges.offsetRanges ranges.foreach(OffsetRange=>{ val key = OffsetRange.topicAndPartition() val value = OffsetRange.untilOffset map += key -> value }) cluster.setConsumerOffsets(groupId,map) }) } def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("kafka2") val ssc = new StreamingContext(conf, Seconds(3)) val sourceStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( ssc, params, readOffsets(), (handler: MessageAndMetadata[String, String]) => handler.message() // 从kafka读到数据的value ) sourceStream .flatMap(_.split(("\\W+"))) .map((_,1)) .reduceByKey(_+_) .print(1000) saveOffsets(sourceStream) ssc.start() ssc.awaitTermination() } }