手记

求求你大蕉别学了之 Flink No.127

求求你大蕉别学了。我偏不。

Flink ,为纯粹的流计算为生的一个大数据项目,玩一波先。跟 Spark 有什么区别呢?其实就一个区别,Spark 永远是批量处理,Flink 可以批量也可以实时流。啥意思呢?就是 Spark 没有一批就不处理就存着,永远只能准实时,而 Flink 拿到就处理拿到就处理,跟家里自来水似的,来多少处理多少。

废话不多说,先上手再说,什么原理什么的,后边再看,属于重要但不紧急的事情。我先当你装了 maven 和 IDEA 了,先用maven 初始化一下项目,试试看这个没有毒,是官网上的初始化方法。

 mvn archetype:generate \

    -DarchetypeGroupId=org.apache.flink \

    -DarchetypeArtifactId=flink-quickstart-java \

    -DarchetypeVersion=1.7.0 \

    -DgroupId=wiki-edits \

    -DartifactId=wiki-edits \

    -Dversion=0.1 \

    -Dpackage=wikiedits \

    -DinteractiveMode=false

下边介绍几种学完基本能吹牛逼的用法。

首先命令行先初始化一个 scoket 输入源,下边所有的步骤都用得到,对就这样开。

nc -lk 9999  

招式1 : 纯流式处理

val text = env.socketTextStream("localhost",9999);
val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map( (_,1))

count.print();
System.out.println(env.getExecutionPlan(1));
env.execute("Flink Streaming Scala API Skeleton")

用过Spark 的同学应该都很清楚, flatMap 就是把每一个值变成一个列表然后所有的小列表打平成一个大的列表,filter 就是过滤出符合条件的值们。所以这坨代码的大概意思就是把输入的字符串,先按空格切割一下,然后输出掉,这里其中的 count 是 Flink 的其中一种 sink 。当然你也可以自己定义自己的 sink ,譬如这样。 

count.addSink(new SinkFunction[(String, Int)] {
  // DO SOME THING
})

ps: sink 是 Flink 的输出叫法,一次对外的输出叫一次 sink,sink 的方式有很多种,比如写文件,写数据库,调用其他 rpc 接口等等。

招式2 : 每隔三个进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
  .countWindowAll(3L)
  .aggregate(new MyAggregate())


这就是传说中的小量批处理啦,Spark 就无法原生支持这种吧?每三个作为一批进行批量处理。

招式3 : 每隔三个对过去十个进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
  .countWindowAll(10L,3L)
  .aggregate(new MyAggregate())
这个就更加棒棒了,我们经常都会有这样的需求,每接收到3笔订单统计过去10笔订单的订单总额,这也是传说中的一个数据在多个窗口里统计。这个有什么卵用呢?

招式4 : 每隔三秒进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
    .timeWindowAll(Time.seconds(3))
  .aggregate(new MyAggregate())

这种小批量就是每隔一定的时间窗口进行一次数据处理,这就不多说了,跟没什么特殊了。大概就是类似金拱门工作人员,每隔三秒钟看看有多少个麦包包,一起拿起来打包这样 (卧槽好饿.(๑>؂<๑)۶好吃)。

招式5 : 每隔三秒对过去十秒的数据进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
    .timeWindowAll(Time.seconds(10),Time.seconds(3))
  .aggregate(new MyAggregate())

这就牛逼了,老板每天都会有这样的需求,你给我看看过去十秒咱赚了多少钱?每隔三秒钟计算一次。这种需求你还要存储过去十秒的数据,其实是比较困难的,在以前那种没有状态的流计算里边。

大蕉 : 1、2、3 老板咱赚了一栋楼

老板 : 好好干,明年哥给你娶个嫂子。

大蕉 : zzz...

Flink 常规用法就是这么简单,自己玩玩看吧。那么上边那个myAggregate 是啥玩意呢?说实话我找了半天没找到 Flink 中类似Spark 的reduceByKey,很生气,自己实现了一个统计数字的版本,丢一波吧,自己随意看看,大概就是实现一个 aggregate 的方法。其中需要实现四个方法。

1、创建一个收集器

2、定义怎么merge 两个收集器

3、定义怎么获取收集器的结果

4、定义一个元素怎么加入到收集器中。

这样四个步骤都实现完,就实现了reduce,至于怎么reduceByKey,自己想想然后看看下边的步骤,相信你能看懂的,看不懂留言。。

关于AggregateFunction 的接口定义是长这样的,有三个泛型,分别是输入元素类型 IN(input),收集器类型 ACC(accumulator),返回结果的类型 OUT(output)。

public interface AggregateFunction<IN, ACC, OUT> {
  ACC createAccumulator();
  ACC add(IN value, ACC accumulator);
  OUT getResult(ACC accumulator);
  ACC merge(ACC a, ACC b);
}
class MyAggregate extends AggregateFunction[(String, Int), Map[String, Int], Map[String, Int]] {

  var map = new HashMap[String, Int];
  override def createAccumulator(): Map[String, Int] = {
    return map;
  }

  override def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] = {
/**
  * 把两个Map里边的值拿出来,key如果一样的把他们的value加到一起
  */
val keySet1 = a.keySet;
    val keySet2 = b.keySet;
    val keySetAll = keySet1.++(keySet2);
    var result = new HashMap[String, Int];
    keySetAll.foreach(key => {
      val aCounting = a.get(key).getOrElse(0);
      val bCounting = b.get(key).getOrElse(0);
      val countingAll = (key, aCounting + bCounting);
      result = result.+(countingAll);
    });
    return result;
  }

  override def getResult(accumulator: Map[String, Int]): Map[String, Int] = {
    return accumulator;
  }

  override def add(value: (String, Int), accumulator: Map[String, Int]): Map[String, Int] = {
/**
  * 如果收集器中没有,那么把自己加进去.如果收集器中有,把自己的value和收集器中的value相加一下然后放回去
  */
    val key = value._1;
    val counting = value._2;
    if (!accumulator.contains(key)) {
      return accumulator.+(value);
    }

    val currentCounting = accumulator.get(key).getOrElse(0);
    val tempValue = (key, counting + currentCounting);

    return accumulator.+(tempValue);

  }
}

最近买了极客时间的 10x 程序员工作法,感觉还是挺不错的,看一看不亏。

有一段时间,网上流传着一个帖子,亚马逊 CTO 介绍亚马逊是如何开发一项产品的,简单来说,他们采用向后工作的方法,开发一项产品的顺序为:

写新闻稿 。
写 FAQ 。
写用户文档 。
写代码 。

别整天一上班就写代码。据说代码越多阅读量越低,准备创造历史新低,再见 ~ 


0人推荐
随时随地看视频
慕课网APP