todf started spark read getting apache-spark distributed-computing rdd

apache spark - started - Spark-repartition() vs coalesce()



spark group by (11)

A todas las excelentes respuestas, me gustaría agregar que volver a particionar es una de las mejores opciones para aprovechar la paralelización de datos y la fusión ofrece una opción económica para reducir la partición y muy útil al escribir datos en HDFS o algún otro receptor para aprovechar Grandes escrituras. He encontrado esto útil cuando escribo datos en formato parquet para aprovechar al máximo.

De acuerdo con Learning Spark

Tenga en cuenta que repartir sus datos es una operación bastante costosa. Spark también tiene una versión optimizada de repartition () llamada coalesce () que permite evitar el movimiento de datos, pero solo si está disminuyendo el número de particiones RDD.

Una diferencia que obtengo es que con repartición () el número de particiones se puede aumentar / disminuir, pero con fusión () el número de particiones solo se puede disminuir.

Si las particiones se distribuyen en varias máquinas y se ejecuta coalesce (), ¿cómo puede evitar el movimiento de datos?


De una manera simple COALESCE: - es solo para disminuir el no de particiones, sin mezclar datos, solo comprime las particiones

REPARACIÓN: es tanto para aumentar como para disminuir el número de particiones, pero se baraja

Ejemplo:-

val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2)

Ambos funcionan bien

Pero generalmente vamos por estas dos cosas cuando necesitamos ver la salida en un clúster, vamos con esto.


Evita una barajadura completa . Si se sabe que el número está disminuyendo, entonces el ejecutor puede mantener de forma segura los datos en el número mínimo de particiones, solo moviendo los datos de los nodos adicionales a los nodos que guardamos.

Entonces, sería algo como esto:

Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12

Luego se coalesce en 2 particiones:

Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6)

Observe que el Nodo 1 y el Nodo 3 no requieren que sus datos originales se muevan.


La respuesta de Justin es asombrosa y esta respuesta se profundiza.

El algoritmo de repartition realiza una combinación completa y crea nuevas particiones con datos que se distribuyen de manera uniforme. Creemos un DataFrame con los números del 1 al 12.

val x = (1 to 12).toList val numbersDf = x.toDF("number")

numbersDf contiene 4 particiones en mi máquina.

numbersDf.rdd.partitions.size // => 4

Así es como se dividen los datos en las particiones:

Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12

Hagamos una combinación completa con el método de repartition y obtengamos estos datos en dos nodos.

val numbersDfR = numbersDf.repartition(2)

Así es como se numbersDfR datos numbersDfR en mi máquina:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11

El método de repartition nuevas particiones y distribuye de manera uniforme los datos en las nuevas particiones (la distribución de datos es más uniforme incluso para conjuntos de datos más grandes).

Diferencia entre coalesce y repartition

coalesce usa particiones existentes para minimizar la cantidad de datos que se barajan. repartition crea nuevas particiones y baraja completamente. coalesce da como resultado particiones con diferentes cantidades de datos (a veces particiones que tienen tamaños muy diferentes) y la repartition da como resultado particiones de aproximadamente el mismo tamaño.

¿La coalesce o repartition más rápida?

coalesce puede ejecutarse más rápido que la repartition , pero las particiones de tamaño desigual generalmente son más lentas para trabajar que las particiones de igual tamaño. Por lo general, deberá volver a particionar los conjuntos de datos después de filtrar un conjunto de datos grande. He descubierto que la repartition es más rápida en general porque Spark está diseñado para funcionar con particiones de igual tamaño.

Lea esta publicación de blog si desea aún más detalles.


Lo que se deduce del code y los documentos de código es que la coalesce(n) es lo mismo que la coalesce(n, shuffle = false) y la repartition(n) es lo mismo que la coalesce(n, shuffle = true)

Por lo tanto, tanto la coalesce como la repartition pueden usarse para aumentar el número de particiones

Con shuffle = true, en realidad puede unirse a un mayor número de particiones. Esto es útil si tiene una pequeña cantidad de particiones, digamos 100, potencialmente con unas pocas particiones que son anormalmente grandes.

