Spark - 将CSV文件加载为DataFrame?

Spark - 将CSV文件加载为DataFrame?

我想在spark中读取CSV并将其转换为DataFrame并将其存储在HDFS中 df.registerTempTable("table_name")

我试过了:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

我得到的错误:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在Apache Spark中将CSV文件作为DataFrame加载的正确命令是什么?


慕工程0101907
浏览 1650回答 3
3回答

PIPIONE

spark-csv是Spark核心功能的一部分,不需要单独的库。所以你可以这样做df = spark.read.format("csv").option("header", "true").load("csvfile.csv")在scala中,(这适用于任何格式的分隔符提及“,”用于csv,“\ t”用于tsv等) val df = sqlContext.read.format("com.databricks.spark.csv")    .option("delimiter", ",")    .load("csvfile.csv")

慕仙森

它的Hadoop是2.6,Spark是1.6,没有“databricks”包。import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};import org.apache.spark.sql.Row;val csv = sc.textFile("/path/to/file.csv")val rows = csv.map(line => line.split(",").map(_.trim))val header = rows.firstval data = rows.filter(_(0) != header(0))val rdd = data.map(row => Row(row(0),row(1).toInt))val schema = new StructType()     .add(StructField("id", StringType, true))     .add(StructField("val", IntegerType, true))val df = sqlContext.createDataFrame(rdd, schema)
打开App,查看更多内容
随时随地看视频慕课网APP