Spark Streaming Output to MySQL


I. Environment

- OS: Windows 10
- IDE: IntelliJ IDEA with the Scala plugin
- Build tool: Maven 3.6.0
- JDK 1.8
- Scala 2.11.11
- Spark 2.1.1
- MySQL 5.7
- c3p0 0.9.1.2 connection pool

II. Use Case

Read words from a socket data source, count them, and write the counts to a MySQL database;

Use c3p0 to build a database connection pool; every MySQL connection is borrowed from the pool, cutting the cost of establishing connections;

Wrap the c3p0 pool in a lazily initialized singleton, cutting the cost of creating the pool itself.

MySQL records the count of each input word.

III. Steps

1. Database

CREATE DATABASE rdd;
USE rdd;
CREATE TABLE IF NOT EXISTS wordfreq (
    id   BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    word VARCHAR(20) NOT NULL COMMENT 'word',
    cnt  INT NOT NULL COMMENT 'word count',
    time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'time of the count'
);

Note: MySQL 5.7's default strict mode (NO_ZERO_DATE) rejects a '0000-00-00 00:00:00' default, so CURRENT_TIMESTAMP is used here instead.

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.etc</groupId>
    <artifactId>LearnSparkStreaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency><!-- MySQL connection pool dependency -->
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
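With this pom in place, the project should build with a standard Maven invocation, for example:

mvn clean package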

3. c3p0 utility class

package com.util

import java.sql.Connection

import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
  // true: auto-register the DataSource with c3p0
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  try {
    // MySQL connection settings
    cpds.setJdbcUrl("jdbc:mysql://localhost:3306/rdd?useUnicode=true&characterEncoding=UTF-8")
    cpds.setDriverClass("com.mysql.jdbc.Driver")
    cpds.setUser("root")
    cpds.setPassword("root")
    cpds.setMaxPoolSize(10)     // maximum number of pooled connections
    cpds.setMinPoolSize(2)      // minimum number of pooled connections
    cpds.setAcquireIncrement(2) // connections acquired per expansion step
    cpds.setMaxStatements(180)  // maximum number of cached statements
  } catch {
    case e: Exception => e.printStackTrace()
  }

  // Borrow a connection from the pool
  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }
}

// Lazy singleton: the pool is only initialized when first used
object MysqlManager {
  @volatile private var mysqlPool: MysqlPool = _

  def getMysqlPool: MysqlPool = {
    if (mysqlPool == null) {
      synchronized {
        if (mysqlPool == null) {
          mysqlPool = new MysqlPool
        }
      }
    }
    mysqlPool
  }
}
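To sanity-check the pool outside of Spark, a minimal smoke test can be written against MysqlManager. The PoolSmokeTest object below is a hypothetical addition, not part of the original project; note that close() on a c3p0 connection returns it to the pool rather than tearing down the socket.

package com.util

object PoolSmokeTest {
  def main(args: Array[String]): Unit = {
    val conn = MysqlManager.getMysqlPool.getConnection
    if (conn == null) {
      println("could not obtain a connection")
    } else {
      try {
        // Trivial query to confirm the pool hands out working connections
        val rs = conn.createStatement().executeQuery("SELECT 1")
        if (rs.next()) println("pool OK: " + rs.getInt(1))
      } finally {
        conn.close() // hands the connection back to the c3p0 pool
      }
    }
  }
}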

4. Word count class

package com.wisdom

import com.util.MysqlManager
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object WordCountToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // After the reduce, write the results to MySQL (an output operation)
    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      // Skip empty RDDs; otherwise every partition would still grab a DB connection for nothing
      if (!rdd.isEmpty()) {
        // Each partition executes one batch of SQL
        rdd.foreachPartition((partition: Iterator[(String, Int)]) => {
          // partition and records live on the same worker node, so conn and statement
          // never need to be serialized and shipped
          val conn = MysqlManager.getMysqlPool.getConnection
          if (conn == null) {
            // Guard against null: a newly added node without DB access rights would
            // otherwise hang here, retrying the connection forever
            println("conn is null.") // printed in the executor on the worker node
          } else {
            println("conn is not null.")
            val statement = conn.createStatement()
            try {
              conn.setAutoCommit(false) // manual commit
              partition.foreach((record: (String, Int)) => {
                // Build the SQL and add it to the batch
                // Note: string column values must be single-quoted, e.g. the word column
                val sql = "insert into wordfreq (word, cnt, time) values ('" +
                  record._1 + "', " + record._2 + ", sysdate());"
                statement.addBatch(sql)
              })
              statement.executeBatch() // run the whole batch
              conn.commit()            // commit the transaction
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              statement.close()
              conn.close() // returns the connection to the pool
            }
          }
        })
      }
    })

    // Print the first 10 elements of each RDD generated from the DStream
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
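As a variant, the string-concatenated SQL inside foreachPartition can be replaced with a PreparedStatement batch, which lets the driver handle quoting and avoids SQL injection through the word column. The MysqlWriter object below is a sketch under the same schema, not the original author's code:

package com.wisdom

import com.util.MysqlManager

// Hypothetical helper; usable as rdd.foreachPartition(MysqlWriter.writePartition _)
object MysqlWriter {
  def writePartition(partition: Iterator[(String, Int)]): Unit = {
    val conn = MysqlManager.getMysqlPool.getConnection
    if (conn == null) return
    val ps = conn.prepareStatement(
      "insert into wordfreq (word, cnt, time) values (?, ?, now())")
    try {
      conn.setAutoCommit(false)
      partition.foreach { case (word, cnt) =>
        ps.setString(1, word) // the driver handles quoting and escaping
        ps.setInt(2, cnt)
        ps.addBatch()
      }
      ps.executeBatch()
      conn.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      ps.close()
      conn.close() // returns the connection to the pool
    }
  }
}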

IV. Running

Start the netcat tool and type words into it:

nc -lp 9999

Then run the main function of WordCountToMysql; the word counts are written to MySQL in real time.
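After typing a few words, the results can be checked directly in MySQL, for example with a query like this (column names as created in step 1):

SELECT word, cnt, time FROM wordfreq ORDER BY time DESC LIMIT 10;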