手记

12.spark sql之读写数据

简介

  Spark SQL支持多种结构化数据源,轻松从各种数据源中读取Row对象。这些数据源包括Parquet、JSON、Hive表及关系型数据库等。

  当只使用一部分字段时,Spark SQL可以智能地只扫描这些字段,而不会像hadoopFile方法一样简单粗暴地扫描全部数据。

Parquet

  Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet自动保存原始数据的类型,当写入Parquet文件时,所有的列会自动转为可空约束。

  • scala

// Encoders for most common types are automatically provided by importing spark.implicits._import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFrameval parquetFileDF = spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()// +------------+// |       value|// +------------+// |Name: Justin|// +------------+
  • java

import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.sql.Encoders;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write().parquet("people.parquet");// Read in the Parquet file created above.// Parquet files are self-describing so the schema is preserved// The result of loading a parquet file is also a DataFrameDataset<Row> parquetFileDF = spark.read().parquet("people.parquet");// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();// +------------+// |       value|// +------------+// |Name: Justin|// +------------+
  • python

peopleDF = spark.read.json("examples/src/main/resources/people.json")# DataFrames can be saved as Parquet files, maintaining the schema information.peopleDF.write.parquet("people.parquet")# Read in the Parquet file created above.# Parquet files are self-describing so the schema is preserved.# The result of loading a parquet file is also a DataFrame.parquetFile = spark.read.parquet("people.parquet")# Parquet files can also be used to create a temporary view and then used in SQL statements.parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()# +------+# |  name|# +------+# |Justin|# +------+
  • sql

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet")

SELECT * FROM parquetTable

JSON

  Spark SQL可以自动推断JSON数据集的结构,并加载为以Row为集合项的Dataset。

  默认Spark SQL读取的json文件不是常规的json文件,每一行必须包含一个独立的、自包含的有效JSOn对象。对于常规的多行JSON文件,设置multiLine选项为true即可。

  • scala

// Primitive types (Int, String, etc) and Product types (case classes) encoders are// supported by importing this when creating a Dataset.import spark.implicits._// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesval path = "examples/src/main/resources/people.json"val peopleDF = spark.read.json(path)// The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()// root//  |-- age: long (nullable = true)//  |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by sparkval teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()// +------+// |  name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset[String] storing one JSON object per stringval otherPeopleDataset = spark.createDataset(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()// +---------------+----+// |        address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
  • java

import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesDataset<Row> people = spark.read().json("examples/src/main/resources/people.json");// The inferred schema can be visualized using the printSchema() methodpeople.printSchema();// root//  |-- age: long (nullable = true)//  |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeople.createOrReplaceTempView("people");// SQL statements can be run by using the sql methods provided by sparkDataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();// +------+// |  name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset<String> storing one JSON object per string.List<String> jsonData = Arrays.asList(        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();// +---------------+----+// |        address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
  • python

# spark is from the previous example.sc = spark.sparkContext# A JSON dataset is pointed to by path.# The path can be either a single text file or a directory storing text filespath = "examples/src/main/resources/people.json"peopleDF = spark.read.json(path)# The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()# root#  |-- age: long (nullable = true)#  |-- name: string (nullable = true)# Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")# SQL statements can be run by using the sql methods provided by sparkteenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()# +------+# |  name|# +------+# |Justin|# +------+# Alternatively, a DataFrame can be created for a JSON dataset represented by# an RDD[String] storing one JSON object per stringjsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()# +---------------+----+# |        address|name|# +---------------+----+# |[Columbus,Ohio]| Yin|# +---------------+----+
  • sql

CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json")

SELECT * FROM jsonTable

Hive

  Spark SQL支持任何Hive支持的存储格式(SerDe),包括文本文件、RCFiles、ORC、Parquet、Avro及Protocol Buffer等。

  如果已配置好hive环境,将hive-site.xml,core-site.xml(用于安全配置),hdfs-site.xml(HDFS配置)放到conf目录下;如果没有hive环境,Spark SQL会自动在spark-warehouse(spark.sql.warehouse.dir配置项)目录下创建metastore_db。另外,需要赋予执行spark应用的用户写权限。

  • scala

