apache-spark amazon-s3 apache-spark-sql parquet

apache spark - ¿Spark admite escaneos de columna verdaderos sobre archivos de parquet en S3?



apache-spark amazon-s3 (4)

Uno de los grandes beneficios del formato de almacenamiento de datos de Parquet es que es columnar . Si tengo un conjunto de datos ''ancho'' con cientos de columnas, pero mi consulta solo toca algunas de ellas, entonces es posible leer solo los datos que almacenan esas pocas columnas y omitir el resto.

Presumiblemente, esta característica funciona leyendo un poco de metadatos en la cabecera de un archivo de parquet que indica las ubicaciones en el sistema de archivos para cada columna. El lector puede buscar en el disco para leer solo las columnas necesarias.

¿Alguien sabe si el lector de parquet predeterminado de chispa implementa correctamente este tipo de búsqueda selectiva en S3? Creo que es compatible con S3 , pero hay una gran diferencia entre el soporte teórico y una implementación que explota adecuadamente ese soporte.


DESCARGO DE RESPONSABILIDAD: no tengo una respuesta definitiva y tampoco quiero actuar como una fuente autorizada, pero he dedicado un tiempo al soporte de parquet en Spark 2.2+ y espero que mi respuesta nos ayude a todos a acercarnos a la respuesta correcta.

¿Parquet en S3 evita extraer los datos de las columnas no utilizadas de S3 y solo recupera los fragmentos de archivos que necesita, o extrae todo el archivo?

Uso Spark 2.3.0-SNAPSHOT que construí hoy directamente desde el master .

parquet formato de fuente de datos de parquet es manejado por ParquetFileFormat que es un FileFormat .

Si estoy en lo correcto, la parte de lectura es manejada por el método buildReaderWithPartitionValues (que anula los de FileFormat ).

buildReaderWithPartitionValues se usa exclusivamente cuando se FileSourceScanExec el operador físico FileSourceScanExec para los denominados RDD de entrada que en realidad son un solo RDD para generar filas internas cuando se ejecuta WholeStageCodegenExec .

Dicho esto, creo que revisar lo que buildReaderWithPartitionValues hace nos puede acercar a la respuesta final.

Cuando miras la línea puedes estar seguro de que estamos en el camino correcto.

// Intenta empujar hacia abajo los filtros cuando está habilitado el pushdown de filtro.

La ruta del código depende de la propiedad spark.sql.parquet.filterPushdown Spark que está activada de manera predeterminada .

spark.sql.parquet.filterPushdown Habilita la optimización de pushdown de filtro de parquet cuando se establece en verdadero.

Eso nos lleva a ParquetInputFormat.setFilterPredicate de parquet- ParquetInputFormat.setFilterPredicate si se definen los filtros.

if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) }

El código se vuelve más interesante un poco más tarde cuando se usan los filtros cuando el código vuelve a parquet-mr (en lugar de usar el denominado lector de decodificación de parquet vectorizado). Esa es la parte que realmente no entiendo (excepto lo que puedo ver en el código).

Tenga en cuenta que el lector de decodificación de parquet vectorizado está controlado por spark.sql.parquet.enableVectorizedReader Spark propiedad que está activada por defecto.

SUGERENCIA: para saber qué parte de la expresión if se usa, habilite el nivel de registro DEBUG para org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger.

Para ver todos los filtros desplegados, puede activar el nivel de registro INFO del registrador org.apache.spark.sql.execution.FileSourceScanExec . Debería ver lo siguiente en los registros :

INFO Pushed Filters: [pushedDownFilters]

Espero que si no está cerca de ser una respuesta definitiva, haya ayudado un poco y alguien lo retome donde lo dejé para que sea uno pronto. Hope muere el último :)


Esto debe ser desglosado

  1. El código de Parquet obtiene los predicados de la chispa (sí)
  2. ¿El parquet luego intenta leer selectivamente solo esas columnas, usando las FileSystem seek() Hadoop FileSystem seek() + read() o readFully(position, buffer, length) ? Sí
  3. ¿El conector S3 convierte estas operaciones de archivos en solicitudes HTTP GET eficientes? En Amazon EMR: sí. En Apache Hadoop, necesita hadoop 2.8 en classpath y establece adecuadamente spark.hadoop.fs.s3a.experimental.fadvise=random para activar el acceso aleatorio.

Hadoop 2.7 y anteriores manejan mal el seek () agresivo alrededor del archivo, porque siempre inician un offset-end-of-file GET, se sorprenden con la próxima búsqueda, tienen que abortar esa conexión, reabrir una nueva conexión TCP / HTTPS 1.1 (lento, CPU pesada), hazlo de nuevo, repetidamente. La operación aleatoria de IO duele en la carga masiva de elementos como .csv.gz, pero es fundamental para obtener ORC / Parquet perf.

No obtienes la aceleración en el JAR de hadoop-aws de Hadoop 2.7. Si lo necesita, debe actualizar hadoop * .jar y las dependencias, o compilar Spark desde cero contra Hadoop 2.8.

Tenga en cuenta que Hadoop 2.8+ también tiene una pequeña característica agradable: si llama toString() en un cliente de sistema de archivos S3A en una declaración de registro, imprime todas las estadísticas de IO del sistema de archivos, incluida la cantidad de datos descartados en búsquedas, conexiones TCP abortadas yc . Te ayuda a descubrir qué está pasando.

2018-04-13 warning:: No intentes soltar el JAR de Hadoop 2.8+ hadoop-aws en el classpath junto con el resto del conjunto de hadoop-2.7 JAR y esperar ver cualquier aceleración. Todo lo que verás son rastros de pila. Necesita actualizar todos los JAR de hadoop y sus dependencias transitivas.


No, pushdown de predicado no es totalmente compatible. Esto, por supuesto, depende de:

  • Caso de uso específico
  • Versión Spark
  • Tipo de conector S3 y versión

Para verificar su caso de uso específico, puede habilitar el nivel de registro DEBUG en Spark y ejecutar su consulta. Luego, puede ver si hay "búsquedas" durante las solicitudes S3 (HTTP) y cuántas solicitudes se enviaron realmente. Algo como esto:

17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[/r][/n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[/r][/n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[/r][/n]"

Aquí hay un ejemplo de un informe de problema que se abrió recientemente debido a la incapacidad de Spark 2.1 para calcular COUNT(*) de todas las filas en un conjunto de datos basado en los metadatos almacenados en el archivo Parquet: https://issues.apache.org/jira/browse/SPARK-21074


el lector de chispas de parquet es como cualquier otro InputFormat,

  1. Ninguno de los archivos de entrada tiene algo especial para S3. Los formatos de entrada pueden leer LocalFileSystem, Hdfs y S3 sin optimización especial para eso.

  2. Parquet InpuTFormat, dependiendo de las columnas que leas, leerá selectivamente las columnas por ti.

  3. Si desea estar completamente seguro (aunque los predicados pushdown funcionan en la última versión de chispa), seleccione manualmente las columnas y escriba la transformación y las acciones, en lugar de depender de SQL.