如果我缓存 Spark 数据帧,然后覆盖引用,原始数据帧是否仍会被缓存?

假设我有一个函数来生成一个(py)spark数据帧,将数据帧缓存到内存中作为最后一个操作。

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
      return df

根据我的理解,Spark的缓存工作如下:

  1. 当在数据帧上调用一个动作()时,它将从其DAG计算并缓存到内存中,并附加到引用它的对象上。cache/persistcount()

  2. 只要存在对该对象的引用(可能在其他函数/其他作用域中),df 将继续缓存,并且依赖于 df 的所有 DAG 都将使用内存中缓存的数据作为起点。

  3. 如果删除了对 df 的所有引用,Spark 会将缓存作为要进行垃圾回收的内存。它可能不会立即被垃圾回收,导致一些短期内存块(特别是,如果您生成缓存数据并过快地丢弃它们,则会导致内存泄漏),但最终它将被清除。

我的问题是,假设我用于生成一个数据框,但随后覆盖原始数据框引用(可能带有a或a)。gen_funcfilterwithColumn

df=gen_func(inputs)
df=df.filter("some_col = some_val")

在 Spark 中,RDD/DF 是不可变的,因此在滤波器之后重新分配的 df 和在滤波器之前的 df 指的是两个完全不同的对象。在本例中,对原始 df 的引用已被覆盖。这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置过滤器将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?cache/counteddf

我之所以问这个问题,是因为我最近修复了代码中的一些内存不足问题,在我看来,缓存可能是问题所在。但是,我还没有真正了解使用缓存的安全方法的全部细节,以及如何意外地使缓存的内存失效。在我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?


ITMISS
浏览 159回答 2
2回答

叮当猫咪

我做了几个实验,如下所示。显然,数据帧一旦缓存,就会保持缓存状态(如 和 查询计划等 所示),即使所有 Python 引用都被覆盖或完全删除,并且显式调用了垃圾回收。getPersistentRDDsInMemorydel实验 1:def func():    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')    data.cache()    data.count()    return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df2 = df.filter('col1 != 2')del dfimport gcgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()df2.select('*').explain()del df2gc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()结果:>>> def func():...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')...     data.cache()...     data.count()...     return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o234}>>> df2 = df.filter('col1 != 2')>>> del df>>> import gc>>> gc.collect()93>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o240}>>> df2.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))+- *(1) ColumnarToRow   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)               +- *(1) Project [_1#172L AS col1#174L]                  +- *(1) Scan ExistingRDD[_1#172L]>>> del df2>>> gc.collect()85>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o250}实验 2:def func():    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')    data.cache()    data.count()    return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df = df.filter('col1 != 2')import gcgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()df.select('*').explain()del dfgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()结果:>>> def func():...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')...     data.cache()...     data.count()...     return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o317}>>> df = df.filter('col1 != 2')>>> import gc>>> gc.collect()244>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o323}>>> df.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))+- *(1) ColumnarToRow   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)               +- *(1) Project [_1#218L AS col1#220L]                  +- *(1) Scan ExistingRDD[_1#218L]>>> del df>>> gc.collect()85>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o333}实验3(对照实验,证明无孔徒有效)def func():    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')    data.cache()    data.count()    return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df2 = df.filter('col1 != 2')df2.select('*').explain()df.unpersist()df2.select('*').explain()结果:>>> def func():...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')...     data.cache()...     data.count()...     return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){116: JavaObject id=o398}>>> df2 = df.filter('col1 != 2')>>> df2.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))+- *(1) ColumnarToRow   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)               +- *(1) Project [_1#310L AS col1#312L]                  +- *(1) Scan ExistingRDD[_1#310L]>>> df.unpersist()DataFrame[col1: bigint]>>> sc._jsc.getPersistentRDDs(){}>>> df2.select('*').explain()== Physical Plan ==*(1) Project [_1#310L AS col1#312L]+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))   +- *(1) Scan ExistingRDD[_1#310L]回答OP的问题:这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置滤波器df将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?实验表明两者都没有。数据帧保持缓存状态,不进行垃圾回收,并且根据查询计划使用缓存的(不可引用的)数据帧计算新数据帧。与缓存使用相关的一些有用功能(如果您不想通过 Spark UI 执行此操作)是:sc._jsc.getPersistentRDDs(),其中显示了缓存的 RDD/数据帧的列表,以及spark.catalog.clearCache(),这将清除所有缓存的 RDD/数据帧。我在执行上述操作时是否偏离了最佳实践?我没有权力对此进行判断,但正如其中一条评论所建议的那样,避免重新分配,因为数据帧是不可变的。试着想象你正在用scala编码,你被定义为.做是不可能的。Python本身无法强制执行,但我认为最佳做法是避免覆盖任何数据帧变量,这样,如果您不再需要缓存的结果,则可以随时调用。dfdfvaldf = df.filter(...)df.unpersist()

qq_遁去的一_1

想提出几点,希望能澄清Spark在缓存方面的行为。当您有df = ... do stuff...df.cache()df.count()...然后在应用程序中的其他位置   another_df = ... do *same* stuff...   another_df.*some_action()*...,您希望重用缓存的数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点,Spark开发人员决定使用分析的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖于来自应用程序端的引用。在 Spark 中,CacheManager 是跟踪缓存计算的组件,按索引顺序排列:another_dfdfcachedData  /**   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list   * should be protected in a "this.synchronized" block which includes the reading of the   * existing value and the update of the cachedData var.   */  @transient @volatile  private var cachedData = IndexedSeq[CachedData]()在查询规划期间(在缓存管理器阶段),将扫描此结构以查找正在分析的计划的所有子树,以查看是否已计算出其中的任何子树。如果找到匹配项,Spark 会将此子树替换为相应的 from 。InMemoryRelationcachedDatacache()(的简单同义词 ) 函数通过调用 cacheQuery(...) 来存储具有存储级别的数据帧persist()MEMORY_AND_DISKCacheManager      /**       * Caches the data produced by the logical representation of the given [[Dataset]].       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because       * recomputing the in-memory columnar representation of the underlying table is expensive.       */      def cacheQuery(...请注意,这与使用级别的 RDD 缓存不同。一旦缓存了数据帧,它们就会保留在内存或本地执行器磁盘上缓存,直到它们被显式'ed',或者调用CacheManager。当执行程序存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但永远不会简单地“丢弃”。MEMORY_ONLYunpersistclearCache()顺便说一句,好问题...
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python