spark repartition apache-spark-sql data-partitioning

apache spark sql - Spark SQL-Diferencia entre df.repartition y DataFrameWriter partitionBy?



pyspark repartition (2)

Cuidado: ¡creo que la respuesta aceptada no es del todo correcta! Me alegra que haga esta pregunta, porque el comportamiento de estas funciones con nombres similares difiere en formas importantes e inesperadas que no están bien documentadas en la documentación oficial de chispa.

La primera parte de la respuesta aceptada es correcta: llamar a df.repartition(COL, numPartitions=k) creará un marco de datos con k particiones usando un particionador basado en hash. COL define aquí la clave de particionamiento: puede ser una sola columna o una lista de columnas. El particionador basado en hash toma la clave de partición de cada fila de entrada, la divide en un espacio de k particiones a través de algo como partition = hash(partitionKey) % k . Esto garantiza que todas las filas con la misma clave de partición terminen en la misma partición. Sin embargo, las filas de varias claves de partición también pueden terminar en la misma partición (cuando se produce una colisión hash entre las claves de partición) y algunas particiones pueden estar vacías .

En resumen, los aspectos no intuitivos de df.repartition(COL, numPartitions=k) son que

  • las particiones no segregarán estrictamente las claves de partición
  • algunas de sus k particiones pueden estar vacías, mientras que otras pueden contener filas de varias claves de partición

El comportamiento de df.write.partitionBy es bastante diferente, de una manera que muchos usuarios no esperan. Supongamos que desea que sus archivos de salida tengan particiones de fecha y que sus datos abarquen más de 7 días. Supongamos también que df tiene 10 particiones para empezar. Cuando ejecuta df.write.partitionBy(''day'') , ¿cuántos archivos de salida debe esperar? La respuesta es, depende''. Si cada partición de sus particiones iniciales en df contiene datos de cada día, entonces la respuesta es 70. Si cada una de sus particiones iniciales en df contiene datos de exactamente un día, entonces la respuesta es 10.

¿Cómo podemos explicar este comportamiento? Cuando ejecuta df.write , cada una de las particiones originales en df se escribe de forma independiente. Es decir, cada una de sus 10 particiones originales se subdivide por separado en la columna ''día'', y se escribe un archivo separado para cada subpartición.

Encuentro este comportamiento bastante molesto y desearía que hubiera una manera de hacer un reparticionamiento global al escribir marcos de datos.

¿Cuál es la diferencia entre DataFrame repartition() y DataFrameWriter partitionBy() métodos?

Espero que ambos se utilicen para "particionar datos basados ​​en la columna del marco de datos"? ¿O hay alguna diferencia?


Si ejecuta repartition(COL) , cambia la partición durante los cálculos; obtendrá spark.sql.shuffle.partitions (predeterminado: 200). Si luego llama a .write obtendrá un directorio con muchos archivos.

Si ejecuta .write.partitionBy(COL) , como resultado obtendrá tantos directorios como valores únicos en COL. Esto acelera la lectura de datos (si filtra por columna de partición) y ahorra espacio en el almacenamiento (la columna de partición se elimina de los archivos de datos).

ACTUALIZACIÓN : Ver la respuesta de @ conradlee. Explica en detalles no solo cómo se verá la estructura de directorios después de aplicar diferentes métodos, sino también cuál será el número resultante de archivos en ambos escenarios.