Otra nota importante para acentuar es que si disminuye drásticamente el número de particiones, debería considerar usar una versión coalesce de coalesce (igual que la repartition en ese caso). Esto permitirá que sus cálculos se realicen en paralelo en particiones principales (tarea múltiple).

Sin embargo, si está haciendo una fusión drástica, por ejemplo, a numPartitions = 1, esto puede hacer que su cálculo se realice en menos nodos de los que desee (por ejemplo, un nodo en el caso de numPartitions = 1). Para evitar esto, puede pasar shuffle = true. Esto agregará un paso aleatorio, pero significa que las particiones ascendentes actuales se ejecutarán en paralelo (según lo que sea la partición actual).

Consulte también la respuesta relacionada here


Me gustaría agregar a la respuesta de Justin y Power que:

"repartición" ignorará las particiones existentes y creará otras nuevas. Entonces puede usarlo para corregir el sesgo de datos. Puede mencionar las claves de partición para definir la distribución. El sesgo de datos es uno de los mayores problemas en el espacio de problemas de ''big data''.

"fusionar" funcionará con particiones existentes y barajará un subconjunto de ellas. No puede corregir el sesgo de datos tanto como la "repartición". así que incluso si es menos costoso, puede que no sea lo que necesita.


Para alguien que tuvo problemas para generar un solo archivo csv de PySpark (AWS EMR) como salida y guardarlo en s3, el uso de la partición ayudó. La razón es que la fusión no puede hacer una mezcla completa, pero la distribución sí. Esencialmente, puede aumentar o disminuir el número de particiones usando la división, pero solo puede disminuir el número de particiones (pero no 1) usando la fusión. Aquí está el código para cualquier persona que intente escribir un csv de AWS EMR en s3:

df.repartition(1).write.format(''csv'')/ .option("path", "s3a://my.bucket.name/location")/ .save(header = ''true'')


Pero también debe asegurarse de que, si se trata de datos de gran tamaño, los datos que están llegando a los nodos de fusión deben estar altamente configurados. Debido a que todos los datos se cargarán en esos nodos, puede provocar una excepción de memoria. Aunque la reparación es costosa, prefiero usarla. Ya que baraja y distribuye los datos por igual.

Sea prudente para seleccionar entre fusión y reparto.


Todas las respuestas están agregando un gran conocimiento a esta pregunta muy frecuente.

Entonces, siguiendo la tradición de la línea de tiempo de esta pregunta, aquí están mis 2 centavos.

Encontré que la repartición es más rápida que la fusión , en un caso muy específico.

En mi aplicación, cuando el número de archivos que estimamos es inferior al umbral determinado, la repartición funciona más rápido.

Esto es lo que quiero decir

if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

En el fragmento anterior, si mis archivos tenían menos de 20, la fusión tardaba una eternidad en terminar, mientras que el reparto era mucho más rápido y, por lo tanto, el código anterior.

Por supuesto, este número (20) dependerá de la cantidad de trabajadores y la cantidad de datos.

Espero que ayude.


Un punto adicional a tener en cuenta aquí es que, como el principio básico de Spark RDD es la inmutabilidad. La repartición o fusión creará un nuevo RDD. El RDD base continuará teniendo existencia con su número original de particiones. En caso de que el caso de uso exija que el RDD persista en la memoria caché, entonces se debe hacer lo mismo para el RDD recién creado.

scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2


repartition : se recomienda usar la partición mientras se aumenta el número de particiones, ya que implica la combinación aleatoria de todos los datos.

coalesce : se recomienda usar coalesce sin reducir las particiones. Por ejemplo, si tiene 3 particiones y desea reducirlas a 2 particiones, Coalesce moverá los datos de la tercera partición a las particiones 1 y 2. Las particiones 1 y 2 permanecerán en el mismo contenedor. entre ejecutor será alto e impacta el rendimiento.

El rendimiento inteligente coalesce mejor el rendimiento que el repartition mientras reduce el número de particiones.