猿问

用火花-csv编写单个csv文件

用火花-csv编写单个csv文件

我在用https://github.com/databricks/spark-csv,我试图写一个CSV,但不能,它是一个文件夹。

需要一个Scala函数,它将接受像路径和文件名这样的参数,并编写那个CSV文件。


尚方宝剑之说
浏览 581回答 3
3回答

慕森王

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果需要一个输出文件(仍在文件夹中),则可以repartition(如果上游数据很大,但需要洗牌,则首选):df   .repartition(1)    .write.format("com.databricks.spark.csv")    .option("header", "true")    .save("mydata.csv")或coalesce:df   .coalesce(1)    .write.format("com.databricks.spark.csv")    .option("header", "true")    .save("mydata.csv")保存前的数据帧:所有数据将写入mydata.csv/part-00000..在使用此选项之前确保您了解正在发生的事情,以及将所有数据传输给单个员工的成本。..如果使用带有复制的分布式文件系统,数据将被多次传输-首先获取到单个工作人员,然后通过存储节点分发。或者,您可以保留代码的原样,并使用通用工具,如cat或HDFSgetmerge然后简单地合并所有的部分。

HUWWW

如果您正在使用HDFS运行SPark,我一直在通过正常编写CSV文件和利用HDFS进行合并来解决这个问题。我是在星火(1.6)直接这样做的:import&nbsp;org.apache.hadoop.conf.Configurationimport&nbsp;org.apache.hadoop.fs._def&nbsp;merge(srcPath:&nbsp;String,&nbsp;dstPath:&nbsp;String):&nbsp;Unit&nbsp;=&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;val&nbsp;hadoopConfig&nbsp;=&nbsp;new&nbsp;Configuration() &nbsp;&nbsp;&nbsp;val&nbsp;hdfs&nbsp;=&nbsp;FileSystem.get(hadoopConfig) &nbsp;&nbsp;&nbsp;FileUtil.copyMerge(hdfs,&nbsp;new&nbsp;Path(srcPath),&nbsp;hdfs,&nbsp;new&nbsp;Path(dstPath),&nbsp;true,&nbsp;hadoopConfig,&nbsp;null)&nbsp; &nbsp;&nbsp;&nbsp;//&nbsp;the&nbsp;"true"&nbsp;setting&nbsp;deletes&nbsp;the&nbsp;source&nbsp;files&nbsp;once&nbsp;they&nbsp;are&nbsp;merged&nbsp;into&nbsp;the&nbsp;new&nbsp;output}val&nbsp;newData&nbsp;= &nbsp;&nbsp;&nbsp;&nbsp;<<&nbsp;create&nbsp;your&nbsp;dataframe&nbsp;>>val&nbsp;outputfile&nbsp;=&nbsp;"/user/feeds/project/outputs/subject"&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;filename&nbsp;=&nbsp;"myinsights"var&nbsp;outputFileName&nbsp;=&nbsp;outputfile&nbsp;+&nbsp;"/temp_"&nbsp;+&nbsp;filename&nbsp; var&nbsp;mergedFileName&nbsp;=&nbsp;outputfile&nbsp;+&nbsp;"/merged_"&nbsp;+&nbsp;filenamevar&nbsp;mergeFindGlob&nbsp;&nbsp;=&nbsp;outputFileName &nbsp;&nbsp;&nbsp;&nbsp;newData.write&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.format("com.databricks.spark.csv") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.option("header",&nbsp;"false") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.mode("overwrite") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.save(outputFileName) &nbsp;&nbsp;&nbsp;&nbsp;merge(mergeFindGlob,&nbsp;mergedFileName&nbsp;) &nbsp;&nbsp;&nbsp;&nbsp;newData.unpersist()我不记得我是从哪里学到这个把戏的,但它可能对你有用。

慕妹3242003

我在这里可能有点晚了,但是.coalesce(1)或repartition(1)可能适用于小数据集,但大型数据集都将被抛到一个节点上的一个分区中。这可能会抛出OOM错误,或者充其量只能缓慢地处理。我强烈建议你使用FileUtil.copyMerge()函数来自HadoopAPI。这将把输出合并到一个文件中。编辑-这有效地将数据带给驱动程序,而不是执行者节点。Coalesce()如果单个执行器具有比驱动程序更多的RAM,就可以了。编辑2:copyMerge()在Hadoop3.0中被删除。有关如何使用最新版本的更多信息,请参见下面的堆栈溢出文章:Hadoop如何在Hadoop3.0中实现CopyMerge
随时随地看视频慕课网APP
我要回答