The previous post covered the Receiver machinery on the Driver side. This post walks through how a Receiver is started on an Executor, and how it receives and stores data.
Start from ReceiverTracker's start method. It calls launchReceivers(), which sends a message to the endpoint: endpoint.send(StartAllReceivers(receivers)). The endpoint is ReceiverTrackerEndpoint, so the tracker is effectively sending a message to its own RPC endpoint. The handler for that message:
```scala
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  // loop over the receivers and start each one
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // start the receiver
    startReceiver(receiver, executors)
  }
```
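scheduleReceivers distributes the receivers over the live executors before any job is launched. A minimal sketch of the idea, assuming a plain round-robin assignment (the real ReceiverSchedulingPolicy also honors each receiver's preferredLocation and balances by load):

```scala
// Round-robin a set of receiver stream ids over executor hosts.
// Simplified stand-in for ReceiverSchedulingPolicy.scheduleReceivers.
def scheduleReceivers(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, Seq[String]] = {
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.length)) // i-th receiver -> i-th host, wrapping
  }.toMap
}
```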
startReceiver(receiver, executors) is called once per receiver in the loop, and each receiver is launched as its own Spark job. The code of startReceiver:
```scala
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  // (this is the function the job's task will run)
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      // only run on the first attempt; restarted tasks exit below
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // create the supervisor; its start() starts the receiver and data flow
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  // (a one-partition RDD whose single element is the receiver)
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      // build the RDD with the receiver's preferred locations for data locality
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }

  // configure the job
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // here the receiverRDD is submitted to the cluster
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        // restart the receiver
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        // restart the receiver
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
```
startReceiverFunc takes a single record from the iterator, which is the receiver itself, instantiates a ReceiverSupervisorImpl with it, and calls the supervisor's start method. Note that nothing is started here on the Driver: this only defines the function; it actually executes on an Executor.
The receiverRDD is then submitted to the cluster:
```scala
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
```
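The future returned by submitJob drives the restart loop in the onComplete callback shown earlier. Stripped of Spark, the keep-restarting-until-stopped pattern looks roughly like this (RestartLoop, maxRuns, and the Promise-based shutdown are illustrative stand-ins, not Spark API):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Each "receiver job" runs as a Future; when it completes, it is
// resubmitted unless the tracker is shutting down -- the same shape as
// future.onComplete { ... self.send(RestartReceiver(receiver)) }.
class RestartLoop(maxRuns: Int) {
  val runs = new AtomicInteger(0)
  private val stopped = Promise[Int]()
  private def shouldStartReceiver: Boolean = runs.get() < maxRuns
  def startReceiverJob(): Unit = {
    Future { runs.incrementAndGet() }.onComplete { _ =>
      if (shouldStartReceiver) startReceiverJob() // "RestartReceiver"
      else stopped.trySuccess(runs.get())         // tracker stopped
    }
  }
  def awaitStop(): Int = Await.result(stopped.future, 10.seconds)
}
```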
Through startReceiverFunc we can now follow how ReceiverSupervisorImpl runs on the Executor.
Starting from supervisor.start(), whose code is:
```scala
def start() {
  onStart()
  startReceiver()
}
```
The onStart method:
```scala
/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }
```
The important part is the comment: the BlockGenerators must be started before receiver.onStart() is called, so that received data can be stored as soon as the receiver starts producing it. The subclass implementation of onStart:
```scala
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
```
registeredBlockGenerators is created when ReceiverSupervisorImpl is instantiated:
```scala
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
```
A BlockGenerator is added to registeredBlockGenerators in the createBlockGenerator method:
```scala
override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter { _.isStopped() }
  // one BlockGenerator per receiver, since streamId maps one-to-one to a receiver
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
```
So when is createBlockGenerator called? Here:
```scala
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
```
Now that registeredBlockGenerators holds a BlockGenerator, look at BlockGenerator's start() method:
```scala
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
```
This starts blockIntervalTimer and blockPushingThread. blockIntervalTimer is a timer that by default fires every 200 ms and calls back updateCurrentBuffer; the interval is set via spark.streaming.blockInterval and is a performance-tuning knob: too short an interval produces too many small block fragments, while too long an interval can make blocks too large, so the right value depends on the workload. updateCurrentBuffer packages the received data into a block for storage (code below). blockPushingThread periodically takes blocks from the blocksForPushing queue, stores them, and reports them to ReceiverTrackerEndpoint (also shown below).
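As a rough illustration of what blockIntervalTimer does, here is a fixed-rate callback built on a ScheduledExecutorService (Spark's RecurringTimer is a hand-rolled thread, so this is an analogy, not the actual class):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Fire `callback` every intervalMs milliseconds, `times` times in total.
// BlockGenerator does the analogous thing at blockInterval (200 ms by
// default, configurable via spark.streaming.blockInterval).
def recurringTimer(intervalMs: Long, times: Int)(callback: Long => Unit): Unit = {
  val latch = new CountDownLatch(times)
  val exec = Executors.newSingleThreadScheduledExecutor()
  exec.scheduleAtFixedRate(new Runnable {
    def run(): Unit = if (latch.getCount > 0) {
      callback(System.currentTimeMillis()) // like updateCurrentBuffer(time)
      latch.countDown()
    }
  }, 0, intervalMs, TimeUnit.MILLISECONDS)
  latch.await() // wait for all ticks, then stop the timer
  exec.shutdownNow()
}
```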
With the BlockGenerator started, continue with the startReceiver() call inside supervisor.start():
```scala
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
```
It first checks the return value of onReceiverStart(), implemented in the subclass as:
```scala
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
```
Internally, onReceiverStart sends a RegisterReceiver message to trackerEndpoint. On receipt, the tracker wraps the registration into a ReceiverTrackingInfo case class and puts it into receiverTrackingInfos as a key-value pair keyed by streamId, which again shows that one inputDStream corresponds to one receiver.
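The driver-side bookkeeping can be pictured with a small sketch (ReceiverTrackingInfo here is trimmed down to a few fields; the real case class also tracks state, scheduled locations, and the receiver's RPC endpoint):

```scala
import scala.collection.mutable

// One ReceiverTrackingInfo entry per streamId:
// one inputDStream -> one receiver -> one entry in the map.
case class ReceiverTrackingInfo(streamId: Int, name: String, host: String, executorId: String)

class Tracker {
  val receiverTrackingInfos = mutable.HashMap[Int, ReceiverTrackingInfo]()
  // Handler shape for the RegisterReceiver message; the Boolean ack is
  // what startReceiver() reads as "the driver accepted us".
  def registerReceiver(streamId: Int, name: String, host: String, executorId: String): Boolean = {
    receiverTrackingInfos(streamId) = ReceiverTrackingInfo(streamId, name, host, executorId)
    true
  }
}
```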
Back in startReceiver: if the call returns true, receiverState is marked Started and receiver.onStart() is invoked.
Taking SocketReceiver as an example, its onStart method starts a daemon thread that calls receive() to read data:
```scala
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
```
Next, the receive() method:
```scala
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while (!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
```
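The receive() loop is easy to reproduce end to end. The sketch below stands up a throwaway local server, then reads lines from the socket the way receive() consumes bytesToObjects and calls store (the server thread and line protocol are illustrative, not Spark code):

```scala
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
import scala.io.Source

// Throwaway local server: accepts one client, writes two lines, closes.
val server = new ServerSocket(0) // bind to any free port
val port = server.getLocalPort
new Thread {
  override def run(): Unit = {
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    out.println("line1")
    out.println("line2")
    client.close()
  }
}.start()

// "Receiver" side: connect, iterate over lines, "store" each record --
// the same shape as while (...) { store(iterator.next) } in receive().
val socket = new Socket("localhost", port)
val stored = scala.collection.mutable.ArrayBuffer[String]()
val iterator = Source.fromInputStream(socket.getInputStream).getLines()
while (iterator.hasNext) stored += iterator.next()
socket.close()
server.close()
```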
The receive() method is straightforward: it opens a socket to receive data and calls store for each record it reads. The store method:
```scala
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
```
It delegates to the supervisor's pushSingle method, where supervisor is ReceiverSupervisorImpl, the implementation of ReceiverSupervisor:
```scala
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)
}
```
defaultBlockGenerator was introduced above; it is a member of ReceiverSupervisorImpl. Its addData method:
```scala
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
```
`currentBuffer += data` keeps accumulating records in currentBuffer. So how does that buffered data actually get stored as blocks? This is where the blockIntervalTimer and blockPushingThread introduced earlier come in.
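The waitToPush() call at the top of addData is also how back-pressure reaches the receiver: when spark.streaming.receiver.maxRate is set, it blocks until the rate limiter grants a permit. A rough sketch of that shape, using a Semaphore as a stand-in for Spark's Guava-based RateLimiter (TinyGenerator is an illustrative name, not Spark code):

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

// addData's shape: gate the producer with waitToPush(), then append the
// record to currentBuffer under the generator's lock.
class TinyGenerator(permits: Int) {
  private val tokens = new Semaphore(permits)
  val currentBuffer = new ArrayBuffer[Any]
  private def waitToPush(): Unit = tokens.acquire() // blocks when the budget is exhausted
  def addData(data: Any): Unit = {
    waitToPush()
    synchronized { currentBuffer += data }
  }
}
```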
First, the updateCurrentBuffer() method that blockIntervalTimer calls on each tick:
```scala
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
```
The current currentBuffer is handed off as newBlockBuffer and a fresh empty ArrayBuffer becomes the new currentBuffer; a Block is then built from newBlockBuffer and put into the blocksForPushing queue.
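The swap-and-enqueue pattern can be reduced to a few lines. Block and Batcher below are simplified stand-ins for BlockGenerator's internals, and the queue capacity of 10 is an assumption mirroring the default of spark.streaming.blockQueueSize:

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

case class Block(id: Long, buffer: ArrayBuffer[Any])

class Batcher {
  private var currentBuffer = new ArrayBuffer[Any]
  val blocksForPushing = new ArrayBlockingQueue[Block](10)
  def add(data: Any): Unit = synchronized { currentBuffer += data }
  def updateCurrentBuffer(time: Long): Unit = {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any] // swap in a fresh buffer under the lock
        newBlock = Block(time, newBlockBuffer)
      }
    }
    if (newBlock != null) blocksForPushing.put(newBlock) // blocks when the queue is full
  }
}
```

Note that the put happens outside the synchronized block, so a full queue stalls the timer thread without also blocking producers calling add.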
Next it's blockPushingThread's turn; the thread runs the keepPushingBlocks method:
```scala
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
```
Blocks are polled from the blocksForPushing queue and pushed via pushBlock:
```scala
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
  case Some(block) => pushBlock(block)
  case None =>
}
```
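The poll-with-timeout idiom is what lets the pushing thread re-check its state every 10 ms instead of blocking forever on an empty queue. A self-contained sketch of the same loop shape (drain and stillGenerating are illustrative names, not Spark API):

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Poll the queue with a short timeout; Some(block) means push it,
// None means "nothing yet, go back and check the loop condition".
// After generation stops, the remaining blocks are drained.
def drain(queue: ArrayBlockingQueue[String], stillGenerating: () => Boolean): List[String] = {
  var pushed = List.empty[String]
  while (stillGenerating() || !queue.isEmpty) {
    Option(queue.poll(10, TimeUnit.MILLISECONDS)) match {
      case Some(block) => pushed = block :: pushed // pushBlock(block)
      case None =>                                 // timed out; re-check state
    }
  }
  pushed.reverse
}
```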
The pushBlock(block) method comes down to:
```scala
listener.onPushBlock(block.id, block.buffer)
```
This calls the listener's onPushBlock method. So where does the listener come from? Tracing the listener variable: it is passed in when BlockGenerator is instantiated, and BlockGenerator receives it as the parameter of createBlockGenerator. Following createBlockGenerator's call site, we finally arrive at the instantiation of defaultBlockGeneratorListener:
```scala
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
```
Here is onPushBlock. It calls pushArrayBuffer, whose code is:
```scala
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
```
This leads to the pivotal call: pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption), whose code is:
```scala
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
```
This does two things: first, it stores the block via receivedBlockHandler; second, it reports the store result, blockInfo, to trackerEndpoint.
receivedBlockHandler is created when ReceiverSupervisorImpl is instantiated:
```scala
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
```
There are two flavors: a WAL (write-ahead log) based handler and a plain one. The WAL path is left for a later post; here we look at BlockManagerBasedBlockHandler:
```scala
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }

    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }

  def cleanupOldBlocks(threshTime: Long) {
    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
    // of BlockRDDs.
  }
}
```
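Note how the record count is obtained differently per block type: it is known up front for an ArrayBufferBlock, but for an IteratorBlock it can only be counted while the iterator is consumed, which is what CountingIterator does. A toy sketch of that dispatch (the case classes here are simplified stand-ins for Spark's ReceivedBlock hierarchy):

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait ReceivedBlock
case class ArrayBufferBlock(buf: ArrayBuffer[Any]) extends ReceivedBlock
case class IteratorBlock(it: Iterator[Any]) extends ReceivedBlock

// Buffer-backed blocks know their size; iterator-backed blocks must be
// consumed to be counted (CountingIterator wraps this in the real code).
def countRecords(block: ReceivedBlock): Option[Long] = block match {
  case ArrayBufferBlock(buf) => Some(buf.size.toLong)
  case IteratorBlock(it)     => Some(it.size.toLong) // consumes the iterator
}
```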
So the handler simply uses the BlockManager to store the block and returns the block's store metadata. That completes the walk through the receiver's data reception and storage.
The whole flow is quite clear; a flow chart would make it even better, and one will be added later. Thanks for reading.
Author: 海纳百川_spark
Link: https://www.jianshu.com/p/fd48919ad1ed