apache-spark pyspark apache-spark-sql partitioning window-functions

apache spark - Evite el impacto en el rendimiento de un solo modo de partición en las funciones de la ventana de Spark



apache-spark pyspark (1)

En la práctica, el impacto en el rendimiento será casi el mismo que si omitiera la cláusula PartitionBy. Todos los registros se barajarán en una única partición, se ordenarán localmente y se repetirán secuencialmente uno por uno.

La diferencia es solo en el número de particiones creadas en total. Vamos a ilustrar eso con un ejemplo usando un conjunto de datos simple con 10 particiones y 1000 registros:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

Si define marco sin partición por cláusula

w_unpart = Window.orderBy(f.col("index").asc())

y lag con lag

df_lag_unpart = df.withColumn( "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1") )

solo habrá una partición en total:

df_lag_unpart.rdd.glom().map(len).collect()

[1000]

En comparación con esa definición de cuadro con índice ficticio (simplificado un poco en comparación con su código:

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

utilizará un número de particiones igual a spark.sql.shuffle.partitions :

spark.conf.set("spark.sql.shuffle.partitions", 11) df_lag_part = df.withColumn( "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1") ) df_lag_part.rdd.glom().count()

11

con solo una partición no vacía:

df_lag_part.rdd.glom().filter(lambda x: x).count()

1

Desafortunadamente, no hay una solución universal que pueda usarse para resolver este problema en PySpark. Esto es solo un mecanismo inherente de la implementación combinado con el modelo de procesamiento distribuido.

Dado index columna de index es secuencial, podría generar una clave de partición artificial con un número fijo de registros por bloque:

rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions")) df_with_block = df.withColumn( "block", (f.col("index") / rec_per_block).cast("int") )

y úselo para definir la especificación del marco:

w_with_block = Window.partitionBy("block").orderBy("index") df_lag_with_block = df_with_block.withColumn( "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1") )

Esto usará el número esperado de particiones:

df_lag_with_block.rdd.glom().count()

11

con una distribución de datos aproximadamente uniforme (no podemos evitar colisiones hash):

df_lag_with_block.rdd.glom().map(len).collect()

[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

pero con una serie de lagunas en los límites del bloque:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()

12

Como los límites son fáciles de calcular:

from itertools import chain boundary_idxs = sorted(chain.from_iterable( # Here we depend on sequential identifiers # This could be generalized to any monotonically increasing # id by taking min and max per block (idx - 1, idx) for idx in df_lag_with_block.groupBy("block").min("index") .drop("block").rdd.flatMap(lambda x: x) .collect()))[2:] # The first boundary doesn''t carry useful inf.

siempre puedes seleccionar:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

y llenar estos por separado:

# We use window without partitions here. Since number of records # will be small this won''t be a performance issue # but will generate "Moving all data to a single partition" warning missing_with_lag = missing.withColumn( "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1") ).select("index", f.col("diffs_col1").alias("diffs_fill"))

y join :

combined = (df_lag_with_block .join(missing_with_lag, ["index"], "leftouter") .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

para obtener el resultado deseado:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where( combined["diffs_col1"] != df_lag_unpart["diffs_col1"] ) assert mismatched.count() == 0

Mi pregunta se desencadena por el caso de uso de calcular las diferencias entre filas consecutivas en un marco de datos de chispa.

Por ejemplo, tengo:

>>> df.show() +-----+----------+ |index| col1| +-----+----------+ | 0.0|0.58734024| | 1.0|0.67304325| | 2.0|0.85154736| | 3.0| 0.5449719| +-----+----------+

Si elijo calcularlos usando las funciones de "Ventana", entonces puedo hacerlo así:

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc()) >>> import pyspark.sql.functions as f >>> df.withColumn(''diffs_col1'', f.lag(df.col1, -1).over(winSpec) - df.col1).show() +-----+----------+-----------+ |index| col1| diffs_col1| +-----+----------+-----------+ | 0.0|0.58734024|0.085703015| | 1.0|0.67304325| 0.17850411| | 2.0|0.85154736|-0.30657548| | 3.0| 0.5449719| null| +-----+----------+-----------+

Pregunta : Particioné explícitamente el marco de datos en una sola partición. ¿Cuál es el impacto en el rendimiento de esto y, si lo hay, por qué es así y cómo podría evitarlo? Porque cuando no especifico una partición, aparece la siguiente advertencia:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.