java - sobreescribir - Leer archivos de texto completos desde una compresión en Spark
modificar archivos txt en java (2)
Una ligera mejora en la respuesta aceptada es cambiar
Option(tar.getNextTarEntry)
a
Try(tar.getNextTarEntry).toOption.filter( _ != null)
lidiar con
.tar.gz
s malformados / truncados de una manera robusta.
Por cierto, ¿hay algo especial sobre el tamaño de la matriz de búfer?
¿Sería más rápido en promedio si estuviera más cerca del tamaño promedio del archivo, tal vez 500k en mi caso?
O es que la desaceleración que veo es más probable que la sobrecarga de
Stream
relación con un ciclo while que era más Java-ish, supongo.
Tengo el siguiente problema: suponga que tengo un directorio que contiene directorios comprimidos que contienen múltiples archivos, almacenados en HDFS. Quiero crear un RDD que consista en algunos objetos de tipo T, es decir:
context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String content = fileNameContent._2();
// Class T has a constructor of taking the filename and the content of each
// processed file (as two strings)
T t = new T(content, fileName);
return t;
});
Ahora, cuando
inputDataPath
es un directorio que contiene archivos, esto funciona perfectamente bien, es decir, cuando es algo como:
String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders
Pero, cuando hay un tgz que contiene varios archivos, el contenido del archivo (
fileNameContent._2()
) me da una cadena binaria inútil (bastante esperado).
Encontré una
pregunta similar sobre SO
, pero no es el mismo caso, porque la solución es cuando cada compresión consta de un solo archivo, y en mi caso hay muchos otros archivos que quiero leer individualmente como archivos completos.
También encontré una
question
sobre
wholeTextFiles
, pero esto no funciona en mi caso.
¿Alguna idea de como hacer esto?
EDITAR:
Lo intenté con el lector desde
here
(tratando de probarlo desde
here
, como en la función
testTarballWithFolders()
), pero cada vez que llamo
TarballReader tarballReader = new TarballReader(fileName);
y obtengo
NullPointerException
:
java.lang.NullPointerException
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
at utils.TarballReader.<init>(TarballReader.java:61)
at main.SparkMain.lambda$0(SparkMain.java:105)
at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
La línea 105 en
MainSpark
es la que mostré arriba en mi edición de la publicación, y la línea 61 de
TarballReader
es
GZIPInputStream gzip = new GZIPInputStream(in);
que da un valor nulo para la secuencia de entrada en la línea superior:
InputStream in = this.getClass().getResourceAsStream(tarball);
¿Estoy en el camino correcto aquí? Si es así, ¿cómo continúo? ¿Por qué obtengo este valor nulo y cómo puedo solucionarlo?
Una posible solución es leer datos con
binaryFiles
y extraer contenido manualmente.
Scala :
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._
def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
Stream.continually(Option(tar.getNextTarEntry))
// Read until next exntry is null
.takeWhile(_.isDefined)
// flatten
.flatMap(x => x)
// Drop directories
.filter(!_.isDirectory)
.map(e => {
Stream.continually {
// Read n bytes
val buffer = Array.fill[Byte](n)(-1)
val i = tar.read(buffer, 0, n)
(i, buffer.take(i))}
// Take as long as we''ve read something
.takeWhile(_._1 > 0)
.map(_._2)
.flatten
.toArray})
.toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
new String(bytes, StandardCharsets.UTF_8)
sc.binaryFiles("somePath").flatMapValues(x =>
extractFiles(x).toOption).mapValues(_.map(decode()))
libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"
Ejemplo de uso completo con Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
Python :
import tarfile
from io import BytesIO
def extractFiles(bytes):
tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
return [tar.extractfile(x).read() for x in tar if x.isfile()]
(sc.binaryFiles("somePath")
.mapValues(extractFiles)
.mapValues(lambda xs: [x.decode("utf-8") for x in xs]))