我有以下问题:假设我有一个包含压缩目录的目录,该压缩目录包含多个文件,存储在HDFS上。我想创建一个包含一些T类型对象的RDD,即:
context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String content = fileNameContent._2();
// Class T has a constructor of taking the filename and the content of each
// processed file (as two strings)
T t = new T(content, fileName);
return t;
});
现在,当inputDataPath目录包含文件时,可以很好地工作,例如,当它类似于:
String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders
但是,当一个tgz包含多个文件时,文件内容(fileNameContent._2())为我提供了一些无用的二进制字符串(相当不错)。我在SO上发现了类似的问题,但是情况不一样,因为解决方案是每次压缩仅包含一个文件,而在我的情况下,还有许多其他文件需要单独读取为整个文件。我还发现了有关的问题wholeTextFiles,但这在我的情况下不起作用。
任何想法如何做到这一点?
编辑:
我试图从读者在这里(试图从测试的读者在这里,就像在功能testTarballWithFolders()),但每当我打电话
TarballReader tarballReader = new TarballReader(fileName);
我得到NullPointerException:
java.lang.NullPointerException
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
at utils.TarballReader.<init>(TarballReader.java:61)
at main.SparkMain.lambda$0(SparkMain.java:105)
at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
第105行MainSpark是我在帖子编辑中显示的上方的行,而第61行TarballReader是
GZIPInputStream gzip = new GZIPInputStream(in);
in上面一行输入流的值为空:
InputStream in = this.getClass().getResourceAsStream(tarball);
我在正确的道路上吗?如果是这样,我如何继续?为什么我得到这个空值,我该如何解决?
守着一只汪
相关分类