tutorial spark org examples example apache-spark apache-spark-sql

apache-spark - tutorial - org apache spark examples



ParticiĆ³n de DataFrame por un solo archivo de Parquet(por particiĆ³n) (2)

Me gustaría reparar / unir mis datos para que se guarden en un archivo Parquet por partición. También me gustaría usar la API Spark SQL partitionBy. Entonces podría hacer eso así:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")

Lo he probado y parece que no funciona bien. Esto se debe a que solo hay una partición para trabajar en el conjunto de datos y todas las particiones, la compresión y el guardado de archivos deben ser realizados por un núcleo de CPU.

Podría volver a escribir esto para hacer el particionamiento de forma manual (usando filtros con los distintos valores de partición, por ejemplo) antes de llamar a coalesce.

Pero, ¿hay una mejor manera de hacerlo utilizando la API Spark SQL estándar?


Por definición :

coalesce (numPartitions: Int): DataFrame Devuelve un nuevo DataFrame que tiene exactamente particiones numPartitions.

Puede usarlo para disminuir el número de particiones en el RDD / DataFrame con el parámetro numPartitions. Es útil para ejecutar operaciones de manera más eficiente después de filtrar un gran conjunto de datos.

En cuanto a tu código, no funciona bien porque lo que estás haciendo realmente es:

  1. poner todo en una partición que sobrecarga el controlador ya que es extraer todos los datos en una partición en el controlador (y tampoco es una buena práctica)

  2. coalesce realidad mezcla todos los datos en la red que también pueden dar como resultado la pérdida de rendimiento.

La reproducción aleatoria es el mecanismo de Spark para redistribuir los datos de manera que se agrupen de manera diferente en las particiones. Normalmente, esto implica copiar datos entre ejecutores y máquinas, lo que hace que la reorganización sea una operación compleja y costosa.

El concepto de reproducción aleatoria es muy importante de administrar y comprender. Siempre es preferible mezclar lo mínimo posible porque es una operación costosa, ya que implica E / S de disco, serialización de datos y E / S de red. Para organizar los datos para la reproducción aleatoria, Spark genera conjuntos de tareas: asignar tareas para organizar los datos y un conjunto de tareas de reducción para agregarlas. Esta nomenclatura proviene de MapReduce y no se relaciona directamente con el mapa de Spark y reduce las operaciones.

Internamente, los resultados de las tareas de mapas individuales se guardan en la memoria hasta que no encajan. Luego, estos se ordenan según la partición de destino y se escriben en un solo archivo. En el lado de la reducción, las tareas leen los bloques clasificados relevantes.

Con respecto al parquet de particionado, sugiero que lea la respuesta here sobre Spark DataFrames con Partición de Parquet y también esta section en la Guía de programación de Spark para Ajuste de rendimiento .

Espero que esto ayude !


Tuve exactamente el mismo problema y encontré una forma de hacerlo usando DataFrame.repartition() . El problema con el uso de coalesce(1) es que su paralelismo cae a 1, y puede ser lento en el mejor de los casos y error en el peor de los casos. Aumentar ese número tampoco ayuda: si te coalesce(10) obtienes más paralelismo, pero terminas con 10 archivos por partición.

Para obtener un archivo por partición sin utilizar coalesce() , utilice repartition() con las mismas columnas para las que desea que se particione la salida. Entonces en tu caso, haz esto:

df.repartition("entity", "year", "month", "day", "status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")

Una vez que hago eso, obtengo un archivo de parquet por partición de salida, en lugar de varios archivos.

Probé esto en Python, pero supongo que en Scala debería ser el mismo.