compression - Soporte de cremallera en Apache Spark
zip apache-spark (5)
He leído sobre el soporte de Spark para los archivos de entrada gzip- kind aquí , y me pregunto si existe la misma compatibilidad para diferentes tipos de archivos comprimidos, como archivos .zip . Hasta ahora he intentado calcular un archivo comprimido bajo un archivo zip , pero Spark parece no poder leer su contenido con éxito.
He echado un vistazo al newAPIHadoopFile
Hadoop y al newAPIHadoopRDD
, pero hasta ahora no he podido obtener nada que funcione.
Además, Spark admite la creación de una partición para cada archivo en una carpeta específica, como en el siguiente ejemplo:
SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
.setMaster("local[4]");
JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);
JavaRDD<String> FirstRDD = Ctx.textFile("C:/input/).cache();
Donde C:/input/
apunta a un directorio con múltiples archivos.
En el caso de que sea posible computar archivos comprimidos , ¿también sería posible empacar cada archivo en un único archivo comprimido y seguir el mismo patrón de una partición por archivo ?
Spark soporte predeterminado archivos comprimidos
De acuerdo con la Guía de programación Spark
Todos los métodos de entrada basados en archivos de Spark, incluido el archivo de texto, admiten la ejecución en directorios, archivos comprimidos y comodines también. Por ejemplo, puede usar textFile ("/ my / directory"), textFile ("/ my / directory / .txt") y textFile ("/ my / directory / .gz").
Esto podría ampliarse proporcionando información sobre los formatos de compresión admitidos por Hadoop, que básicamente se puede comprobar encontrando todas las clases que extienden CompressionCodec
( docs )
name | ext | codec class
-------------------------------------------------------------
bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec
gzip | .gz | org.apache.hadoop.io.compress.GzipCodec
lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec
Fuente: enumere los códecs hadoop disponibles
Así que los formatos anteriores y muchas más posibilidades se pueden lograr simplemente llamando:
sc.readFile(path)
Lectura de archivos zip en Spark
Desafortunadamente, zip
no está en la lista compatible por defecto.
Encontré un gran artículo: Hadoop: Procesando archivos ZIP en Mapa / Reducir y algunas respuestas ( ejemplo ) explicando cómo usar ZipFileInputFormat
importado junto con la API sc.newAPIHadoopFile
. Pero esto no funcionó para mí .
Mi solución
Sin dependencias externas, puede cargar su archivo con sc.binaryFiles
y luego descomprimir el PortableDataStream
leyendo el contenido. Este es el enfoque que he elegido.
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
// this solution works only for single file in the zip
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
} else {
sc.textFile(path, minPartitions)
}
}
}
utilizando esta clase implícita, debe importarla y llamar al método SparkContext
en SparkContext
:
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
Y la clase implícita cargará su archivo zip
correctamente y devolverá RDD[String]
como solía hacerlo.
Nota: ¡Esto solo funciona para un solo archivo en el archivo zip!
Para múltiples archivos en su soporte zip, marque esta respuesta: https://.com/a/45958458/1549135
A continuación se muestra un ejemplo en el que se buscan archivos .zip en un directorio y se crea un RDD con un FileInputFormat personalizado denominado ZipFileInputFormat
y la nueva API API de APIAP en el contexto de Spark. A continuación, escribe esos archivos en un directorio de salida.
allzip.foreach { x =>
val zipFileRDD = sc.newAPIHadoopFile(
x.getPath.toString,
classOf[ZipFileInputFormat],
classOf[Text],
classOf[BytesWritable], hadoopConf)
zipFileRDD.foreach { y =>
ProcessFile(y._1.toString, y._2)
}
El archivo ZipFileInputFormat utilizado en el ejemplo se puede encontrar aquí: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop
Como Apache Spark usa los formatos de entrada de Hadoop, podemos consultar la documentación de hadoop sobre cómo procesar archivos zip y ver si hay algo que funcione.
Este sitio nos da una idea de cómo usar esto (es decir, podemos usar ZipFileInputFormat). Dicho esto, dado que los archivos zip no son de tabla dividida (ver esto ), su solicitud para tener un solo archivo comprimido no es realmente compatible. En cambio, si es posible, sería mejor tener un directorio que contenga muchos archivos zip separados.
Esta pregunta es similar a esta otra pregunta , sin embargo, agrega una pregunta adicional de si sería posible tener un solo archivo zip (que, dado que no es un formato de tabla dividida, no es una buena idea).
Puede usar sc.binaryFiles para abrir el archivo zip en formato binario y luego descomprimirlo en el formato de texto. Lamentablemente, el archivo comprimido no se puede dividir. Por lo tanto, debe esperar a la descompresión, y luego llamar aleatoriamente para equilibrar los datos en cada partición.
Aquí hay un ejemplo en Python. Más información está en http://gregwiki.duckdns.org/index.php/2016/04/11/read-zip-file-in-spark/
file_RDD = sc.binaryFiles( HDFS_path + data_path )
def Zip_open( binary_stream_string ) : # New version, treat a stream as zipped file
try :
pseudo_file = io.BytesIO( binary_stream_string )
zf = zipfile.ZipFile( pseudo_file )
return zf
except :
return None
def read_zip_lines(zipfile_object) :
file_iter = zipfile_object.open(''diff.txt'')
data = file_iter.readlines()
return data
My_RDD = file_RDD.map(lambda kv: (kv[0], Zip_open(kv[1])))
Puede usar sc.binaryFiles para leer Zip como archivo binario
val rdd = sc.binaryFiles(path).flatMap { case (name: String, content: PortableDataStream) => new ZipInputStream(content.open) } //=> RDD[ZipInputStream]
Y luego puede asignar el ZipInputStream a la lista de líneas:
val zis = rdd.first val entry = zis.getNextEntry val br = new BufferedReader(new InputStreamReader(in, "UTF-8")) val res = Stream.continually(br.readLine()).takeWhile(_ != null).toList
Pero el problema sigue siendo que el archivo zip no es divisible.