txt texto sobreescribir separado por palabra modificar leer ejemplos como comas caracter archivos archivo abrir java hadoop apache-spark hdfs decompression compression

java - sobreescribir - Leer archivos de texto completos desde una compresión en Spark



modificar archivos txt en java (2)

Una ligera mejora en la respuesta aceptada es cambiar

Option(tar.getNextTarEntry)

a

Try(tar.getNextTarEntry).toOption.filter( _ != null)

lidiar con .tar.gz s malformados / truncados de una manera robusta.

Por cierto, ¿hay algo especial sobre el tamaño de la matriz de búfer? ¿Sería más rápido en promedio si estuviera más cerca del tamaño promedio del archivo, tal vez 500k en mi caso? O es que la desaceleración que veo es más probable que la sobrecarga de Stream relación con un ciclo while que era más Java-ish, supongo.

Tengo el siguiente problema: suponga que tengo un directorio que contiene directorios comprimidos que contienen múltiples archivos, almacenados en HDFS. Quiero crear un RDD que consista en algunos objetos de tipo T, es decir:

context = new JavaSparkContext(conf); JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> { // The name of the file String fileName = fileNameContent._1(); // The content of the file String content = fileNameContent._2(); // Class T has a constructor of taking the filename and the content of each // processed file (as two strings) T t = new T(content, fileName); return t; });

Ahora, cuando inputDataPath es un directorio que contiene archivos, esto funciona perfectamente bien, es decir, cuando es algo como:

String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders

Pero, cuando hay un tgz que contiene varios archivos, el contenido del archivo ( fileNameContent._2() ) me da una cadena binaria inútil (bastante esperado). Encontré una pregunta similar sobre SO , pero no es el mismo caso, porque la solución es cuando cada compresión consta de un solo archivo, y en mi caso hay muchos otros archivos que quiero leer individualmente como archivos completos. También encontré una question sobre wholeTextFiles , pero esto no funciona en mi caso.

¿Alguna idea de como hacer esto?

EDITAR:

Lo intenté con el lector desde here (tratando de probarlo desde here , como en la función testTarballWithFolders() ), pero cada vez que llamo

TarballReader tarballReader = new TarballReader(fileName);

y obtengo NullPointerException :

java.lang.NullPointerException at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) at utils.TarballReader.<init>(TarballReader.java:61) at main.SparkMain.lambda$0(SparkMain.java:105) at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

La línea 105 en MainSpark es la que mostré arriba en mi edición de la publicación, y la línea 61 de TarballReader es

GZIPInputStream gzip = new GZIPInputStream(in);

que da un valor nulo para la secuencia de entrada en la línea superior:

InputStream in = this.getClass().getResourceAsStream(tarball);

¿Estoy en el camino correcto aquí? Si es así, ¿cómo continúo? ¿Por qué obtengo este valor nulo y cómo puedo solucionarlo?


Una posible solución es leer datos con binaryFiles y extraer contenido manualmente.

Scala :

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream import org.apache.commons.compress.archivers.tar.TarArchiveInputStream import org.apache.spark.input.PortableDataStream import scala.util.Try import java.nio.charset._ def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) Stream.continually(Option(tar.getNextTarEntry)) // Read until next exntry is null .takeWhile(_.isDefined) // flatten .flatMap(x => x) // Drop directories .filter(!_.isDirectory) .map(e => { Stream.continually { // Read n bytes val buffer = Array.fill[Byte](n)(-1) val i = tar.read(buffer, 0, n) (i, buffer.take(i))} // Take as long as we''ve read something .takeWhile(_._1 > 0) .map(_._2) .flatten .toArray}) .toArray } def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) sc.binaryFiles("somePath").flatMapValues(x => extractFiles(x).toOption).mapValues(_.map(decode()))

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Ejemplo de uso completo con Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python :

import tarfile from io import BytesIO def extractFiles(bytes): tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz") return [tar.extractfile(x).read() for x in tar if x.isfile()] (sc.binaryFiles("somePath") .mapValues(extractFiles) .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))