I. Environment
- OS: Windows 10
- IDE: IntelliJ IDEA (with the Scala plugin)
- Project management tool: Maven 3.6.0
- JDK 1.8
- Scala 2.11.11
- Spark 2.1.1
- MySQL 5.7, with the c3p0 0.9.1.2 connection pool
II. Case
- Read words from a socket data source, count them, and write the counts to a MySQL database.
- Build a database connection pool with c3p0; every MySQL connection is borrowed from the pool, which reduces the cost of establishing connections.
- Wrap the c3p0 pool as a lazily initialized singleton, so the pool itself is created only once.
- MySQL stores the count of every input word.
III. Steps
1. Database

create database rdd;

use rdd;

CREATE TABLE IF NOT EXISTS wordfreq (
    id   BIGINT      NOT NULL AUTO_INCREMENT PRIMARY KEY,
    word VARCHAR(20) NOT NULL COMMENT 'word',
    cnt  INT         NOT NULL COMMENT 'word count',
    -- MySQL 5.7's default strict mode rejects a '0000-00-00 00:00:00' default, so use CURRENT_TIMESTAMP
    time TIMESTAMP   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'time of the count'
);
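Before wiring up the pool, it can help to confirm that MySQL is reachable with the URL and credentials used later in the c3p0 utility class (localhost:3306, user root, password root; adjust to your setup). The following is a minimal, hypothetical connectivity check using plain JDBC; it is not part of the original project.

import java.sql.DriverManager

object ConnectionCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: same URL and credentials as the c3p0 pool configured in step 3
    val url = "jdbc:mysql://localhost:3306/rdd?useUnicode=true&characterEncoding=UTF-8"
    val conn = DriverManager.getConnection(url, "root", "root")
    try {
      val rs = conn.createStatement().executeQuery("SHOW TABLES LIKE 'wordfreq'")
      println(if (rs.next()) "wordfreq table exists" else "wordfreq table is missing")
    } finally {
      conn.close()
    }
  }
}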
2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.etc</groupId>
    <artifactId>LearnSparkStreaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3. c3p0 utility class

package com.util

import com.mchange.v2.c3p0.ComboPooledDataSource
import java.sql.Connection

class MysqlPool extends Serializable {
  // c3p0 pooled data source (the boolean flag controls c3p0's autoregister behavior)
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  try {
    cpds.setJdbcUrl("jdbc:mysql://localhost:3306/rdd?useUnicode=true&characterEncoding=UTF-8")
    cpds.setDriverClass("com.mysql.jdbc.Driver")
    cpds.setUser("root")
    cpds.setPassword("root")
    cpds.setMaxPoolSize(10)      // at most 10 pooled connections
    cpds.setMinPoolSize(2)       // keep at least 2 connections open
    cpds.setAcquireIncrement(2)  // grow the pool 2 connections at a time
    cpds.setMaxStatements(180)   // statement cache size
  } catch {
    case e: Exception => e.printStackTrace()
  }

  // Borrow a connection from the pool; returns null if acquisition fails
  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }
}

object MysqlManager {
  // Lazily created singleton pool, guarded by double-checked locking
  @volatile private var mysqlPool: MysqlPool = _

  def getMysqlPool: MysqlPool = {
    if (mysqlPool == null) {
      synchronized {
        if (mysqlPool == null) {
          mysqlPool = new MysqlPool
        }
      }
    }
    mysqlPool
  }
}
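As a quick sanity check outside of Spark, the pool can be used directly: borrow a connection with MysqlManager, run a query, and close the connection so it goes back to the pool. The object name and query below are only for illustration.

import com.util.MysqlManager

object PoolUsageExample {
  def main(args: Array[String]): Unit = {
    val conn = MysqlManager.getMysqlPool.getConnection
    if (conn != null) {
      try {
        val rs = conn.createStatement().executeQuery("select count(*) from wordfreq")
        if (rs.next()) println(s"rows in wordfreq: ${rs.getInt(1)}")
      } finally {
        conn.close() // with c3p0, close() only returns the connection to the pool
      }
    }
  }
}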
4. Word count class

package com.wisdom

import com.util.MysqlManager
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object WordCountToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Read lines from the socket source and count words in each 5-second batch
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition((partition: Iterator[(String, Int)]) => {
          // One pooled connection per partition, not per record
          val conn = MysqlManager.getMysqlPool.getConnection
          if (conn == null) {
            println("conn is null.")
          } else {
            println("conn is not null.")
            val statement = conn.createStatement()
            try {
              conn.setAutoCommit(false)
              partition.foreach((record: (String, Int)) => {
                val sql = "insert into wordfreq (word, cnt, time) values ('" + record._1 +
                  "', " + record._2 + ", sysdate());"
                statement.addBatch(sql)
              })
              // Write the whole partition as one batch and commit it in one transaction
              statement.executeBatch()
              conn.commit()
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              statement.close()
              conn.close() // return the connection to the pool
            }
          }
        })
      }
    })

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
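Building the INSERT statement by string concatenation works for simple demo input, but a word containing a quote character would break the SQL, and concatenation is open to SQL injection. As an alternative sketch (not the original author's code), the per-partition write can use a PreparedStatement; SafeMysqlWriter and writePartition are hypothetical names.

import java.sql.Connection
import com.util.MysqlManager

object SafeMysqlWriter {
  // Sketch: write one partition of (word, count) pairs with a parameterized batch insert
  def writePartition(partition: Iterator[(String, Int)]): Unit = {
    val conn: Connection = MysqlManager.getMysqlPool.getConnection
    if (conn != null) {
      val ps = conn.prepareStatement(
        "insert into wordfreq (word, cnt, time) values (?, ?, sysdate())")
      try {
        conn.setAutoCommit(false)
        partition.foreach { case (word, cnt) =>
          ps.setString(1, word)
          ps.setInt(2, cnt)
          ps.addBatch()
        }
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        ps.close()
        conn.close()
      }
    }
  }
}

In WordCountToMysql, the partition-handling block could then be replaced with rdd.foreachPartition(SafeMysqlWriter.writePartition).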
IV. Run
Start the netcat tool and type in some words:

nc -lp 9999

Then run the main method of WordCountToMysql; the word counts of each batch are written to MySQL in near real time.
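To verify the output, query the wordfreq table after a few batches have been processed. The sketch below reuses MysqlManager and simply prints the most recent rows; CheckResults is a hypothetical helper, not part of the original project.

import com.util.MysqlManager

object CheckResults {
  def main(args: Array[String]): Unit = {
    val conn = MysqlManager.getMysqlPool.getConnection
    if (conn != null) {
      try {
        val rs = conn.createStatement().executeQuery(
          "select word, cnt, time from wordfreq order by id desc limit 10")
        while (rs.next()) {
          // Columns: word, cnt, time (as defined in the DDL above)
          println(s"${rs.getString(1)}\t${rs.getInt(2)}\t${rs.getTimestamp(3)}")
        }
      } finally {
        conn.close()
      }
    }
  }
}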