Background
I have only recently started working with Spark SQL, and the material I found elsewhere left the overall Spark HiveThriftServer2 flow rather muddled, so I decided to trace how a SQL statement is executed, starting from a Beeline connection to HiveThriftServer2.
Our company's Spark cluster mainly exposes Spark SQL to upper layers, and I had had little contact with HiveServer2 before, so I was always curious how jobs running on Spark on YARN interact with HiveServer2, and how the final physical plan (the tasks) actually ends up running on Spark.
This post gives a fairly light walk-through of how Spark's HiveThriftServer2 overrides HiveCLI/HiveSession/OperationManager and friends so that a request entering through Beeline ends up as an operation on Spark SQL.
The later steps (SQL parsing, the logical plan, the physical plan, and execution on Spark on YARN) will be traced in follow-up posts when time permits.
I. The Spark HiveThriftServer2 execution flow
1. Starting from Beeline
I did not go through the Beeline code carefully; the main goal is to locate the entry point of the Beeline to HiveCLI interaction. In practice, Beeline connects to the CLI layer over JDBC. The call chain is:
beeline.main()->mainWithInputRedirection()->begin()->execute()->dispatch()
boolean dispatch(String line) {
  ...
  if (isBeeLine) {
    if (line.startsWith(COMMAND_PREFIX)) {
      // handle SQLLine command in beeline which starts with ! and does not end with ;
      return execCommandWithPrefix(line);
    } else {
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }
  } else {
    return commands.sql(line, getOpts().getEntireLineAsCommand());
  }
}
Commands.sql()->execute()->executeInternal()
private boolean executeInternal(String sql, boolean call) {
  ...
  try {
    Statement stmnt = null;
    boolean hasResults;
    Thread logThread = null;
    try {
      long start = System.currentTimeMillis();
      if (call) {
        stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
        hasResults = ((CallableStatement) stmnt).execute();
      } else {
        // create the Statement (HiveStatement implements java.sql.Statement)
        stmnt = beeLine.createStatement();
        if (beeLine.getOpts().isSilent()) {
          // execute the SQL
          hasResults = stmnt.execute(sql);
        } else {
          logThread = new Thread(createLogRunnable(stmnt));
          logThread.setDaemon(true);
          logThread.start();
          hasResults = stmnt.execute(sql);
          logThread.interrupt();
          logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
        }
      }
    } finally {
      ...
    }
  }
  ...
}
The method in HiveStatement that actually handles the SQL:
public boolean execute(String sql) throws SQLException {
  // this is where the SQL is actually handled
  runAsyncOnServer(sql);
  waitForOperationToComplete();

  // at this point the query has finished; build the result set for output
  // The query should be completed by now
  if (!stmtHandle.isHasResultSet()) {
    return false;
  }
  resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
      .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
      .setScrollable(isScrollableResultset)
      .build();
  return true;
}

// interact with the server asynchronously (this lands in the CLI service layer)
private void runAsyncOnServer(String sql) throws SQLException {
  checkConnection("execute");
  closeClientOperation();
  initFlags();
  TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
  /**
   * Run asynchronously whenever possible
   * Currently only a SQLOperation can be run asynchronously,
   * in a background operation thread
   * Compilation is synchronous and execution is asynchronous
   */
  execReq.setRunAsync(true);
  execReq.setConfOverlay(sessConf);
  execReq.setQueryTimeout(queryTimeout);
  try {
    // client is a TCLIService.Iface; the statement is executed over the Thrift RPC
    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    stmtHandle = execResp.getOperationHandle();
    isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  }
}

public abstract class ThriftCLIService extends AbstractService
    implements TCLIService.Iface, Runnable {
  ...
}
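Everything above is simply a client driving HiveStatement over plain JDBC. For reference, here is a minimal Scala sketch of what Beeline effectively does for us; the host, port, user and query are placeholders, and it assumes a running Thrift server with hive-jdbc on the classpath:

import java.sql.DriverManager

object ThriftServerJdbcSketch {
  def main(args: Array[String]): Unit = {
    // registers the Hive JDBC driver (hive-jdbc must be on the classpath)
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // host, port, database and user are placeholders for a running (Spark) Thrift server
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()        // actually a HiveStatement
      val rs = stmt.executeQuery("SELECT 1")   // goes through ExecuteStatement over Thrift, as shown above
      while (rs.next()) println(rs.getInt(1))
      rs.close()
      stmt.close()
    } finally {
      conn.close()
    }
  }
}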
2. Into ThriftCLIService
First, let's see what the ExecuteStatement() method invoked by Beeline actually does: it unpacks the request from the Thrift protocol, hands it to cliService, and wraps the result back into a Thrift response. The real execution is still further down.
@Override
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
  TExecuteStatementResp resp = new TExecuteStatementResp();
  try {
    SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
    String statement = req.getStatement();
    Map<String, String> confOverlay = req.getConfOverlay();
    Boolean runAsync = req.isRunAsync();
    long queryTimeout = req.getQueryTimeout();
    // cliService here is a CLIService instance; the real execution is one more level down (getting close to the truth...)
    OperationHandle operationHandle = runAsync
        ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout)
        : cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout);
    resp.setOperationHandle(operationHandle.toTOperationHandle());
    resp.setStatus(OK_STATUS);
  } catch (Exception e) {
    // Note: it's rather important that this (and other methods) catch Exception, not Throwable;
    // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
    // to also catch all errors; and now it allows OOMs only to propagate.
    LOG.warn("Error executing statement: ", e);
    resp.setStatus(HiveSQLException.toTStatus(e));
  }
  return resp;
}
3. What does CLIService do?
This is the key to how Spark hooks into the Hive CLI layer: all the classes and methods that appear here have counterparts you will recognize under Spark's sql/hive-thriftserver module.
The truth is actually simple: Spark's HiveThriftServer2 overrides the SessionManager/OperationManager/CLIService used by the method below, so that when this call chain runs it lands on Spark SQL.
If you don't believe it, read on.
/**
 * Execute statement on the server with a timeout. This is a blocking call.
 */
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
  OperationHandle opHandle =
      sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay, queryTimeout);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}
4. Back to SparkSQLCLIService
SparkSQLCLIService extends CLIService and overrides its init() method; in init(), the sessionManager field is set to a SparkSQLSessionManager instance.
The interesting piece here is the ReflectedCompositeService trait, which uses reflection to set fields declared on a specified ancestor class. (I won't analyze it in detail, as other articles explain the mechanism; a tiny sketch of the underlying trick follows the code below.)
private[hive] class SparkSQLCLIService(hiveServer: HiveServer2)
  extends CLIService(hiveServer)
  with ReflectedCompositeService {

  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    this.sessionManager = new SparkSQLSessionManager(hiveServer)
    addService(sessionManager)
    this.serviceUGI = MultiSparkSQLEnv.globalUgi
    initCompositeService(hiveConf)
  }
  ...
}

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
  def initCompositeService(hiveConf: HiveConf) {
    // Emulating `CompositeService.init(hiveConf)`
    val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
    serviceList.asScala.foreach(_.init(hiveConf))

    // Emulating `AbstractService.init(hiveConf)`
    invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
    setAncestorField(this, 3, "hiveConf", hiveConf)
    invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
    getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
  }
}
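To make the reflection trick concrete, here is a tiny self-contained sketch of the underlying technique. Parent/Child and the helper are invented for illustration and are not the actual Spark/Hive helpers, which additionally handle method invocation and type lookup:

object ReflectionSketch {
  class Parent {
    private[this] var label: String = "default"
    def show(): String = label
  }
  class Child extends Parent

  // Walk `depth` levels up the class hierarchy and overwrite a (possibly private) field,
  // which is essentially what setAncestorField does in ReflectedCompositeService.
  def setAncestorField(obj: AnyRef, depth: Int, name: String, value: AnyRef): Unit = {
    var ancestor: Class[_] = obj.getClass
    for (_ <- 0 until depth) ancestor = ancestor.getSuperclass
    val field = ancestor.getDeclaredField(name)
    field.setAccessible(true)
    field.set(obj, value)
  }

  def main(args: Array[String]): Unit = {
    val c = new Child
    setAncestorField(c, 1, "label", "overridden") // Child -> Parent is one level up
    println(c.show())                             // prints "overridden"
  }
}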
5. What does SparkSQLSessionManager do?
SparkSQLSessionManager is the most convoluted link in this chain to follow: by overriding init() and openSession(), it routes a session's operations to SparkSQLOperationManager. Recall the call we are tracing:
sessionManager.getSession(sessionHandle).executeStatement()
The concrete implementation of SparkSQLSessionManager:
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer)
  with ReflectedCompositeService {

  // !!!
  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()

  // override init() to set up backgroundOperationPool, the thread pool that runs operations
  override def init(hiveConf: HiveConf) {
    setSuperField(this, "hiveConf", hiveConf)

    // Create operation log root directory, if operation logging is enabled
    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
      invoke(classOf[SessionManager], this, "initOperationLogRootDir")
    }

    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
    getAncestorField[Log](this, 3, "LOG").info(
      s"HiveServer2: Async execution pool size $backgroundPoolSize")

    // set the parent class's operationManager field to sparkSqlOperationManager, which creates
    // Spark SQL operations instead of Hive's MapReduce ones
    setSuperField(this, "operationManager", sparkSqlOperationManager)
    addService(sparkSqlOperationManager)
    initCompositeService(hiveConf)
  }

  override def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle = {
    // key point: first call the parent class's openSession() (shown below), which creates the
    // HiveSession and returns its handle
    val sessionHandle = super.openSession(protocol, username, passwd, ipAddress, sessionConf,
      withImpersonation, delegationToken)
    val session = super.getSession(sessionHandle)
    HiveThriftServer2.listener.onSessionCreated(
      session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
      sqlContext
    } else {
      sqlContext.newSession()
    }
    ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
    if (sessionConf != null && sessionConf.containsKey("use:database")) {
      ctx.sql(s"use ${sessionConf.get("use:database")}")
    }
    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
    sessionHandle
  }
}
Hive's SessionManager.openSession() actually delegates to createSession(), which constructs and returns a HiveSessionImpl / HiveSessionImplwithUGI / HiveSessionProxy.
public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
    String password, String ipAddress, Map<String, String> sessionConf,
    boolean withImpersonation, String delegationToken) throws HiveSQLException {

  HiveSession session;
  // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
  // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
  if (withImpersonation) {
    HiveSessionImplwithUGI hiveSessionUgi;
    if (sessionImplWithUGIclassName == null) {
      hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
          hiveConf, ipAddress, delegationToken);
    } else {
      try {
        Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
        Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
            String.class, String.class, HiveConf.class, String.class, String.class);
        hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
            protocol, username, password, hiveConf, ipAddress, delegationToken);
      } catch (Exception e) {
        throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
      }
    }
    session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
    hiveSessionUgi.setProxySession(session);
  } else {
    if (sessionImplclassName == null) {
      session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf, ipAddress);
    } else {
      try {
        Class<?> clazz = Class.forName(sessionImplclassName);
        Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
            String.class, String.class, HiveConf.class, String.class);
        // in practice the session is one of Hive's HiveSessionImpl / HiveSessionImplwithUGI / HiveSessionProxy classes
        session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password,
            hiveConf, ipAddress);
      } catch (Exception e) {
        throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
      }
    }
  }
  // the (Spark-overridden) operationManager is handed to the session here and used later inside HiveSessionImpl
  session.setSessionManager(this);
  session.setOperationManager(operationManager);
  try {
    session.open(sessionConf);
  } catch (Exception e) {
    LOG.warn("Failed to open session", e);
    try {
      session.close();
    } catch (Throwable t) {
      LOG.warn("Error closing session", t);
    }
    session = null;
    throw new HiveSQLException("Failed to open new session: " + e.getMessage(), e);
  }
  if (isOperationLogEnabled) {
    session.setOperationLogSessionDir(operationLogRootDir);
  }
  try {
    executeSessionHooks(session);
  } catch (Exception e) {
    LOG.warn("Failed to execute session hooks", e);
    try {
      session.close();
    } catch (Throwable t) {
      LOG.warn("Error closing session", t);
    }
    session = null;
    throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
  }
  handleToSession.put(session.getSessionHandle(), session);
  LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
  return session;
}
Since getSession() returns a HiveSessionImpl / HiveSessionImplwithUGI / HiveSessionProxy, each of these naturally implements executeStatement(). Inside them, the real work is done by executeStatementInternal(), whose implementation is quite simple:
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
    boolean runAsync, long queryTimeout) throws HiveSQLException {
  acquire(true, true);

  ExecuteStatementOperation operation = null;
  OperationHandle opHandle = null;
  try {
    // See it? Because the operationManager field was replaced earlier, getOperationManager() here
    // actually returns sparkSqlOperationManager, so the mystery is finally solved
    operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
        confOverlay, runAsync, queryTimeout);
    opHandle = operation.getHandle();
    operation.run();
    addOpHandle(opHandle);
    return opHandle;
  } catch (HiveSQLException e) {
    // Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the
    // async background operation submits to thread pool successfully at the same time. So, Cleanup
    // opHandle directly when got HiveSQLException
    if (opHandle != null) {
      getOperationManager().closeOperation(opHandle);
    }
    throw e;
  } finally {
    if (operation == null || operation.getBackgroundHandle() == null) {
      release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
    } else {
      releaseBeforeOpLock(true); // Release, but keep the lock (if present).
    }
  }
}
Operation is an abstract class: concrete subclasses provide runInternal(), and the actual execution above is driven through operation.run(). A stripped-down sketch of this pattern follows.
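The real Hive Operation class carries a lot of state-machine bookkeeping, so here is a purely illustrative sketch of the run()/runInternal() pattern; the class names and states are invented:

// run() drives the lifecycle; subclasses plug the real work into runInternal()
abstract class SketchOperation(statement: String) {
  private var state: String = "INITIALIZED"

  final def run(): Unit = {
    state = "RUNNING"
    try {
      runInternal()        // subclass-specific execution, e.g. handing the SQL to Spark
      state = "FINISHED"
    } catch {
      case e: Exception =>
        state = "ERROR"
        throw e
    }
  }

  protected def runInternal(): Unit
  def currentState: String = state
}

// a SparkExecuteStatementOperation-like subclass would call something like
// sparkSession.sql(statement) inside runInternal()
class PrintingOperation(statement: String) extends SketchOperation(statement) {
  override protected def runInternal(): Unit = println(s"executing: $statement")
}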
6. The SparkSQLOperationManager class
This class overrides newExecuteStatementOperation(), which now returns a SparkExecuteStatementOperation object.
override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    async: Boolean): ExecuteStatementOperation = synchronized {
  val sessionHandle = parentSession.getSessionHandle
  val sparkSession = sessionToSparkSession.get(sessionHandle)
  var client = sessionToClient.get(sessionHandle)
  val formatted = statement.toLowerCase.split("\\s+").mkString(" ")
  ...
  val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
  val runInBackground = async && sessionState.hiveThriftServerAsync
  val operation = new SparkExecuteStatementOperation(
    parentSession, statement, client, confOverlay, runInBackground)(sparkSession, sessionToActivePool)
  handleToOperation.put(operation.getHandle, operation)
  logDebug(s"Created Operation for $statement with session=$parentSession, " +
    s"runInBackground=$runInBackground")
  operation
}
7. The real fate of a SQL statement
For the SparkExecuteStatementOperation object created above, runInternal() ends up calling execute(), and what execute() really invokes is result = sparkSession.sql(statement).
After all this preparation, the actual implementation is just the same ss.sql() call we would write in our own demo application; it feels like quite a detour. (A minimal standalone demo for comparison follows after the execute() code below.)
private def execute(): Unit = {
  statementId = UUID.randomUUID().toString
  logInfo(s"Running query '$statement' with $statementId")
  setState(OperationState.RUNNING)

  // Always use the latest class loader provided by executionHive's state.
  val executionHiveClassLoader = sparkSession.sharedState.jarClassLoader
  Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

  HiveThriftServer2.listener.onStatementStart(
    statementId,
    parentSession.getSessionHandle.getSessionId.toString,
    statement,
    statementId,
    parentSession.getUsername)
  sparkSession.sparkContext.setJobGroup(statementId, statement)
  val pool = sessionToActivePool.get(parentSession.getSessionHandle)
  if (pool != null) {
    sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
  }
  try {
    // the real fate of a SQL statement
    result = sparkSession.sql(statement)
    logDebug(result.queryExecution.toString())
    result.queryExecution.logical match {
      case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
        sessionToActivePool.put(parentSession.getSessionHandle, value)
        logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
      case _ =>
    }
    HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
    iter = {
      val useIncrementalCollect =
        sparkSession.conf.get("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
      if (useIncrementalCollect) {
        result.toLocalIterator.asScala
      } else {
        result.collect().iterator
      }
    }
    val (itra, itrb) = iter.duplicate
    iterHeader = itra
    iter = itrb
    dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
  } catch {
    case e: HiveSQLException =>
      if (getStatus().getState() == OperationState.CANCELED) {
        return
      } else {
        setState(OperationState.ERROR)
        throw e
      }
    // Actually do need to catch Throwable as some failures don't inherit from Exception and
    // HiveServer will silently swallow them.
    case e: Throwable =>
      val currentState = getStatus().getState()
      logError(s"Error executing query, currentState $currentState, ", e)
      setState(OperationState.ERROR)
      HiveThriftServer2.listener.onStatementError(
        statementId, e.getMessage, SparkUtils.exceptionString(e))
      throw new HiveSQLException(e.toString)
  }
  setState(OperationState.FINISHED)
  HiveThriftServer2.listener.onStatementFinish(statementId)
}
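For comparison, the end of this long chain is exactly the entry point a standalone Spark application would use. A minimal sketch, with an illustrative app name, master and query:

import org.apache.spark.sql.SparkSession

object SqlDemo {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .appName("sql-demo")       // illustrative app name
      .master("local[*]")        // illustrative master; the Thrift server uses its own context
      // .enableHiveSupport()    // uncomment when querying Hive tables
      .getOrCreate()

    // the same entry point execute() reaches: result = sparkSession.sql(statement)
    val result = ss.sql("SELECT 1 AS id")
    result.show()

    ss.stop()
  }
}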
II. Summary
This post does not dissect every method and variable; it simply walks through, end to end, how a SQL statement flows through Spark HiveThriftServer2 and roughly how it is handled.
What struck me most is how neatly Spark borrows Hive's shell: by overriding a handful of classes in Java/Scala, the original Hive functionality is extended onto Spark in a simple and convenient way.
At the same time, a JDBC query passes through many layers of abstraction (HiveCommands/HiveStatement/ThriftCli/CliService/SessionManager/SessionImpl/OperationManager/Operator and so on). Working through them feels tedious, but for the kind of extension described above these layers turn out to be absolutely necessary; it is an approach well worth learning from in future work.
Of course, this post only covers the very beginning of a SQL statement's execution. Next I plan to keep going down the stack and trace how a SQL statement is turned into the tasks that run on executors, walking through Spark's own logic.
Author: 分裂四人组
Link: https://www.jianshu.com/p/679de255735a