import java.io.Fileimport org.apache.spark.sql.Rowimport org.apache.spark.sql.SparkSessioncase class Record(key: Int, value: String)// warehouseLocation points to the default location for managed databases and tablesval warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()import spark.implicits._import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsql("SELECT * FROM src").show()// +---+-------+// |key|  value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.sql("SELECT COUNT(*) FROM src").show()// +--------+// |count(1)|// +--------+// |    500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal.val stringsDS = sqlDF.map {  case Row(key: Int, value: String) => s"Key: $key, Value: $value"}
stringsDS.show()// +--------------------+// |               value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// |  2| val_2|  2| val_2|// |  4| val_4|  4| val_4|// |  5| val_5|  5| val_5|// ...
  • java

import java.io.File;import java.io.Serializable;import java.util.ArrayList;import java.util.List;import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Encoders;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;public static class Record implements Serializable {  private int key;  private String value;  public int getKey() {    return key;
  }  public void setKey(int key) {    this.key = key;
  }  public String getValue() {    return value;
  }  public void setValue(String value) {    this.value = value;
  }
}// warehouseLocation points to the default location for managed databases and tablesString warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");// Queries are expressed in HiveQLspark.sql("SELECT * FROM src").show();// +---+-------+// |key|  value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.spark.sql("SELECT COUNT(*) FROM src").show();// +--------+// |count(1)|// +--------+// |    500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");// The items in DataFrames are of type Row, which lets you to access each column by ordinal.Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();// +--------------------+// |               value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.List<Record> records = new ArrayList<>();for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");// Queries can then join DataFrames data with data stored in Hive.spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// |  2| val_2|  2| val_2|// |  2| val_2|  2| val_2|// |  4| val_4|  4| val_4|// ...
  • python

from os.path import expanduser, join, abspathfrom pyspark.sql import SparkSessionfrom pyspark.sql import Row# warehouse_location points to the default location for managed databases and tableswarehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()# spark is an existing SparkSessionspark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries are expressed in HiveQLspark.sql("SELECT * FROM src").show()# +---+-------+# |key|  value|# +---+-------+# |238|val_238|# | 86| val_86|# |311|val_311|# ...# Aggregation queries are also supported.spark.sql("SELECT COUNT(*) FROM src").show()# +--------+# |count(1)|# +--------+# |    500 |# +--------+# The results of SQL queries are themselves DataFrames and support all normal functions.sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")# The items in DataFrames are of type Row, which allows you to access each column by ordinal.stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))for record in stringsDS.collect():
    print(record)# Key: 0, Value: val_0# Key: 0, Value: val_0# Key: 0, Value: val_0# ...# You can also use DataFrames to create temporary views within a SparkSession.Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")# Queries can then join DataFrame data with data stored in Hive.spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()# +---+------+---+------+# |key| value|key| value|# +---+------+---+------+# |  2| val_2|  2| val_2|# |  4| val_4|  4| val_4|# |  5| val_5|  5| val_5|# ...

JDBC连接

  Spark SQL可以使用JDBC连接读写关系型数据库中的数据。这种方式比使用spark core中的JdbcRDD要好,因为生成的DataFrame可以很容易被处理。

  • scala

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourceval jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// Saving data to a JDBC sourcejdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// Specifying create table column data types on writejdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
  • java

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourceDataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);// Saving data to a JDBC sourcejdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);// Specifying create table column data types on writejdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  • python

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods# Loading data from a JDBC sourcejdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})# Saving data to a JDBC sourcejdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})# Specifying create table column data types on writejdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
  • sql

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password')

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable

         

             




作者:java大数据编程
链接:https://www.jianshu.com/p/7e3ab68d3c2f


0人推荐
随时随地看视频
慕课网APP