手记

Livy Session 详解(中)

本文基于 incubator-livy 0.4.0-incubating

一文主要介绍了 session 整体的启动流程并详细分析了 client 端(livy server 端)是如何启动 driver 以及建立连接的。本文将进一步分析  session server 端(即 driver 内部)是如何启动、初始化的以及执行代码片段的。

注:如果对 livy 的整体架构以及 session client 端不了解,请先阅读以下两篇相关文章:

一、整体启动、初始化流程

如上图所示,driver 内部的启动流程可以分为以下五个步骤:

  1. 创建 ReplDriver 实例

  2. 初始化 server

  3. 初始化 SparkContext

  4. 创建 JobContextImpl 实例并执行 jobs

  5. 等待退出

1.1、创建 ReplDriver 实例

ReplDriver 是 InteractiveSession 对应的 Spark App driver,用来接收 livy server 的各种请求并进行处理。也是 RSCDriver 的子类,RSCDriver:

  • 持有等待 RSCClient 进行连接的 RpcServer server

  • 初始化 SparkContext

  • 处理各种请求:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus

  • 处理 add file 请求

除了能处理 RSCDriver 支持的请求外,ReplDriver 还能处理:BaseProtocol.ReplJobRequest、BaseProtocol.CancelReplJobRequest、BaseProtocol.GetReplJobResults 请求,这些请求对应的是序列化的 job (GitHub - cloudera/livy: Livy is an open source REST interface for interacting with Apache Spark from anywhere)相关的请求。

1.2、初始化 server

这一步在 RSCDriver#initializeServer() 中调用,用于连接 client 并告知 server 端 rpc 地址,client 获知 server rpc 地址后会进行连接并发送请求。

1.3、初始化 SparkContext

1.3.1、创建解释器

会根据不同的 kind 创建不同类型的解释器,kind 在创建 session 的 request body 中指定。这些解释器有继承共同的 treat Interpreter,其类图如下:


其中的 execute 方法用来执行代码片段:

  • pyspark 类型的解释器用于执行 python、pyspark 代码片段

  • pyspark3类型的解释器用于执行 python3、 python3 spark 代码片段

  • spark 类型的解释器用于执行 scala、scala spark 代码片段

  • sparks 类型的解释器用于执行 r、r spark 代码片段

1.3.2、创建 repl/Session

repl/Session(用于和 sessions/Session 进行区分,后文简称 Session)是 server 端中至关重要的类。主要职责是:

  1. 启动 interpreter,并获取 SparkContext

  2. 持有线程池来异步执行 statements(通过 interpreter 来执行)

  3. 持有线程池来异步取消 statements

  4. 管理一个 session 下所有的 statements

在构造 Session 的过程中,会初始化用于执行 statement 的 interpreterExecutor,如下:

private val interpreterExecutor = ExecutionContext.fromExecutorService(    Executors.newSingleThreadExecutor())

可以看到,这个线程只有一个线程,也就是说在一个 Session 中的 statement 是串行的,一个 statement 执行完才会执行下一个。这种串行的方式有明显的弊端,即当 Session 的资源足以执行多个 statement 时,也只能一个接着一个执行,这既浪费了资源,有延长了任务运行的整体时间。那为什么还要这么做呢?主要是因为目前 livy 中的一个 Session 仅包含一个 interpreter,如果一个 interpreter 同时执行多段代码片段,很容易会出现穿插执行的错误。要解决这一困境的思路主要有两个:

  • 不使用 interpreter 来执行代码片段

  • 一个 Session 包含多个 interpreter,每个 interpreter 同一时间也只执行一个 statement
    目前,我们正在做这方面的工作,等完工之后可以再进一步说明下。

1.3.3、启动 Session

主要是调用 interpreter#start,该启动也是提交到 interpreterExecutor 中执行的,在启动后就会将 Session 的 state 修改为 idle。我们来看看 Spark 类型的 Session 的 interpreter 启动过程:

SparkInterpreter#start()


以上,就是 Session server 端的详细的启动过程,下一篇我们将看看代码片段是怎么执行的。



作者:牛肉圆粉不加葱
链接:https://www.jianshu.com/p/968a92c297d1


0人推荐
随时随地看视频
慕课网APP