apache spark - ¿Spark soporta la poda de partición con archivos de parquet?
apache-spark amazon-s3 (2)
Sí, la chispa soporta la poda de partición.
Spark hace una lista de directorios de particiones (secuencial o paralela listLeafFilesInParallel
) para crear una caché de todas las particiones por primera vez. Las consultas en la misma aplicación, que los datos escaneados aprovechan esta memoria caché. Así que la lentitud que ves podría ser debido a esta construcción de caché. Las consultas subsiguientes que escanean datos hacen uso de la caché para eliminar particiones.
Estos son los registros que muestran las particiones que se enumeran para llenar el caché.
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-01 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-02 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-03 on driver
Estos son los registros que muestran que la poda está sucediendo.
App > 16/11/10 12:29:16 main INFO DataSourceStrategy: Selected 1 partitions out of 20, pruned 95.0% partitions.
Consulte convertToParquetRelation
y getHiveQlPartitions
en HiveMetastoreCatalog.scala
.
Estoy trabajando con un conjunto de datos grande, que está dividido en dos columnas: plant_name
y tag_id
. La segunda partición - tag_id
tiene 200000 valores únicos, y yo tag_id
principalmente a los datos mediante valores específicos de tag_id
. Si uso los siguientes comandos de Spark:
sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name=''PLANT01'' and tag_id=''1000''")
Yo esperaría una respuesta rápida ya que esto se resuelve en una sola partición. En Hive y Presto esto toma segundos, sin embargo en Spark se ejecuta durante horas.
Los datos reales se guardan en un cubo S3, y cuando envío la consulta de SQL, Spark se apaga y primero obtiene todas las particiones del metastore de Hive (200000 de ellas), y luego llama a refresh()
para forzar una lista de estado completa de todos estos archivos en el almacén de objetos S3 (en realidad, se llama listLeafFilesInParallel
).
Son estas dos operaciones las que son tan caras, ¿hay alguna configuración que pueda hacer que Spark elimine las particiones antes, ya sea durante la llamada al almacén de metadatos o inmediatamente después?
Solo un pensamiento:
La documentación de la API de Spark para HadoopFsRelation dice, ( https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/sources/HadoopFsRelation.html )
"... cuando lee tablas particionadas de estilo Hive almacenadas en sistemas de archivos, puede descubrir información de partición de las rutas de los directorios de entrada y realizar la eliminación de particiones antes de comenzar a leer los datos ..."
Entonces, supongo que "listLeafFilesInParallel" no podría ser un problema.
Un problema similar ya está en spark jira: https://issues.apache.org/jira/browse/SPARK-10673
A pesar de que "spark.sql.hive.verifyPartitionPath" está establecido en falso y, no hay ningún efecto en el rendimiento, sospecho que el problema podría haber sido causado por particiones no registradas. Enumere las particiones de la tabla y verifique si todas las particiones están registradas. Si no, recupere sus particiones como se muestra en este enlace:
Hive no lee archivos de parquet particionados generados por Spark
Actualizar:
Supongo que el tamaño de bloque de parquet y el tamaño de página se establecieron al escribir los datos.
Cree una tabla de subárbol nueva con las particiones mencionadas, y el formato de archivo como parquet, cárguelo desde una tabla no particionada utilizando un enfoque de partición dinámica. ( https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions ) Ejecute una consulta de sección simple y luego compare ejecutando un programa de chispa.
Descargo de responsabilidad: No soy un experto de chispa / parquet. El problema sonaba interesante, y por lo tanto respondió.