手记

Spark Connector Reader 原理与实践

本文主要讲述如何利用 Spark Connector 进行 Nebula Graph 数据的读取。

Spark Connector 简介

Spark Connector 是一个 Spark 的数据连接器,可以通过该连接器进行外部数据系统的读写操作,Spark Connector 包含两部分,分别是 Reader 和 Writer,而本文侧重介绍 Spark Connector Reader,Writer 部分将在下篇和大家详聊。

Spark Connector Reader 原理

Spark Connector Reader 是将 Nebula Graph 作为 Spark 的扩展数据源,从 Nebula Graph 中将数据读成 DataFrame,再进行后续的 map、reduce 等操作。

[Spark SQL ]允许用户自定义数据源,支持对外部数据源进行扩展。通过 Spark SQL 读取的数据格式是以命名列方式组织的分布式数据集 DataFrame,Spark SQL 本身也提供了众多 API 方便用户对 DataFrame 进行计算和转换,能对多种数据源使用 DataFrame 接口。

Spark 调用外部数据源包的是 org.apache.spark.sql,首先了解下 Spark SQL 提供的的扩展数据源相关的接口。

Basic Interfaces

  • BaseRelation:表示具有已知 Schema 的元组集合。所有继承 BaseRelation 的子类都必须生成 StructType 格式的 Schema。换句话说,BaseRelation 定义了从数据源中读取的数据在 Spark SQL 的 DataFrame 中存储的数据格式的。

  • RelationProvider:获取参数列表,根据给定的参数返回一个新的 BaseRelation。

  • DataSourceRegister:注册数据源的简写,在使用数据源时不用写数据源的全限定类名,而只需要写自定义的 shortName 即可。

Providers

  • RelationProvider:从指定数据源中生成自定义的 relation。 createRelation() 会基于给定的 Params 参数生成新的 relation。

  • SchemaRelationProvider:可以基于给定的 Params 参数和给定的 Schema 信息生成新的 Relation。

RDD

  • RDD[InternalRow]: 从数据源中 Scan 出来后需要构造成 RDD[Row]

要实现自定义 Spark 外部数据源,需要根据数据源自定义上述部分方法。

在 Nebula Graph 的 Spark Connector 中,我们实现了将 Nebula Graph 作为 Spark SQL 的外部数据源,通过 sparkSession.read 形式进行数据的读取。该功能实现的类图展示如下:

  1. 定义数据源 NebulaRelatioProvider,继承 RelationProvider 进行 relation 自定义,继承 DataSourceRegister 进行外部数据源的注册。

  2. 定义 NebulaRelation 定义 Nebula Graph 的数据 Schema 和数据转换方法。在 getSchema() 方法中连接 Nebula Graph 的 Meta 服务获取配置的返回字段对应的 Schema 信息。

  3. 定义 NebulaRDD 进行 Nebula Graph 数据的读取。 compute() 方法中定义如何读取 Nebula Graph 数据,主要涉及到进行 Nebula Graph 数据 Scan、将读到的 Nebula Graph Row 数据转换为 Spark 的 InternalRow 数据,以 InternalRow 组成 RDD 的一行,其中每一个 InternalRow 表示 Nebula Graph 中的一行数据,最终通过分区迭代的形式将 Nebula Graph 所有数据读出组装成最终的 DataFrame 结果数据。

Spark Connector Reader 实践

Spark Connector 的 Reader 功能提供了一个接口供用户编程进行数据读取。一次读取一个点/边类型的数据,读取结果为 DataFrame。

下面开始实践,拉取 GitHub 上 Spark Connector 代码:


git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git

cd nebula-java/tools/nebula-spark

mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true

将编译打成的包 copy 到本地 Maven 库。

应用示例如下:

  1. 在 mvn 项目的 pom 文件中加入 nebula-spark 依赖

<dependency>

<groupId>com.vesoft</groupId>

<artifactId>nebula-spark</artifactId>

<version>1.1.0</version>

</dependency>

  1. 在 Spark 程序中读取 Nebula Graph 数据:

// 读取 Nebula Graph 点数据

val vertexDataset: Dataset[Row] =

spark.read

.nebula("127.0.0.1:45500", "spaceName", "100")

.loadVerticesToDF("tag", "field1,field2")

vertexDataset.show()

// 读取 Nebula Graph 边数据

val edgeDataset: Dataset[Row] =

spark.read

.nebula("127.0.0.1:45500", "spaceName", "100")

.loadEdgesToDF("edge", "*")

edgeDataset.show()

配置说明:

  • nebula(address: String, space: String, partitionNum: String)

address:可以配置多个地址,以英文逗号分割,如“ip1:45500,ip2:45500”

space: Nebula Graph 的 graphSpace

partitionNum: 设定spark读取Nebula时的partition数,尽量使用创建 Space 时指定的 Nebula Graph 中的 partitionNum,可确保一个Spark的partition读取Nebula Graph一个part的数据。

  • loadVertices(tag: String, fields: String)

tag:Nebula Graph 中点的 Tag

fields:该 Tag 中的字段,,多字段名以英文逗号分隔。表示只读取 fields 中的字段,* 表示读取全部字段

  • loadEdges(edge: String, fields: String)

edge:Nebula Graph 中边的 Edge

fields:该 Edge 中的字段,多字段名以英文逗号分隔。表示只读取 fields 中的字段,* 表示读取全部字段

0人推荐
随时随地看视频
慕课网APP