如何将CSV文件转换为RDD

我是新来的火花。我想对CSV记录中的特定数据执行一些操作。


我正在尝试读取CSV文件并将其转换为RDD。我的进一步操作基于CSV文件中提供的标题。


(摘自评论)到目前为止,这是我的代码:


final JavaRDD<String> File = sc.textFile(Filename).cache();

final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 

    @Override public Iterable<String> call(String s) { 

    return Arrays.asList(EOL.split(s)); 

    } 

});

final String heading=lines.first().toString();

我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。


final String[] header=heading.split(" "); 

我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。


在Java中,我CSVReader record.getColumnValue(Column header)用来获取特定值。我需要做类似这里的事情。


浮云间
浏览 1291回答 3
3回答

哆啦的时光机

一种简单的方法是拥有一种保留标头的方法。假设您有一个file.csv,例如:user, topic, hitsom,&nbsp; scala, 120daniel, spark, 803754978, spark, 1我们可以定义一个标头类,该标头类使用第一行的解析版本:class SimpleCSVHeader(header:Array[String]) extends Serializable {&nbsp; val index = header.zipWithIndex.toMap&nbsp; def apply(array:Array[String], key:String):String = array(index(key))}我们可以使用该标头来处理以后的数据:val csv = sc.textFile("file.csv")&nbsp; // original fileval data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rowsval header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first lineval rows = data.filter(line => header(line,"user") != "user") // filter the header outval users = rows.map(row => header(row,"user")val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)...请注意,header仅仅不过是助记符到数组索引的简单映射。几乎所有这些操作都可以在数组中元素的顺序位置上完成,例如user = row(0)PS:欢迎来到Scala :-)
打开App,查看更多内容
随时随地看视频慕课网APP