Чтение целых текстовых файлов из сжатия в Spark

У меня есть следующая проблема: предположим, что у меня есть каталог, содержащий сжатые каталоги, которые содержат несколько файлов, хранящихся на HDFS. Я хочу создать RDD, состоящий из некоторых объектов типа T, то есть:

context = new JavaSparkContext(conf); JavaPairRDD filesRDD = context.wholeTextFiles(inputDataPath); JavaPairRDD filesRDD = context.wholeTextFiles(inputDataPath); JavaRDD processingFiles = filesRDD.map(fileNameContent -> { // The name of the file String fileName = fileNameContent._1(); // The content of the file String content = fileNameContent._2(); // Class T has a constructor of taking the filename and the content of each // processed file (as two strings) T t = new T(content, fileName); return t; }); 

Теперь, когда inputDataPath – это каталог, содержащий файлы, это отлично работает, т. inputDataPath Когда это что-то вроде:

 String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders 

Но, когда есть tgz, содержащий несколько файлов, содержимое файла ( fileNameContent._2() ) получает мне бесполезную двоичную строку (вполне ожидаемый). Я нашел аналогичный вопрос о SO , но это не тот случай, потому что там решение состоит в том, что каждое сжатие состоит только из одного файла, и в моем случае есть много других файлов, которые я хочу читать отдельно как целые файлы. Я также нашел вопрос о wholeTextFiles , но это не работает в моем случае.

Есть идеи, как это сделать?

РЕДАКТИРОВАТЬ:

Я попробовал с читателем отсюда (пытаясь проверить читателя здесь , как в функции testTarballWithFolders() ), но всякий раз, когда я звоню

 TarballReader tarballReader = new TarballReader(fileName); 

и я получаю NullPointerException :

 java.lang.NullPointerException at java.util.zip.InflaterInputStream.(InflaterInputStream.java:83) at java.util.zip.GZIPInputStream.(GZIPInputStream.java:77) at java.util.zip.GZIPInputStream.(GZIPInputStream.java:91) at utils.TarballReader.(TarballReader.java:61) at main.SparkMain.lambda$0(SparkMain.java:105) at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 

Строка 105 в MainSpark является той, которую я показал выше в моем редактировании сообщения, а строка 61 из TarballReader

 GZIPInputStream gzip = new GZIPInputStream(in); 

который дает нулевое значение входного streamа in верхней строке:

 InputStream in = this.getClass().getResourceAsStream(tarball); 

Я на правильном пути здесь? Если да, то как мне продолжить? Почему я получаю это значение null и как его исправить?

Одним из возможных решений является считывание данных с помощью binaryFiles и извлечение содержимого вручную.

Скала :

 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream import org.apache.commons.compress.archivers.tar.TarArchiveInputStream import org.apache.spark.input.PortableDataStream import scala.util.Try import java.nio.charset._ def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) Stream.continually(Option(tar.getNextTarEntry)) // Read until next exntry is null .takeWhile(_.isDefined) // flatten .flatMap(x => x) // Drop directories .filter(!_.isDirectory) .map(e => { Stream.continually { // Read n bytes val buffer = Array.fill[Byte](n)(-1) val i = tar.read(buffer, 0, n) (i, buffer.take(i))} // Take as long as we've read something .takeWhile(_._1 > 0) .map(_._2) .flatten .toArray}) .toArray } def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) sc.binaryFiles("somePath").flatMapValues(x => extractFiles(x).toOption).mapValues(_.map(decode())) 
 libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11" 

Полный пример использования с Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python :

 import tarfile from io import BytesIO def extractFiles(bytes): tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz") return [tar.extractfile(x).read() for x in tar if x.isfile()] (sc.binaryFiles("somePath") .mapValues(extractFiles) .mapValues(lambda xs: [x.decode("utf-8") for x in xs])) 
Давайте будем гением компьютера.