概述
spark的内存管理有两套方案,新旧方案分别对应的类是UnifiedMemoryManager和StaticMemoryManager。
旧方案是静态的,storageMemory(存储内存)和executionMemory(执行内存)拥有的内存是独享的不可相互借用,故在其中一方内存充足,另一方内存不足但又不能借用的情况下会造成资源的浪费。新方案是统一管理的,初始状态是内存各占一半,但其中一方内存不足时可以向对方借用,对内存资源进行合理有效的利用,提高了整体资源的利用率。
总的来说内存分为三大块,包括storageMemory、executionMemory、系统预留,其中storageMemory用来缓存rdd,unroll partition,存放direct task result、广播变量,在 Spark Streaming receiver 模式中存放每个 batch 的 blocks。executionMemory用于shuffle、join、sort、aggregation 中的缓存。除了这两者以外的内存都是预留给系统的。
旧方案 StaticMemoryManager
在SparkEnv中会创建memoryManager:
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { UnifiedMemoryManager(conf, numUsableCores) }
默认使用的是统一管理方案UnifiedMemoryManager,这里我们简要的看看旧方案StaticMemoryManager。
storageMemory能分到的内存是:
systemMaxMemory * memoryFraction * safetyFraction
其中:
systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
memoryFraction:由参数spark.storage.memoryFraction控制,默认0.6。
safetyFraction:由参数spark.storage.safetyFraction控制,默认是0.9,因为cache block都是估算的,所以需要一个安全系数来保证安全。
executionMemory能分到的内存是:
systemMaxMemory * memoryFraction * safetyFraction
其中:
systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
memoryFraction:由参数spark.shuffle.memoryFraction控制,默认0.2。
safetyFraction:由参数spark.shuffle.safetyFraction控制,默认是0.8。
memoryFraction系数之外和安全系数之外的内存就是给系统预留的了。
executionMemory能分到的内存直接影响了shuffle中spill的频率,增加executionMemory可减少spill的次数,但storageMemory能cache的容量也相应减少。
execution 和 storage 被分配到内存后大小就一直不变了,每次申请内存都只能申请自己独有的不能相互借用,会造成资源的浪费。另外,只有 execution 内存支持 off heap,storage 内存不支持 off heap。
新方案 UnifiedMemoryManager
由于新方案中storageMemory和executionMemory是统一管理的,我们看看两者一共能拿到多少内存。
private def getMaxMemory(conf: SparkConf): Long = { val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) val reservedMemory = conf.getLong("spark.testing.reservedMemory", if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) val minSystemMemory = (reservedMemory * 1.5).ceil.toLong if (systemMemory < minSystemMemory) { throw new IllegalArgumentException(s"System memory $systemMemory must " + s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " + s"option or spark.driver.memory in Spark configuration.") } // SPARK-12759 Check executor memory to fail fast if memory is insufficient if (conf.contains("spark.executor.memory")) { val executorMemory = conf.getSizeAsBytes("spark.executor.memory") if (executorMemory < minSystemMemory) { throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + s"$minSystemMemory. Please increase executor memory using the " + s"--executor-memory option or spark.executor.memory in Spark configuration.") } } val usableMemory = systemMemory - reservedMemory val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) (usableMemory * memoryFraction).toLong }
首先给系统内存reservedMemory预留了300M,若jvm能拿到的最大内存和配置的executor内存分别不足以reservedMemory的1.5倍即450M都会抛出异常,最后storage和execution能拿到的内存为:
(heap space - 300) * spark.memory.fraction (默认为0.6)
storage和execution各占所获内存的50%。
申请storage内存
为某个blockId申请numBytes大小的内存:
override def acquireStorageMemory( blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized { assertInvariants() assert(numBytes >= 0) val (executionPool, storagePool, maxMemory) = memoryMode match { case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, maxOnHeapStorageMemory) case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, maxOffHeapMemory) } // 申请的内存大于storage和execution内存之和 if (numBytes > maxMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + s"memory limit ($maxMemory bytes)") return false } // 大于storage空闲内存 if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) executionPool.decrementPoolSize(memoryBorrowedFromExecution) storagePool.incrementPoolSize(memoryBorrowedFromExecution) } storagePool.acquireMemory(blockId, numBytes) }
若申请的numBytes比两者总共的内存还大,直接返回false,说明申请失败。
若numBytes比storage空闲的内存大,则需要向executionPool借用
借用的大小为此时execution的空闲内存和numBytes的较小值(个人观点应该是和<numBytes-storage空闲内存>的较小值)
减小execution的poolSize
增加storage的poolSize
即使向executionPool借用了内存,但不一定就够numBytes,因为不可能把execution正在使用的内存都接过来,接着调用了storagePool的acquireMemory方法在不够numBytes的情况下去释放storage中共cache的rdd,以增加storagePool.memoryFree的值:
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized { val numBytesToFree = math.max(0, numBytes - memoryFree) acquireMemory(blockId, numBytes, numBytesToFree) }
计算出向execution借了内存后还差多少内存才能满足numBytes,即需要释放的内存numBytesToFree 。接着调用了acquireMemory方法:
def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, numBytesToFree: Long): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) if (numBytesToFree > 0) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call // back into this StorageMemoryPool in order to free memory. Therefore, these variables // should have been updated. val enoughMemory = numBytesToAcquire <= memoryFree if (enoughMemory) { _memoryUsed += numBytesToAcquire } enoughMemory }
当numBytesToFree 大于0的情况下,就真的要去释放缓存在memory中的block,释放完后再看空闲内存是否能满足numBytes,若满足则将numBytes加到已使用的变量里。
看看当需要从storay中释放block的时候是怎么释放的:
private[spark] def evictBlocksToFreeSpace( blockId: Option[BlockId], space: Long, memoryMode: MemoryMode): Long = { assert(space > 0) memoryManager.synchronized { var freedMemory = 0L val rddToAdd = blockId.flatMap(getRddId) val selectedBlocks = new ArrayBuffer[BlockId] def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) } // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { val iterator = entries.entrySet().iterator() while (freedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey val entry = pair.getValue if (blockIsEvictable(blockId, entry)) { // We don't want to evict blocks which are currently being read, so we need to obtain // an exclusive write lock on blocks which are candidates for eviction. We perform a // non-blocking "tryLock" here in order to ignore blocks which are locked for reading: if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) { selectedBlocks += blockId freedMemory += pair.getValue.size } } } } def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = { val data = entry match { case DeserializedMemoryEntry(values, _, _) => Left(values) case SerializedMemoryEntry(buffer, _, _) => Right(buffer) } val newEffectiveStorageLevel = blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag) if (newEffectiveStorageLevel.isValid) { // The block is still present in at least one store, so release the lock // but don't delete the block info blockInfoManager.unlock(blockId) } else { // The block isn't present in any store, so delete the block info so that the // block can be stored again blockInfoManager.removeBlock(blockId) } } if (freedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping " + s"(${Utils.bytesToString(freedMemory)} bytes)") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } // This should never be null as only one task should be dropping // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { dropBlock(blockId, entry) } } logInfo(s"After dropping ${selectedBlocks.size} blocks, " + s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}") freedMemory } else { blockId.foreach { id => logInfo(s"Will not store $id") } selectedBlocks.foreach { id => blockInfoManager.unlock(id) } 0L } } }
spark中内存中的block都是通过memoryStore来存储的,用
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
来维护了blockId和MemoryEntry(对应value的包装)的关联,另外方法中还定义了两个方法,blockIsEvictable方法是判断遍历到的blockId和当前blockId是否属于同一个rdd,因为不能提出同一个rdd的另外一个block。dropBlock方法就是真正执行从内存中移除block的,若StorageLevel包括了使用disk,则会写到磁盘文件。
整段代码的逻辑简单概述就是:遍历当前memoryStore中存的每个block(不是和当前请求的block属于同于同一rdd),直到block对应的内存之和大于所需释放的内存才停止遍历,也有可能遍历完了都还不能满足所需的内存。若能释放的内存满足所需的内存,则真正执行移除,否则不移除,因为不可能一个block在内存中一部分,在磁盘一部分,最后返回真正剔除block释放的内存。
总结一下向StorageMemory申请内存的过程(在MemoryMode.ON_HEAP模式下):
若numBytes大于storage和execution内存之和,抛异常。
若numBytes大于storage空闲内存,向execution借用min(executionFree,numBytes)大的内存,并更新各自的poolSize。
若申请完后还不够,则释放storage中的block来补足。
memoryStore缓存的block大小满足需要补足的大小,则真正执行剔除(遍历block直到内存满足需求对应的block),否则不剔除。
最终若空闲内存满足numBytes则返回true,否则返回false。
申请execution内存
在execution内存不足向storage借用时,还是不满足所需内存的情况下能借多少借多少。看看在需要向execution申请内存时是怎么处理的(MemoryMode.ON_HEAP模式下):
override private[memory] def acquireExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { assertInvariants() assert(numBytes >= 0) val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, onHeapStorageRegionSize, maxHeapMemory) case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, offHeapStorageMemory, maxOffHeapMemory) } /** * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. * * When acquiring memory for a task, the execution pool may need to make multiple * attempts. Each attempt must be able to evict storage in case another task jumps in * and caches a large block between the attempts. This is called once per attempt. */ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { if (extraMemoryNeeded > 0) { // There is not enough free memory in the execution pool, so try to reclaim memory from // storage. We can reclaim any free memory from the storage pool. If the storage pool // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim // the memory that storage has borrowed from execution. val memoryReclaimableFromStorage = math.max( storagePool.memoryFree, storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) storagePool.decrementPoolSize(spaceToReclaim) executionPool.incrementPoolSize(spaceToReclaim) } } } /** * The size the execution pool would have after evicting storage memory. * * The execution memory pool divides this quantity among the active tasks evenly to cap * the execution memory allocation for each task. It is important to keep this greater * than the execution pool size, which doesn't take into account potential memory that * could be freed by evicting storage. Otherwise we may hit SPARK-12155. * * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness * in execution memory allocation across tasks, Otherwise, a task may occupy more than * its fair share of execution memory, mistakenly thinking that other tasks can acquire * the portion of storage memory that cannot be evicted. */ def computeMaxExecutionPoolSize(): Long = { maxMemory - math.min(storagePool.memoryUsed, storageRegionSize) } executionPool.acquireMemory( numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) }
这里先讲解这里面的两个方法:
maybeGrowExecutionPool就是需要向storage借内存的方法,能借用的最大内存memoryReclaimableFromStorage 为storage的空闲内存和storage向execution借用的内存(即已经使用也要释放来归还)的较大值,若memoryReclaimableFromStorage为0,则说明storage之前没有向execution借用内存,并且此时storage没有空闲的内存可借。
最终申请借用的是所需内存和memoryReclaimableFromStorage的较小值(缺多少借多少),跟进storagePool.freeSpaceToShrinkPool方法看看其实现:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory } }
若storage空闲内存不足以所申请的内存,则需要通过释放storage中缓存的block来补充。
方法computeMaxExecutionPoolSize即计算的是execution拥有的最大可用内存。
接着通过这两个函数作为参数调用了方法executionPool.acquireMemory:
private[memory] def acquireMemory( numBytes: Long, taskAttemptId: Long, maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") // TODO: clean up this clunky method signature // Add this task to the taskMemory map just so we can keep an accurate count of the number // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory` if (!memoryForTask.contains(taskAttemptId)) { memoryForTask(taskAttemptId) = 0L // This will later cause waiting tasks to wake up and check numTasks again lock.notifyAll() } // Keep looping until we're either sure that we don't want to grant this request (because this // task would have more than 1 / numActiveTasks of the memory) or we have enough free // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). // TODO: simplify this to limit each task to its own slot while (true) { val numActiveTasks = memoryForTask.keys.size val curMem = memoryForTask(taskAttemptId) // In every iteration of this loop, we should first try to reclaim any borrowed execution // space from storage. This is necessary because of the potential race condition where new // storage blocks may steal the free execution memory that this task was waiting for. maybeGrowPool(numBytes - memoryFree) // Maximum size the pool would have after potentially growing the pool. // This is used to compute the upper bound of how much memory each task can occupy. This // must take into account potential free memory as well as the amount this pool currently // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management, // we did not take into account space that could have been freed by evicting cached blocks. val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks) // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // Only give it as much memory as is free, which might be none if it reached 1 / numTasks val toGrant = math.min(maxToGrant, memoryFree) // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; // if we can't give it this much now, wait for other tasks to free up memory // (this happens if older tasks allocated lots of memory before N grew) if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait() } else { memoryForTask(taskAttemptId) += toGrant return toGrant } } 0L // Never reached }
里面定义了一个Task能使用的execution内存:
val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks)
其中maxPoolSize 为从 storage 借用了内存后,executionMemoryPool 的最大可用内存,保证一个Task可用的内存在 1/2*numActiveTasks ~ 1/numActiveTasks 范围内,整体保证各个Task资源占用平衡。
向execution申请内存代码流程:
先获取Task目前已经分配到的内存。
当numBytes大于execution空闲内存,则会通过maybeGrowPool方法向storage借内存。
能获取的最大内存maxToGrant为numBytes和(maxMemoryPerTask - curMem)的较小值。
本次循环能获取真正的内存toGrant为maxToGrant和(execution向memory借用后可用的内存)的较小值。
若最终能申请的内存小于numBytes且申请的内存加上原来有的内存还不足以一个Task最小的使用内存minMemoryPerTask,则会阻塞,直到有足够的内存或者有新的Task进来减小了minMemoryPerTask的值。
否则直接返回本次分配到的内存。
对于向storage和execution申请内存以及相互借用内存的方式至此讲解完成。用到storage和execution内存的地方很多(看概述),其中缓存rdd会向storage申请内存,运行Task会向execution申请内存,接下来分别看看是在什么时候申请的。
缓存 RDD
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }
每个rdd分区的数据都是通过对应的迭代器得到,其中若存储级别不为NONE,则会先尝试从储存介质中(内存、磁盘文件等)获取,第一次获取当然都没有,只有先计算完缓存起来以供后续的计算直接获取。缓存序列化和非序列化的数据的缓存方式不一样,非序列化的缓存的代码是:
memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
private[storage] def putIteratorAsValues[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 // Memory currently reserved by this task for this particular unrolling operation var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 // Keep track of unroll memory used by this particular block / putIterator() operation var unrollMemoryUsedByThisBlock = 0L // Underlying vector for unrolling the block var vector = new SizeTrackingVector[T]()(classTag) // Request enough memory to begin unrolling keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") } else { unrollMemoryUsedByThisBlock += initialMemoryThreshold } // Unroll this block safely, checking whether we have exceeded our threshold periodically while (values.hasNext && keepUnrolling) { vector += values.next() if (elementsUnrolled % memoryCheckPeriod == 0) { // If our vector's size has exceeded the threshold, request more memory val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest } } elementsUnrolled += 1 } if (keepUnrolling) { // We successfully unrolled the entirety of this block val arrayValues = vector.toArray vector = null val entry = new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } } // Acquire storage memory if necessary to store this block in memory. val enoughStorageMemory = { if (unrollMemoryUsedByThisBlock <= size) { val acquiredExtra = memoryManager.acquireStorageMemory( blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) if (acquiredExtra) { transferUnrollToStorage(unrollMemoryUsedByThisBlock) } acquiredExtra } else { // unrollMemoryUsedByThisBlock > size // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) transferUnrollToStorage(size) true } } if (enoughStorageMemory) { entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, unrolled = arrayValues.toIterator, rest = Iterator.empty)) } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values)) } }
代码太长了,我自己看到都头大了,没事,咱一点一点的慢慢来~
参数中的blockId是一个block的唯一标示,格式是"rdd_" + rddId + "_" + splitIndex
,value就是该partition对应数据的迭代器,
通过reserveUnrollMemoryForThisTask方法向Storage申请initialMemoryThreshold(初始值可通过spark.storage.unrollMemoryThreshold配置,默认1M)的内存来unroll 迭代器:
def reserveUnrollMemoryForThisTask( blockId: BlockId, memory: Long, memoryMode: MemoryMode): Boolean = { memoryManager.synchronized { val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode) if (success) { val taskAttemptId = currentTaskAttemptId() val unrollMemoryMap = memoryMode match { case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap } unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory } success } }
跟进acquireUnrollMemory可看见底层调用的就是前面所讲的向storage申请内存的方法acquireStorageMemory,若申请成功则将对应的onHeapUnrollMemoryMap加上申请到的内存,即unroll使用的内存。
若申请成功则跟新unrollMemoryUsedByThisBlock的值,即在该block上unroll使用的内存。
接着进行遍历,停止遍历的条件有两个,一是迭代器全部遍历完,二是没有申请到内存。
每迭代一条数据都会加到SizeTrackingVector类型的vector中(底层由数组实现),每迭代16次都会估算vector的大小是否超过了memoryThreshold(申请的内存)。
若超过了memoryThreshold,则会计算再次申请内存的大小,1.5倍当前vector大小-已经申请到的内存大小。
再次向Storage申请内存,若申请成功,则跟新unrollMemoryUsedByThisBlock,继续遍历进入下次循环,否则停止遍历。
循环结束后,若keepUnrolling 为 true,则说明values 一定被全部展开了;若为false,则没有全部被展开,说明没有申请到足够的内存来展开这个values,意味着该partition缓存到内存失败。
在values全部成功展开的前提下,会将vector构造成一个DeserializedMemoryEntry对象,其中包括数据的大小,接着会将展开后的数据大小和申请的内存大小作比较:
若申请的内存比数据小,则再次向storage申请对应的大小,申请成功则将unroll使用的内存转化到storage中去,转化对应的逻辑是:释放掉该Task占用的所有unroll内存,又向storage申请对应的内存,其实unroll内存就是storage内存,即操作的都是storage的内存,减去某值又加上某值,结果没有变,但流程还得这么走,因为为了将 MemoryStore 和 MemoryManager 的解耦。
若申请的内存比数据大,则释放掉对应的unroll内存,接着将unroll使用的内存转化到storage中去。
最后将blockId和对应的entry加入到memorySore所管理的entries中去。
缓存序列化rdd支持 ON_HEAP 和 OFF_HEAP,和缓存非序列化rdd的方式类似,只是以流的形式写到bytebuffer中,其中
MemoryMode 如果是 ON_HEAP,这里的 ByteBuffer 是 HeapByteBuffer(堆上内存);而如果是 OFF_HEAP,这里的 ByteBuffer 则是 DirectByteBuffer(指向的是堆外内存)。最后根据数据构建成SerializedMemoryEntry来保存在memoryStore的entries中。
shuffle中execution内存的使用
在shuffle write的时候,并不会直接将数据写到磁盘(详情请看Shuffle Write解析),而是先写到一个集合中,此集合占用的内存就是execution内存,初始给的大小是5M,可通过spark.shuffle.spill.initialMemoryThreshold
进行设置,每写一次数据就判断是否需要溢写到磁盘,溢写之前还尝试会向execution申请来避免溢写,代码如下:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = acquireMemory(amountToRequest) myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory releaseMemory() } shouldSpill }
当insert&update的次数是32的倍数且当前集合的大小已经大于等于了已经申请到的内存,此时会尝试向execution申请更多的内存来避免spill,申请的大小为2倍当前集合大小减去已经申请到的内存大小,跟进acquireMemory方法:
public long acquireMemory(long size) { long granted = taskMemoryManager.acquireExecutionMemory(size, this); used += granted; return granted; }
这不就是我们前面讲的向execution申请内存的方法吗,这里就不再叙述。
作者:BIGUFO
链接:https://www.jianshu.com/p/4fdf1f595940