PySpark 的 DataFrame.show() 运行缓慢

新手在这里,我通过 JDBC 从 PySpark 中的 MySQL 读取了一个表(大约 200 万行)作为 Spark 的 DataFrame,并尝试显示前 10 行:


from pyspark.sql import SparkSession


spark_session = SparkSession.builder.master("local[4]").appName("test_log_processing").getOrCreate()

url = "jdbc:mysql://localhost:3306"

table = "test.fakelog"

properties = {"user": "myUser", "password": "********"}

df = spark_session.read.jdbc(url, table, properties=properties)

df.cache()

df.show(10)  # can't get the printed results, and runs pretty slow and consumes 90%+ CPU resources

spark_session.stop()

这是控制台日志:


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

[Stage 0:>                                                          (0 + 1) / 1]

我的教育背景是统计学,最近刚开始学习 Spark,所以我不知道代码背后发生了什么(对于较小的数据集,这很好用),我应该如何解决这个问题?或者我应该了解更多关于 Spark 的知识?


烙印99
浏览 137回答 2
2回答

慕后森

由于您spark.read.jdbc为某个表调用,spark 将尝试将整个表从数据库收集到 spark 中。之后,spark 缓存数据并从缓存中打印 10 个结果。如果您运行以下代码,您会注意到一些差异。spark_session = SparkSession.builder.master("local[4]").appName("test_log_processing").getOrCreate()url = "jdbc:mysql://localhost:3306"table = "(SELECT * FROM test.fakelog LIMIT 10) temp"properties = {"user": "myUser", "password": "********"}df = spark_session.read.jdbc(url, table, properties=properties)df.cache()df.show()spark_session.stop()

慕容森

也许您的内存缓存已被填满,缓存的默认值曾经只是内存(较旧的 spark 版本)。因此,您可以尝试使用 df.persist(StorageLevel.MEMORY_AND_DISK) 代替缓存。当内存太满时,它会溢出到磁盘。试试 .take(10),它会给出行的集合,它可能不会更快,但值得一试尝试 df.coalesce(50).persist(StorageLevel.MEMORY_AND_DISK),如果您有过度分区的数据帧,则无需洗牌即可正常工作如果这些都不起作用,则可能意味着您的计算集群无法处理此负载,您可能需要向外扩展。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python