本文基于 incubator-livy 0.4.0-incubating
一文主要介绍了 session 整体的启动流程并详细分析了 client 端(livy server 端)是如何启动 driver 以及建立连接的。本文将进一步分析 session server 端(即 driver 内部)是如何启动、初始化的以及执行代码片段的。
注:如果对 livy 的整体架构以及 session client 端不了解,请先阅读以下两篇相关文章:
一、整体启动、初始化流程
如上图所示,driver 内部的启动流程可以分为以下五个步骤:
创建 ReplDriver 实例
初始化 server
初始化 SparkContext
创建 JobContextImpl 实例并执行 jobs
等待退出
1.1、创建 ReplDriver 实例
ReplDriver 是 InteractiveSession 对应的 Spark App driver,用来接收 livy server 的各种请求并进行处理。也是 RSCDriver 的子类,RSCDriver:
持有等待 RSCClient 进行连接的
RpcServer server
初始化 SparkContext
处理各种请求:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
处理 add file 请求
除了能处理 RSCDriver 支持的请求外,ReplDriver 还能处理:BaseProtocol.ReplJobRequest、BaseProtocol.CancelReplJobRequest、BaseProtocol.GetReplJobResults 请求,这些请求对应的是序列化的 job (GitHub - cloudera/livy: Livy is an open source REST interface for interacting with Apache Spark from anywhere)相关的请求。
1.2、初始化 server
这一步在 RSCDriver#initializeServer()
中调用,用于连接 client 并告知 server 端 rpc 地址,client 获知 server rpc 地址后会进行连接并发送请求。
1.3、初始化 SparkContext
1.3.1、创建解释器
会根据不同的 kind 创建不同类型的解释器,kind 在创建 session 的 request body 中指定。这些解释器有继承共同的 treat Interpreter,其类图如下:
其中的 execute 方法用来执行代码片段:
pyspark 类型的解释器用于执行 python、pyspark 代码片段
pyspark3类型的解释器用于执行 python3、 python3 spark 代码片段
spark 类型的解释器用于执行 scala、scala spark 代码片段
sparks 类型的解释器用于执行 r、r spark 代码片段
1.3.2、创建 repl/Session
repl/Session(用于和 sessions/Session
进行区分,后文简称 Session)是 server 端中至关重要的类。主要职责是:
启动 interpreter,并获取 SparkContext
持有线程池来异步执行 statements(通过 interpreter 来执行)
持有线程池来异步取消 statements
管理一个 session 下所有的 statements
在构造 Session 的过程中,会初始化用于执行 statement 的 interpreterExecutor,如下:
private val interpreterExecutor = ExecutionContext.fromExecutorService( Executors.newSingleThreadExecutor())
可以看到,这个线程只有一个线程,也就是说在一个 Session 中的 statement 是串行的,一个 statement 执行完才会执行下一个。这种串行的方式有明显的弊端,即当 Session 的资源足以执行多个 statement 时,也只能一个接着一个执行,这既浪费了资源,有延长了任务运行的整体时间。那为什么还要这么做呢?主要是因为目前 livy 中的一个 Session 仅包含一个 interpreter,如果一个 interpreter 同时执行多段代码片段,很容易会出现穿插执行的错误。要解决这一困境的思路主要有两个:
不使用 interpreter 来执行代码片段
一个 Session 包含多个 interpreter,每个 interpreter 同一时间也只执行一个 statement
目前,我们正在做这方面的工作,等完工之后可以再进一步说明下。
1.3.3、启动 Session
主要是调用 interpreter#start
,该启动也是提交到 interpreterExecutor 中执行的,在启动后就会将 Session 的 state 修改为 idle。我们来看看 Spark 类型的 Session 的 interpreter 启动过程:
SparkInterpreter#start()
以上,就是 Session server 端的详细的启动过程,下一篇我们将看看代码片段是怎么执行的。
作者:牛肉圆粉不加葱
链接:https://www.jianshu.com/p/968a92c297d1