apache spark - read - Recuento de filas de parquet rápido en Spark
spark sql example (1)
Eso es correcto, Spark ya está usando el campo de conteos de vueltas cuando está ejecutando count
.
Profundizando un poco en los detalles, SpecificParquetRecordReaderBase.java hace referencia al rendimiento del escaneo de Improve Parquet cuando se usa la confirmación de esquemas planos como parte de [SPARK-11787] Acelera el lector de parquet para esquemas planos . Tenga en cuenta que esta confirmación se incluyó como parte de la rama Spark 1.6.
Si la consulta es un recuento de filas, funciona bastante de la manera que lo describió (es decir, leyendo los metadatos). Si los predicados están totalmente satisfechos por los valores mínimos / máximos, eso debería funcionar también, aunque eso no está tan completamente verificado. No es una mala idea usar esos campos de Parquet, pero como está implícito en la declaración anterior, la cuestión clave es garantizar que el filtrado de predicados coincida con los metadatos, por lo que está haciendo un recuento exacto.
Para ayudar a entender por qué hay dos etapas, aquí está el DAG creado al ejecutar la instrucción count ().
Al profundizar en las dos etapas, observe que la primera (Etapa 25) ejecuta el análisis de archivos mientras que la segunda etapa (Etapa 26) ejecuta la reproducción aleatoria para el recuento.
¡Gracias a Nong Li (el autor del compromiso SpecificParquetRecordReaderBase.java ) por la validación!
Actualizado
Para proporcionar un contexto adicional en el puente entre Dataset.count
y Parquet, el flujo de la lógica interna que lo rodea es:
- Spark no lee ninguna columna de Parquet para calcular el conteo
- Pasar el esquema de Parquet a
VectorizedParquetRecordReader
es en realidad un mensaje de Parquet vacío - Computar el recuento utilizando los metadatos almacenados en los pies de página del archivo Parquet. implica el ajuste de lo anterior dentro de un iterador que devuelve un
InternalRow
por InternalRow.scala .
Para trabajar con el formato de archivo Parquet, internamente, Apache Spark ajusta la lógica con un iterador que devuelve un InternalRow
; se puede encontrar más información en InternalRow.scala . En última instancia, la función de agregado count()
interactúa con la fuente de datos de Parquet subyacente utilizando este iterador. Por cierto, esto es cierto tanto para el lector de parquet vectorizado como no vectorizado.
Por lo tanto, para unir el Dataset.count()
con el lector de Parquet, la ruta es:
- La llamada a
Dataset.count()
se planifica en un operador agregado con una única función de agregado count (). - El código Java se genera en tiempo de planificación para el operador agregado así como para la función de agregado count ().
- El código Java generado interactúa con la fuente de datos subyacente ParquetFileFormat con un RecordReaderIterator, que es utilizado internamente por la API de fuente de datos Spark.
Para obtener más información, consulte Explicación de metadatos de recuento de parquet .
Los archivos de Parquet contienen un campo de conteo de filas por bloque. Spark parece leerlo en algún momento ( SpecificParquetRecordReaderBase.java#L151
).
Intenté esto en spark-shell
:
sqlContext.read.load("x.parquet").count
Y Spark ejecutó dos etapas, mostrando varios pasos de agregación en el DAG. Me imagino que esto significa que lee el archivo normalmente en lugar de usar los recuentos de filas. (Podría estar equivocado.)
La pregunta es: ¿Spark ya está usando los campos de recuento de filas cuando ejecuto count
? ¿Hay otra API para usar esos campos? ¿Confiar en esos campos es una mala idea por alguna razón?