flink实时流处理案例
添加依赖编写程序为程序添加参数host,port本机cmd开启相对应的端口(window得提前安装nc 命令)运行程序,在cmd上随便打,控制台可实时计算统计
添加依赖
<!-- https
://mvnrepository
.com
/artifact
/org
.apache
.flink
/flink
-streaming
-scala
-->
<dependency
>
<groupId
>org
.apache
.flink
</groupId
>
<artifactId
>flink
-streaming
-scala_2
.12</artifactId
>
<version
>1.11.1</version
>
</dependency
>
<!-- https
://mvnrepository
.com
/artifact
/org
.apache
.flink
/flink
-clients 从Flink1
.11开始,移除了flink
-streaming
-java对flink
-clients的依赖
-->
<dependency
>
<groupId
>org
.apache
.flink
</groupId
>
<artifactId
>flink
-clients_2
.12</artifactId
>
<version
>1.11.1</version
>
</dependency
>
<!-- https
://mvnrepository
.com
/artifact
/org
.apache
.flink
/flink
-scala
-->
<dependency
>
<groupId
>org
.apache
.flink
</groupId
>
<artifactId
>flink
-scala_2
.12</artifactId
>
<version
>1.11.1</version
>
</dependency
>
编写程序
object StreamFlinkWordCount
{
def
main(args
: Array
[String
]): Unit
= {
val environment
: StreamExecutionEnvironment
= StreamExecutionEnvironment
.getExecutionEnvironment
val tool
: ParameterTool
= ParameterTool
.fromArgs(args
)
val host
: String
= tool
.get("host")
val port
: Int
=tool
.getInt("port")
val datasteam
= environment
.socketTextStream(host
,port
)
datasteam
.flatMap(_
.split(" ")).map((_
,1)).keyBy(0).sum(1).print()
environment
.execute()
}
}
为程序添加参数host,port
本机cmd开启相对应的端口(window得提前安装nc 命令)
运行程序,在cmd上随便打,控制台可实时计算统计