一.此文档主题
本文档描述了操作符在内存中如何存储state,以确定运行application需要多少内存,并为executors规划适当的堆内存。
二.UnsafeRow对象的内存使用
UnsafeRow是Row的一种不安全的实现,它依靠原始内存而不是Java对象。State中存储的是key-value形式的数据,其原始对象为UnsafeRow,所以我们可能想了解UnsafeRow对象时如何消耗内存的,以便预测总的State将消耗多少内存。
原始内存结构
原始内存是按照这种格式组成的:[null bit set][values][variable length portion]
1.null bit set
用于null值跟踪
对齐到8 byte的字边界
每个字段存储1bit
2.values
每个字段存储一个8byte的字
如果字段类型是固定长度的原始类型(long、double、int等)
值是直接存储的
如果字段类型是非原始的或长度是可变的
该值是原始内存中实际值的引用信息
高32bits:相对偏移量(从基偏移量开始)
低32bits:实际值大小
3.variable length portion
非原始的或变长字段的原始格式存放在这里
举例说明,假设schema中的字段是(int、double、基于1000字节的字节数组的string),那么底层原始内存的长度将是(8+8+8+8+1000)=1032
三.不同操作符的State中存储的key/value的格式
1.StateStoreSaveExec
(1)操作符:agg(),count(),mean(),max(),avg(),min(),sum()
(2)key-value的格式
key:包含group-by字段的UnsafeRow
value:包含key字段和聚合结果字段的UnsafeRow
(3)注意:avg()(以及avg的别名)将保存两个字段-sum和count-存储到value中的UnsafeRow
2.FlatMapGroupsWithStateExec
(1)操作符:mapGroupsWithState()/flatMapGroupsWithState()
(2)key-value的格式
key:包含group-by字段的UnsafeRow
value:包含用户定义的中间数据模式字段的UnsafeRow
value中不再包含多余的key字段
3.StreamingSymmetricHashJoinExec
只用在stream-stream的join中,因为stream-static的join并不需要state。
Spark为每个输入端维护两个单独的状态:keyWithIndexToValue和keyWithNumValues,以存储具有相同join键值的多个输入行。
(1)操作符:stream-stream join
(2)keyWithIndexToValue中的key-value格式
key:包含join字段+索引号的UnsafeRow(从0开始)
value:输入行本身
(3)keyWithNumValues中的key-value格式
key:包含join字段的UnsafeRow
value:包含行计数的UnsafeRow(只有一个字段)
四.HDFSBackedStateStoreProvider存储多个版本对内存的影响
HDFSBackedStateStoreProvider是Apache Spark中StateStoreProvider的唯一实现类,它在内存中使用hash-map的方式存储状态的多个版本。
默认情况下,会存储超过100个版本。 当版本N的map被加载时,它将加载版本N-1的状态map,并对版本N-1的map进行浅拷贝(拷贝引用,不拷贝内容)。这种特性会带来以下需要注意的地方:
1.实际的key-value对象(UnsafeRow)会在多个批次的map的映射间共享,除非key被更新为新值。
2.批次之间的大量更新将阻止共享行对象的行为,且由于存储多个版本导致内存的大量占用。
3.每个版本都维护map的实体和引用。如果State中存储着大量的键值对,它们可能会造成巨大的内存开销。
作者:阿猫阿狗Hakuna
链接:https://www.jianshu.com/p/738f5d4a0f46