手记

将代码从 spark 1.x 移植到 spark 2.x

1. SparkSession

sparkSession可以视为sqlContexthiveContext以及StreamingContext的结合体,这些Context的API都可以通过sparkSession使用。

创建SparkSession

val spark = SparkSession.builder
    .master("local[2]")
    .appName("spark session example")
    .getOrCreate()

使用enableHiveSupport就能够支持hive,相当于hiveContext

val spark = SparkSession.builder
    .master("local[2]")
    .appName("spark session example")
    .enableHiveSupport()
    .getOrCreate()

API操作,与之前的Context基本一致

//读取csv数据val df0 = spark.read
  .option("header","true")
  .csv("src/main/resources/test.csv")//读取parquet数据val df1 = spark.read.parquet("...")//读取json数据val df2 = spark.read.json("...")//sql查询val df3 = spark.sql("xxx")

Spark 2.0向后兼容,所以hiveContext以及sqlContext依旧可用,不过官方建议开发者开始使用SparkSession

2. DataSet,RDD,DataFrame

  • RDD

    类型安全,面向对象编程风格,序列化以及反序列化开销大。

  • DataFrame

    提供了查询优化器,分区剪枝,自动判断是否使用broadcast join等功能,对rdd进行了大量的优化。对spark了解不深的编程/分析人员非常友好。

    可以视为Row类型的Dataset (Dataset[Row]),非类型安全,不是面向对象编程风格。

  • DataSet

    继承了RDD和DataFrame的优点。数据以编码的二进制形式存储,将对象的schema映射为SparkSQL类型,不需要反序列化就可以进行shuffle等操作,每条记录存储的则是强类型值,类型安全,面向对象编程风格。

Dataset的创建

dataset可以从rdd,dataFrame转化,也可以从原始数据直接生成。

通过toDS方法创建

val ds1 = Seq("a","b").toDS()
ds1.show//+-----+//|value|//+-----+//|    a|//|    b|//+-----+

通过createDataSet创建

case class Person(name: String, age: Int)val data = Seq(Person("lsw", 23), Person("yyy", 22))
val ds2 = spark.createDataset(data)
ds2.show//+----+---+//|name|age|//+----+---+//| lsw| 23|//| yyy| 22|//+----+---+

DataSet与RDD使用上的区别

Dataset 结合了 rdd 和 DataFrame 上大多数的API,所以spark 1.x基于 rdd 或 DataFrame 的代码可以很容易的改写成spark 2.x版本

  1. 数据读取

    RDDs

    sparkContext.textFile("/path/to/data.txt")

    Datasets

    //返回 DataFrameval df = spark.read.text("/path/to/data.txt")//返回 DataSet[String]val ds1 = spark.read.textFile("/path/to/data.txt")//或者读取成DataFram再转化成Datasetval ds2 = spark.read.text("/path/to/data.txt").as[String]
  2. 常用API

    RDDs

    //flatMap,filterval lines = sc.textFile("/path/to/data.txt")
    val res = lines
      .flatMap(_.split(" "))
      .filter(_ != "")//reduceval rdd = sc.makeRDD(Seq(1, 2, 3, 4))
    rdd.reduce((a, b) => a + b)

    Datasets

    //flatMap,filterval lines = spark.read.textFile("/path/to/data.txt")
    val res = lines
      .flatMap(_.split(" "))
      .filter(_ != "")//reduceval ds = Seq(1, 2, 3, 4).toDs
    ds.reduce((a, b) => a + b)
  3. reduceByKey

    RDDs

    val reduceCountByRDD = wordsPair
      .reduceByKey(_+_)

    Datasets

    val reduceCountByDs = wordsPairDs
      .mapGroups((key,values) =>(key,values.length))
  4. RDD,DataFrame,Dataset的相互转化

    import spark.implicits._//Dataset转化为RDDval ds2rdd = ds.rdd//Dataset转为DataFrameval ds2df = ds.toDF//RDD转化为Datasetval rdd2ds = rdd.toDS//RDD转化为DataFrameval rdd2df = rdd.toDF//DataFrame转化为RDDval df2rdd = df.rdd//DataFrame转化为DataSetval df2ds = df.as[Type]
  5. wordCount

    data.txt

    hello world
    hello spark

    RDDs

    val rdd = sc.textFile("src/main/resources/data.txt")
    val wordCount = rdd
      .map(word => (word,1))
      .reduceByKey(_+_)

    Datasets

    import spark.implicits._
    val wordCount1 = lines
      .flatMap(r => r.split(" "))
      .groupByKey(r => r)
      .mapGroups((k, v) => (k, v.length))
    wordCount1.show//  +-----+--------+//  |value|count(1)| //  +-----+--------+//  |hello|       2|//  |spark|       1|//  |world|       1|//  +-----+--------+
      //也可以直接使用count函数val wordCount2 = lines
      .flatMap(r => r.split(" "))
      .groupByKey(v => v)
      .count()
    wordCount2.show//  +-----+---+//  |   _1| _2|//  +-----+---+//  |hello|  2|//  |spark|  1|//  |world|  1|//  +-----+---+

Dataset性能提升(来自官方)

这里写图片描述


这里写图片描述


这里写图片描述

3.Catalog

Spark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

获取catalog

从SparkSession中获取catalog

val catalog = spark.catalog

查询临时表和元数据中的表

返回Dataset[Table]

catalog.listTable.show//  +----+--------+-----------+---------+-----------+//  |name|database|description|tableType|isTemporary|//  +----+--------+-----------+---------+-----------+//  |table|   null|      null|TEMPORARY|        true|//  |act | default|      null| EXTERNAL|       false|//  +----+--------+-----------+---------+-----------+

创建临时表

使用createTempViewcreateOrReplaceTempView取代registerTempTable

例如

df.createTempView("table")
df.createOrReplaceTempView("table")
  • createTempView

    创建临时表,如果已存在同名表则报错。

  • createOrReplaceTempView

    创建临时表,如果存在则进行替换,与老版本的registerTempTable功能相同。

销毁临时表

使用dropTempView取代dropTempTable,销毁临时表的同事会清除缓存的数据。

spark.dropTempView("table")

缓存表

对数据进行缓存

//缓存表有两种方式df.cache
catalog.cacheTable("table")//判断数据是否缓存catalog.isCached("table")

catalog相较于之前的API,对metadata的操作更加的简单,直观。



作者:breeze_lsw
链接:https://www.jianshu.com/p/fb9722809165


0人推荐
随时随地看视频
慕课网APP