一、How Spark's normal (hash-based) shuffle works
1、Suppose a node is running 4 ShuffleMapTasks but has only 2 CPU cores, and 4 ResultTasks on remote nodes are waiting to fetch and process the ShuffleMapTasks' output.
2、Only 2 ShuffleMapTasks can run at the same time. Each ShuffleMapTask creates 4 buckets, one for every ResultTask, because every ShuffleMapTask must produce a separate data partition for each ResultTask. The buckets live in memory; once a bucket reaches a threshold, its contents are flushed to a local ShuffleBlockFile on disk.
3、Each ShuffleMapTask reports its output as a MapStatus to the DAGScheduler, which registers it with the MapOutputTrackerMaster.
4、When a ResultTask needs to pull data, it looks up the MapStatus and uses the BlockManager to fetch the blocks to its local node. (At this point it looks just like MapReduce, but there is a difference: in MapReduce the reduce side only starts pulling data after the map output has been fully written, while in Spark a ShuffleMapTask can keep writing into its local buckets while ResultTasks are already reading them, which makes the shuffle faster. Why can't MapReduce do the same? Because MapReduce sorts by key within each partition during the shuffle, and that sort forces the map output to be completed first.)
5、A diagram says it better than all of these words:
6、The remaining problem:
With 1000 ShuffleMapTasks and 1000 ResultTasks, 1,000,000 disk files are produced. Reading and writing that many files means a huge number of disk I/O operations, and because disk I/O is slow, this severely degrades the performance of the whole job (a quick back-of-the-envelope calculation follows below).
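A quick back-of-the-envelope sketch of the file counts (plain Scala, not Spark source; the sort-based numbers anticipate the next section, where each map task writes one data file plus one index file):

object ShuffleFileCount {
  // Hash shuffle: one ShuffleBlockFile per (ShuffleMapTask, ResultTask) pair.
  def hashShuffleFiles(mapTasks: Int, reduceTasks: Int): Long =
    mapTasks.toLong * reduceTasks

  // Sort-based shuffle: one data file + one index file per ShuffleMapTask.
  def sortShuffleFiles(mapTasks: Int): Long =
    mapTasks.toLong * 2

  def main(args: Array[String]): Unit = {
    println(hashShuffleFiles(1000, 1000)) // 1000000
    println(sortShuffleFiles(1000))       // 2000
  }
}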
二、How the optimized (sort-based) shuffle works
1、Whether to aggregate (and sort) on the map side is configurable, as the write() method of SortShuffleWriter below shows; a usage sketch follows the source.
override def write(records: Iterator[Product2[K, V]]): Unit = {
//Decide whether to combine on the map side.
//If mapSideCombine is true, records are merged on the map side: e.g. a map output of
//("hello", 1), ("hello", 1) becomes ("hello", 2) after combining.
sorter = if (dep.mapSideCombine) {
//The aggregator comes from the operator we used, e.g. reduceByKey;
//map-side combine requires an aggregator to be defined.
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
//Combine by key within each partition on the map side.
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
//Otherwise skip map-side aggregation and sorting by key.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
//Feed all records into the sorter (combining on the map side if enabled above).
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
//Resolve the final output data file for this map task; we first write to a temp file next to it.
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
//Block id for this map task's output (NOOP_REDUCE_ID: a single data file covers all reducers).
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
//Write all partitions, in partition order, into the temp data file and record each partition's length.
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
//Write the index file and atomically commit the data file.
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
//Report the per-partition lengths back to the driver via MapStatus.
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
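A minimal usage sketch (not part of the source above; assumes a local SparkContext) showing which operators trigger the mapSideCombine branch: reduceByKey sets mapSideCombine = true, while groupByKey sets it to false and leaves the grouping to the reduce side.

import org.apache.spark.{SparkConf, SparkContext}

object MapSideCombineDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("map-side-combine-demo").setMaster("local[2]"))
    val words = sc.parallelize(Seq("hello", "hello", "world")).map(w => (w, 1))

    // reduceByKey uses mapSideCombine = true: each map task pre-aggregates
    // ("hello", 1), ("hello", 1) into ("hello", 2) before the shuffle.
    val counts = words.reduceByKey(_ + _)

    // groupByKey uses mapSideCombine = false: every ("hello", 1) record is
    // shuffled as-is and only grouped on the reduce side.
    val grouped = words.groupByKey()

    counts.collect().foreach(println)               // e.g. (hello,2), (world,1)
    grouped.mapValues(_.sum).collect().foreach(println)
    sc.stop()
  }
}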
2、Data is stored "zipper-style": the index file records only where each partition's data starts and ends inside a single data file (a small worked example of this offset layout follows after the source below).

def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
//Locate the index file for this map task's shuffle output; write to a temp file first.
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
//Running offset: where this partition's block ends (and the next one starts).
offset += length
out.writeLong(offset)
}
} {
out.close()
}
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}

The per-partition lengths passed to writeIndexFileAndCommit come from writePartitionedFile (in ExternalSorter), which writes every partition of one map task's output into a single data file and returns each partition's length in bytes:

def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
//Byte length of each partition's range in the output file.
val lengths = new Array[Long](numPartitions)
//Get a disk writer for the single output file.
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// Case where we only have in-memory data
//If an aggregator is defined, the records were collected in the map (combined by key);
//otherwise they were simply appended to the buffer.
val collection = if (aggregator.isDefined) map else buffer
//Destructively iterate over the in-memory data, sorted by partition, and write it all out.
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
//Commit this partition's file segment and record its length.
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}

3、Diagram
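To make the "record only the start and end positions" idea concrete, here is a minimal standalone sketch (illustrative only, not Spark source): writePartitionedFile yields one length per partition, writeIndexFileAndCommit turns those lengths into cumulative offsets, and a reducer's block is simply the byte range between two consecutive offsets in the single data file.

object IndexLayoutDemo {
  // Same conversion as in writeIndexFileAndCommit: lengths -> cumulative offsets,
  // starting with 0, so the index holds numPartitions + 1 longs.
  def lengthsToOffsets(lengths: Array[Long]): Array[Long] =
    lengths.scanLeft(0L)(_ + _)

  // Reducer `reduceId` reads the byte range [offsets(reduceId), offsets(reduceId + 1))
  // from the map task's single data file.
  def blockRange(offsets: Array[Long], reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1))

  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 0L, 25L, 5L)     // bytes written per partition
    val offsets = lengthsToOffsets(lengths)   // 0, 10, 10, 35, 40
    println(offsets.mkString(", "))
    println(blockRange(offsets, 2))           // (10,35): partition 2's byte range
  }
}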