一flink 基础使用
此处主要介绍两部分基础内容案例,
使用wiki作为连接器,读取日志数据发送到kafka队列
读取socket流, 实现单词计数功能
代码提交jar包到远程服务器
1.1 使用wiki流 ,发送数据到kafka
首先是创建maven工程,maven pom文件如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency>
其中flink-connector-wikiedits_2.11是用于操作wiki上数据使用的Wikipedia 连接器.
接下来是应用程序的说明,首先创建FLink 的SteamingExecutionEnvironment变量(如果是批处理任务的话则创建ExecutionEnvironment对象)它用于设置程序所需要的执行参数,读取创建资源文件。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
通过env添加资源,用于读取wiki ipc 日志的资源
val edits:DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource())
相当于建立了一根桥梁, 打开flink与wiki的桥梁,
val keyedEdits:KeyedStream[WikipediaEditEvent, String] = edits.keyBy({ (event: WikipediaEditEvent) => { event.getUser } })
上面是获取一个wiki的流
val secondR = keyedEdits .timeWindow(Time.seconds(5)) .fold(WikiModel("","",0L,"","","",0))(getWikiEvent) .map(res => res.user) .addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema()))
首先创建了一个model,WikiModel
case class WikiModel(user: String,title: String,time: Long, summary: String, diffUrl: String, channel: String, byteDiff: Int)
之后将getWikiEvent方法传入,为了获取数据封装成对象,来看看此基本方法
def getWikiEvent(data: WikiModel, event: WikipediaEditEvent): WikiModel = { val user: String = event.getUser val title: String = event.getTitle val time: Long = event.getTimestamp val summary: String = event.getSummary val diffUrl: String = event.getDiffUrl val channel: String = event.getChannel val byteDiff: Int = event.getByteDiff WikiModel(user,title,time,summary,diffUrl,channel,byteDiff) }
说明此方法只是为了讲wiki流的每一个新获得的数据转换成WikiModel对象
addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema())这个是为了将数据发送到kafka里面, 此时会自动创建topic为wiki-result的kafka topic,最后通过env.execute()来执行应用程序,开启每隔5s钟一个窗口。
结果如下
image.png
1.2 socket流单词计数
其基本情况就是使用linux下的nc -l 9000在9000端口上发送数据, flink通过读取此端口的数据,处理10s间隔的窗口,对此窗口的数据进行单词计数(注意只对此窗口时间内的数据)
其ubuntu端的发送数据截屏如下
image.png
flink程序如下:
package quickstartimport java.sql.Dateimport java.text.SimpleDateFormatimport org.apache.flink.api.scala._import org.apache.flink.api.java.utils.ParameterToolimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.streaming.api.windowing.time.Time object SocketWindowWordCount { def main(args: Array[String]) : Unit = { val simple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") // the port to connect to val port: Int = try { ParameterTool.fromArgs(args).getInt("port") } catch { case e: Exception => { System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'") return } } // get the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get input data by connecting to the socket val text: DataStream[String] = env.socketTextStream("kason-pc", port, '\n') // parse the data, group it, window it, and aggregate the counts val windowCounts = text .flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1, simple.format(new Date(System.currentTimeMillis()))) } .keyBy("word") .timeWindow(Time.seconds(10), Time.seconds(1)) .sum("count") // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1) env.execute("Socket Window WordCount") } // Data type for words with count case class WordWithCount(word: String, count: Long, time: String)}
此处获取的结果如下图:
image.png
这里分析一下输出的结果, 上面的代码是每隔1s处理10s socket发送过来的数据,所以开始时输入的zk zk ll 当1s时间到来时, 处理的数据只有zk zk ll所以此时输出应该是zk,2 ll,1, 之后又输入zk kk pp, 然后当1s到来时, 需要处理的数据就是zk zk ll zk ll pp了, 此时结果输出应该是zk 3 ll 2 pp 1, 然后又输入了zk ll kk 所以1s后处理的数据就变成了 zk zk ll zk ll pp zk ll kk 所以数据输出为zk 4 ll 3 pp 1 kk 1, 然后时间过到11s时, 窗口10s内的数据就变成了 zk ll pp zk ll kk,所以结果变成了zk 2 ll 2 pp 1 kk 1, 然后时间过到12s时, 窗口10s内就只剩下zk ll kk了, 此时结果输出就是zk 1 ll 1 kk1, 然后时间过到13时, 窗口10s内没有数据了,此时也就没有输出了。
此处需要介绍一下flink的这个时间窗口,timeWindow
所谓的TIme Window实际上就是根据时间对数据流进行分组的, 也可以认为是根据固定时间进行切割数据, timeWindow有两种类型: 翻滚时间窗口, 滑动时间窗口。
翻滚时间窗口:
翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。可以认为是数据不能跨窗口, 此时相当于只给timeWIndow传入一个时间参数。比如一个例子:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window), 比如你调整本例子的timeWindow函数为timeWindow(Time.seconds(1))
此时通过nc -l 9000每隔一秒发送数据:
image.png
结果如下:
image.png
滑动时间窗口
对于某些应用,它们需要的窗口是不间断的,需要平滑地进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口, 也就是数据可以跨窗口, 如本例
image.png
1.3 代码提交jar包到远程服务器
首先打包可执行程序为jar包, 本例子使用的maven
<build> <sourceDirectory>src/main/scala</sourceDirectory> <!--<testSourceDirectory>src/test/scala</testSourceDirectory>--> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>quickstart.SocketWindowWordCount</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
然后修改应用程序代码
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("kason-pc",6123,"/home/kason/workspace/BigdataComponents/target/BigdataComponents-1.0-SNAPSHOT.jar")
应用提交之后IDEA 显示
image.png
登录kason-pc:8081 查看RUNNING jOBS
image.png
根据JOB Name 来找到你提交的程序, 点进去
image.png
查看应用程序输出结果:
到log日志目录中去查看, 结果如下, 和本地调试结果一致:
日志结果.png
作者:kason_zhang
链接:https://www.jianshu.com/p/1ab7769f5837