叮当猫咪
我做了几个实验,如下所示。显然,数据帧一旦缓存,就会保持缓存状态(如 和 查询计划等 所示),即使所有 Python 引用都被覆盖或完全删除,并且显式调用了垃圾回收。getPersistentRDDsInMemorydel实验 1:def func(): data = spark.createDataFrame([[1],[2],[3]]).toDF('col1') data.cache() data.count() return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df2 = df.filter('col1 != 2')del dfimport gcgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()df2.select('*').explain()del df2gc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()结果:>>> def func():... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')... data.cache()... data.count()... return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o234}>>> df2 = df.filter('col1 != 2')>>> del df>>> import gc>>> gc.collect()93>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o240}>>> df2.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))+- *(1) ColumnarToRow +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)] +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [_1#172L AS col1#174L] +- *(1) Scan ExistingRDD[_1#172L]>>> del df2>>> gc.collect()85>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){71: JavaObject id=o250}实验 2:def func(): data = spark.createDataFrame([[1],[2],[3]]).toDF('col1') data.cache() data.count() return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df = df.filter('col1 != 2')import gcgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()df.select('*').explain()del dfgc.collect()sc._jvm.System.gc()sc._jsc.getPersistentRDDs()结果:>>> def func():... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')... data.cache()... data.count()... return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o317}>>> df = df.filter('col1 != 2')>>> import gc>>> gc.collect()244>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o323}>>> df.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))+- *(1) ColumnarToRow +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)] +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [_1#218L AS col1#220L] +- *(1) Scan ExistingRDD[_1#218L]>>> del df>>> gc.collect()85>>> sc._jvm.System.gc()>>> sc._jsc.getPersistentRDDs(){86: JavaObject id=o333}实验3(对照实验,证明无孔徒有效)def func(): data = spark.createDataFrame([[1],[2],[3]]).toDF('col1') data.cache() data.count() return datasc._jsc.getPersistentRDDs()df = func()sc._jsc.getPersistentRDDs()df2 = df.filter('col1 != 2')df2.select('*').explain()df.unpersist()df2.select('*').explain()结果:>>> def func():... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')... data.cache()... data.count()... return data...>>> sc._jsc.getPersistentRDDs(){}>>> df = func()>>> sc._jsc.getPersistentRDDs(){116: JavaObject id=o398}>>> df2 = df.filter('col1 != 2')>>> df2.select('*').explain()== Physical Plan ==*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))+- *(1) ColumnarToRow +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)] +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [_1#310L AS col1#312L] +- *(1) Scan ExistingRDD[_1#310L]>>> df.unpersist()DataFrame[col1: bigint]>>> sc._jsc.getPersistentRDDs(){}>>> df2.select('*').explain()== Physical Plan ==*(1) Project [_1#310L AS col1#312L]+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2)) +- *(1) Scan ExistingRDD[_1#310L]回答OP的问题:这是否意味着缓存的数据框不再可用,将被垃圾回收?这是否意味着新的后置滤波器df将从头开始计算所有内容,尽管它是从以前缓存的数据帧生成的?实验表明两者都没有。数据帧保持缓存状态,不进行垃圾回收,并且根据查询计划使用缓存的(不可引用的)数据帧计算新数据帧。与缓存使用相关的一些有用功能(如果您不想通过 Spark UI 执行此操作)是:sc._jsc.getPersistentRDDs(),其中显示了缓存的 RDD/数据帧的列表,以及spark.catalog.clearCache(),这将清除所有缓存的 RDD/数据帧。我在执行上述操作时是否偏离了最佳实践?我没有权力对此进行判断,但正如其中一条评论所建议的那样,避免重新分配,因为数据帧是不可变的。试着想象你正在用scala编码,你被定义为.做是不可能的。Python本身无法强制执行,但我认为最佳做法是避免覆盖任何数据帧变量,这样,如果您不再需要缓存的结果,则可以随时调用。dfdfvaldf = df.filter(...)df.unpersist()