猿问

优化 pyspark 中的行访问和转换

我在 S3 存储桶中有一个以 jason 形式存在的大型数据集 (5GB)。我需要转换数据的模式,并使用 ETL 脚本将转换后的数据写回 S3。


所以我使用爬虫来检测架构并将数据加载到 pyspark 数据框中,然后更改架构。现在我遍历数据框中的每一行并将其转换为字典。删除空列,然后将字典转换为字符串并写回 S3。以下是代码:


#df is the pyspark dataframe

columns = df.columns

print(columns)

s3 = boto3.resource('s3')

cnt = 1


for row in df.rdd.toLocalIterator():

    data = row.asDict(True)


    for col_name in columns:

        if data[col_name] is None:

            del data[col_name]


    content = json.dumps(data)

    object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)

    cnt = cnt+1

print(cnt)

我用过 toLocalIterator。上面代码的执行是串行执行的吗?如果是那么如何优化它?有没有更好的方法来执行上述逻辑?


慕尼黑5688855
浏览 153回答 3
3回答

波斯汪

假设,数据集中的每一行都是 json 字符串格式import pyspark.sql.functions as Fdef drop_null_cols(data):    import json    content = json.loads(data)    for key, value in list(content.items()):        if value is None:            del content[key]    return json.dumps(content)drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())df = spark.createDataFrame(    ["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",     "{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",     "{\"name\":null, \"age\":31, \"city\":\"London\"}"],    "string").toDF("data")df.select(    drop_null_cols_udf("data").alias("data")).show(10,False)如果输入数据框有 cols 并且输出只需要不是 null cols jsondf = spark.createDataFrame(        [('Ranga', 25, 'Hyderabad'),         ('John', None, 'New York'),         (None, 31, 'London'),        ],        ['name', 'age', 'city']    )df.withColumn(    "data", F.to_json(F.struct([x for x in df.columns]))).select(    drop_null_cols_udf("data").alias("data")).show(10, False)#df.write.format("csv").save("s3://path/to/file/) -- save to s3结果+-------------------------------------------------+|data                                             |+-------------------------------------------------+|{"name": "Ranga", "age": 25, "city": "Hyderabad"}||{"name": "John", "city": "New York"}             ||{"age": 31, "city": "London"}                    |+-------------------------------------------------+

智慧大石

我将遵循以下方法(用 scala 编写,但可以在 python 中以最小的变化实现)-找到数据集计数并将其命名为totalCountval totalcount = inputDF.count()查找count(col)所有数据框列,并将字段映射到它们的计数这里对于输入数据框的所有列,计算计数请注意,count(anycol)返回提供的列全部非空的行数。例如 - 如果一列有 10 行值,如果说有 5 个值,null则计数(列)变为 5获取第一行Map[colName, count(colName)]称为fieldToCountval cols = inputDF.columns.map { inputCol =>&nbsp; &nbsp; &nbsp; functions.count(col(inputCol)).as(inputCol)&nbsp; &nbsp; }// Returns the number of rows for which the supplied column are all non-null.&nbsp; &nbsp; // count(null) returns 0&nbsp; &nbsp; val row = dataset.select(cols: _*).head()&nbsp; &nbsp; val fieldToCount = row.getValuesMap[Long]($(inputCols))获取要删除的列在此处使用步骤#2 中创建的 Map,并将计数小于 totalCount 的列标记为要删除的列从输入数据框中选择所有列,并count == totalCount根据要求以任何格式将处理后的输出数据框保存在任何地方。请注意,this approach will remove all the column having at least one null valueval fieldToBool = fieldToCount.mapValues(_ < totalcount)val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)// save this processedDF anywhere in any format as per requirement我相信这种方法会比您目前使用的方法表现更好

Cats萌萌

我解决了上面的问题。我们可以简单地查询数据框的空值。df = df.filter(df.column.isNotNull()) 从而删除所有存在 null 的行。所以如果有 n 列,我们需要 2^n 次查询来筛选出所有可能的组合。在我的例子中,有 10 列,所以总共有 1024 个查询,这是可以接受的,因为 sql 查询是并行化的。
随时随地看视频慕课网APP

相关分类

Python
我要回答