compression zip apache-spark

compression - Soporte de cremallera en Apache Spark



zip apache-spark (5)

He leído sobre el soporte de Spark para los archivos de entrada gzip- kind aquí , y me pregunto si existe la misma compatibilidad para diferentes tipos de archivos comprimidos, como archivos .zip . Hasta ahora he intentado calcular un archivo comprimido bajo un archivo zip , pero Spark parece no poder leer su contenido con éxito.

He echado un vistazo al newAPIHadoopFile Hadoop y al newAPIHadoopRDD , pero hasta ahora no he podido obtener nada que funcione.

Además, Spark admite la creación de una partición para cada archivo en una carpeta específica, como en el siguiente ejemplo:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp") .setMaster("local[4]"); JavaSparkContext Ctx = new JavaSparkContext(SpkCnf); JavaRDD<String> FirstRDD = Ctx.textFile("C:/input/).cache();

Donde C:/input/ apunta a un directorio con múltiples archivos.

En el caso de que sea posible computar archivos comprimidos , ¿también sería posible empacar cada archivo en un único archivo comprimido y seguir el mismo patrón de una partición por archivo ?


Spark soporte predeterminado archivos comprimidos

De acuerdo con la Guía de programación Spark

Todos los métodos de entrada basados ​​en archivos de Spark, incluido el archivo de texto, admiten la ejecución en directorios, archivos comprimidos y comodines también. Por ejemplo, puede usar textFile ("/ my / directory"), textFile ("/ my / directory / .txt") y textFile ("/ my / directory / .gz").

Esto podría ampliarse proporcionando información sobre los formatos de compresión admitidos por Hadoop, que básicamente se puede comprobar encontrando todas las clases que extienden CompressionCodec ( docs )

name | ext | codec class ------------------------------------------------------------- bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec default | .deflate | org.apache.hadoop.io.compress.DefaultCodec deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec gzip | .gz | org.apache.hadoop.io.compress.GzipCodec lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec

Fuente: enumere los códecs hadoop disponibles

Así que los formatos anteriores y muchas más posibilidades se pueden lograr simplemente llamando:

sc.readFile(path)

Lectura de archivos zip en Spark

Desafortunadamente, zip no está en la lista compatible por defecto.

Encontré un gran artículo: Hadoop: Procesando archivos ZIP en Mapa / Reducir y algunas respuestas ( ejemplo ) explicando cómo usar ZipFileInputFormat importado junto con la API sc.newAPIHadoopFile . Pero esto no funcionó para mí .

Mi solución

Sin dependencias externas, puede cargar su archivo con sc.binaryFiles y luego descomprimir el PortableDataStream leyendo el contenido. Este es el enfoque que he elegido.

import java.io.{BufferedReader, InputStreamReader} import java.util.zip.ZipInputStream import org.apache.spark.SparkContext import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { def readFile(path: String, minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { if (path.endsWith(".zip")) { sc.binaryFiles(path, minPartitions) .flatMap { case (name: String, content: PortableDataStream) => val zis = new ZipInputStream(content.open) // this solution works only for single file in the zip val entry = zis.getNextEntry val br = new BufferedReader(new InputStreamReader(zis)) Stream.continually(br.readLine()).takeWhile(_ != null) } } else { sc.textFile(path, minPartitions) } } }

utilizando esta clase implícita, debe importarla y llamar al método SparkContext en SparkContext :

import com.github.atais.spark.Implicits.ZipSparkContext sc.readFile(path)

Y la clase implícita cargará su archivo zip correctamente y devolverá RDD[String] como solía hacerlo.

Nota: ¡Esto solo funciona para un solo archivo en el archivo zip!
Para múltiples archivos en su soporte zip, marque esta respuesta: https://.com/a/45958458/1549135


A continuación se muestra un ejemplo en el que se buscan archivos .zip en un directorio y se crea un RDD con un FileInputFormat personalizado denominado ZipFileInputFormat y la nueva API API de APIAP en el contexto de Spark. A continuación, escribe esos archivos en un directorio de salida.

allzip.foreach { x => val zipFileRDD = sc.newAPIHadoopFile( x.getPath.toString, classOf[ZipFileInputFormat], classOf[Text], classOf[BytesWritable], hadoopConf) zipFileRDD.foreach { y => ProcessFile(y._1.toString, y._2) }

https://github.com/alvinhenrick/apache-spark-examples/blob/master/src/main/scala/com/zip/example/Unzip.scala

El archivo ZipFileInputFormat utilizado en el ejemplo se puede encontrar aquí: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop


Como Apache Spark usa los formatos de entrada de Hadoop, podemos consultar la documentación de hadoop sobre cómo procesar archivos zip y ver si hay algo que funcione.

Este sitio nos da una idea de cómo usar esto (es decir, podemos usar ZipFileInputFormat). Dicho esto, dado que los archivos zip no son de tabla dividida (ver esto ), su solicitud para tener un solo archivo comprimido no es realmente compatible. En cambio, si es posible, sería mejor tener un directorio que contenga muchos archivos zip separados.

Esta pregunta es similar a esta otra pregunta , sin embargo, agrega una pregunta adicional de si sería posible tener un solo archivo zip (que, dado que no es un formato de tabla dividida, no es una buena idea).


Puede usar sc.binaryFiles para abrir el archivo zip en formato binario y luego descomprimirlo en el formato de texto. Lamentablemente, el archivo comprimido no se puede dividir. Por lo tanto, debe esperar a la descompresión, y luego llamar aleatoriamente para equilibrar los datos en cada partición.

Aquí hay un ejemplo en Python. Más información está en http://gregwiki.duckdns.org/index.php/2016/04/11/read-zip-file-in-spark/

file_RDD = sc.binaryFiles( HDFS_path + data_path ) def Zip_open( binary_stream_string ) : # New version, treat a stream as zipped file try : pseudo_file = io.BytesIO( binary_stream_string ) zf = zipfile.ZipFile( pseudo_file ) return zf except : return None def read_zip_lines(zipfile_object) : file_iter = zipfile_object.open(''diff.txt'') data = file_iter.readlines() return data My_RDD = file_RDD.map(lambda kv: (kv[0], Zip_open(kv[1])))


Puede usar sc.binaryFiles para leer Zip como archivo binario

val rdd = sc.binaryFiles(path).flatMap { case (name: String, content: PortableDataStream) => new ZipInputStream(content.open) } //=> RDD[ZipInputStream]

Y luego puede asignar el ZipInputStream a la lista de líneas:

val zis = rdd.first val entry = zis.getNextEntry val br = new BufferedReader(new InputStreamReader(in, "UTF-8")) val res = Stream.continually(br.readLine()).takeWhile(_ != null).toList

Pero el problema sigue siendo que el archivo zip no es divisible.