从Spark压缩中读取整个文本文件

我有以下问题:假设我有一个包含压缩目录的目录,该压缩目录包含多个文件,存储在HDFS上。我想创建一个包含一些T类型对象的RDD,即:


context = new JavaSparkContext(conf);

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);


JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {

    // The name of the file

    String fileName = fileNameContent._1();

    // The content of the file

    String content = fileNameContent._2();


    // Class T has a constructor of taking the filename and the content of each

    // processed file (as two strings)

    T t = new T(content, fileName);


    return t;

});

现在,当inputDataPath目录包含文件时,可以很好地工作,例如,当它类似于:


String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

但是,当一个tgz包含多个文件时,文件内容(fileNameContent._2())为我提供了一些无用的二进制字符串(相当不错)。我在SO上发现了类似的问题,但是情况不一样,因为解决方案是每次压缩仅包含一个文件,而在我的情况下,还有许多其他文件需要单独读取为整个文件。我还发现了有关的问题wholeTextFiles,但这在我的情况下不起作用。


任何想法如何做到这一点?


编辑:


我试图从读者在这里(试图从测试的读者在这里,就像在功能testTarballWithFolders()),但每当我打电话


TarballReader tarballReader = new TarballReader(fileName);

我得到NullPointerException:


java.lang.NullPointerException

    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)

    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)

    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)

    at utils.TarballReader.<init>(TarballReader.java:61)

    at main.SparkMain.lambda$0(SparkMain.java:105)

    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)

    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)


第105行MainSpark是我在帖子编辑中显示的上方的行,而第61行TarballReader是


GZIPInputStream gzip = new GZIPInputStream(in);

in上面一行输入流的值为空:


InputStream in = this.getClass().getResourceAsStream(tarball);

我在正确的道路上吗?如果是这样,我如何继续?为什么我得到这个空值,我该如何解决?


冉冉说
浏览 700回答 2
2回答

守着一只汪

可接受答案的一个小改进是更改Option(tar.getNextTarEntry)至Try(tar.getNextTarEntry).toOption.filter( _ != null)以.tar.gz健壮的方式应对格式错误/截断的。顺便说一句,缓冲区数组的大小有什么特别之处吗?如果接近平均文件大小(在我的情况下可能是500k),平均速度会更快吗?我猜是Stream相对于whileJava式的循环而言,我看到的是速度下降还是更可能是相对而言的开销。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java