def create_df(src,header=None):
df =spark.read.csv(src, header=header)
return df
result = source_df.filter(f.col('Job_name') == job_name).select(source_df['dfname'],source_df['srcpath']).collect()
for x in result:
src=str('"' +x[1] + '"'.strip(' '))
src = str(src)
x[0] = create_df(src, header=True) //throwing an uft-8 encod
结果是一个包含 2 列的列表,称为 dfname 和源路径,需要循环结果列表并根据 dfname 值需要动态创建传递 df 名称。
| dfname | SPath |
|------------+--------------|
| Account_Df | s3://path... |
| ProdMet_Df | s3://path... |
基于df名称需要创建dfnames吗?
预期输出 Account_Df 和 ProdMet_Df 两个独立的 dfs。
回首忆惘然
相关分类