我是小蕉。
上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。
首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄数据,然后去重,统计每个年龄的人数。如果你能看到这里,我当你知道RDD,HDFS,还有scala是什么东东,不知道的看我上一篇或者上某搜索引擎去,我不管。
case class PERSON( val name:String, val age:String );object Some{ def main(args: Array[String]): Unit = { val conf:SparkConf = new SparkConf().setAppName("HelloWorld") val sc:SparkContext = new SparkContext(conf); val hc:HiveContext = new HiveContext(sc); val datas:DataFrame = hc.sql("SELECT NAME,AGE FROM PERSONS"); //记录可能重复,去个重先 val dataDistincted = datas.distinct(); //将行记录转换为对象方便操作 val persons:RDD[PERSON] = dataDistincted.map{case Row(name:String,age:String) => PERSON(name,age)}; //过滤年龄小于10的用户 val filtered10Person = persons.filter(x => x.age.toInt >= 10); //根据年龄对用户进行分组 val groupedByEdge = filtered10Person.groupBy(p => p.age) //打印出最终结果 groupedByEdge.collect().foreach(println);groupedByEdge.saveAsHadoopFile("/myHadoopPath")} }
大家跟我一起来,关于Spark集群的安装我就不介绍了大家自己上某搜索引擎去搜跟着做就可以了,今天主要介绍如何开始玩Spark。
一般我们的Spark程序会配合ozzie等定时调度工具来进行调度,从Hive库中读取数据然后通过数据处理来达到离线计算的功能。咱一行一行来。
case class PERSON( val name:String, val age:String );
这个没什么特殊的,case class就是定义了一个序列化的POJO类。
val conf:SparkConf = new SparkConf().setAppName("HelloWorld")
这个是Spark的一个配置类,用于配置所有Spark相关的初始化配置项。至于详细的大家上官网去看吧,配置蛮多的,都可以在这里配。上面是指定应用名为HelloWord。
val sc:SparkContext = new SparkContext(conf);
实例化一个SparkContext,这个是Spark的上下文,所有跟Spark交互的玩意都要跟它交互,其他什么其他的Context都是基于这个来进行的,而且一个应用里边只能有一个上下文,多了会报错,不信你试试。
val hc:HiveContext = new HiveContext(sc);
Spark实现了访问Hive库的API,这个是封装了大部分操作的Context,其实最有用也就一个,下面会说到,关于Hive大家别问我啊,就是基于HDFS的关系型数据库,关注的盆友要是多的话我后面专门开一次讲一讲这个东东。
val datas:DataFrame = hc.sql("SELECT NAME,AGE FROM PERSONS");
关键代码来了,敲黑板,这个是从Hive库中进行操作HQL并且把它们当成DataFrame来用,你问我DataFrame是什么,我来告诉李,就是自带Schema,能做各种类数据库操作的RDD,其他的跟RDD没什么区别。这里我们从PERSONS表中取得NAME,AGE两个字段。
val dataDistincted = datas.distinct();
好了,介绍今天第一个action算子,distinct,这个算子会比较整个数据集,然后进行去重,去重的方式就是看所有的字段是不是都一样一样的。
val persons:RDD[PERSON] = dataDistincted.map{case Row(name:String,age:String) => PERSON(name,age)};
这里是通过RDD的map转换操作,这个会并行便利RDD中每一个记录,然后转换成我们想要的类型,这里是将DataFrame中的Row数据,转换成我们定义的POJO以方面后面操作。不特殊,跟java里边的for遍历差不多,但是这个是并行的。
val filtered10Person = persons.filter(x => x.age.toInt >= 10);
好了,第二个转换操作filter,顾名思义就是过滤嘛,但是这个跟其他的过滤比较特殊,这个过滤是将filter里边的函数,条件为true的留下来,false的剔除。所以上边的操作就是将十岁及十岁以上的人留下来。
val groupedByEdge = filtered10Person.groupBy(p => p.age)
这个就比较特殊了,大家应该没见过,这个groupBy操作,也就是将整个数据集按照某种值进行分组。例子中按每个PERSON的age值进行分组,那么结果我们将会得到根据年龄分组的数据,也就是我们想要的分组功能了。至于说为什么不能分段统计,当然可以了,这个留给你们自己玩,你先做个转换呗。
groupedByEdge.collect().foreach(println);
打印出来,完事。啊哈?为什么要collect,因为RDD分布在集群中,而日志只能出现在Driver,你不collect没法打印啊。
groupedByEdge.saveAsHadoopFile("/myHadoopPath")
存到HDFS上,完事。
然后我不管你用什么方式打包一个名字叫bigjiao.jar的包出来,不懂得上某搜索引擎去。
在spark集群上提交命令:
spark-submit --master local[*] --class Some bigjiao.jar