我在 python 2.7 中编写了一个脚本,使用 pyspark 将 csv 转换为镶木地板和其他东西。当我在小数据上运行我的脚本时,它运行良好,但是当我在更大的数据 (250GB) 上运行时,我发现了以下错误 - 总分配超过了堆内存的 95.00%(960,285,889 字节)。我该如何解决这个问题?它发生的原因是什么?天!
部分代码:导入的库: import pyspark as ps
from pyspark.sql.types import StructType, StructField, IntegerType,
DoubleType, StringType, TimestampType,LongType,FloatType
from collections import OrderedDict
from sys import argv
使用pyspark:
schema_table_name="schema_"+str(get_table_name())
print (schema_table_name)
schema_file= OrderedDict()
schema_list=[]
ddl_to_schema(data)
for i in schema_file:
schema_list.append(StructField(i,schema_file[i]()))
schema=StructType(schema_list)
print schema
spark = ps.sql.SparkSession.builder.getOrCreate()
df = spark.read.option("delimiter",
",").format("csv").schema(schema).option("header", "false").load(argv[2])
df.write.parquet(argv[3])
# df.limit(1500).write.jdbc(url = url, table = get_table_name(), mode =
"append", properties = properties)
# df = spark.read.jdbc(url = url, table = get_table_name(), properties =
properties)
pq = spark.read.parquet(argv[3])
pq.show()
只是为了澄清 schema_table_name 是为了保存所有表名称(在适合 csv 的 DDL 中)。
函数 ddl_to_schema 只需采用常规 ddl 并将其编辑为 parquet 可以使用的 ddl。
子衿沉夜
相关分类