sqlcontext spark example apache-spark dataframe spark-dataframe apache-spark-sql

apache-spark - example - spark sql java



¿Cómo agrego una columna persistente de identificadores de fila a Spark DataFrame? (6)

Esta pregunta no es nueva, sin embargo, encuentro un comportamiento sorprendente en Spark. Necesito agregar una columna de ID de fila a un DataFrame. Utilicé el método DataFrame monotonically_increasing_id () y me da una columna adicional de ID de fila únicos (que por cierto NO son consecutivos, pero son únicos).

El problema que tengo es que cuando filtro el DataFrame, los ID de fila en el DataFrame resultante se reasignan. Los dos marcos de datos se muestran a continuación.

  • el primero es el DataFrame inicial con ID de fila agregados de la siguiente manera:

    df.withColumn("rowId", monotonically_increasing_id())

  • el segundo DataFrame es el obtenido después de filtrar en el col P a través de df.filter(col("P")) .

El problema se ilustra en el rowId para custId 169, que era 5 en el DataFrame inicial, pero después de filtrar ese rowId (5) se reasignó a custmId 773 cuando se eliminó custId 169. No sé por qué este es el comportamiento predeterminado.

Me gustaría que los rowIds sean "adhesivos"; si elimino las filas del DataFrame, no quiero que sus ID sean "reutilizadas", quiero que también desaparezcan junto con sus filas. ¿Es posible hacer eso? No veo ningún indicador para solicitar este comportamiento del método monotonically_increasing_id .

+---------+--------------------+-------+ | custId | features| P |rowId| +---------+--------------------+-------+ |806 |[50,5074,...| true| 0| |832 |[45,120,1...| true| 1| |216 |[6691,272...| true| 2| |926 |[120,1788...| true| 3| |875 |[54,120,1...| true| 4| |169 |[19406,21...| false| 5| after filtering on P: +---------+--------------------+-------+ | custId| features| P |rowId| +---------+--------------------+-------+ | 806|[50,5074,...| true| 0| | 832|[45,120,1...| true| 1| | 216|[6691,272...| true| 2| | 926|[120,1788...| true| 3| | 875|[54,120,1...| true| 4| | 773|[3136,317...| true| 5|


Esto funcionó para mí. Creó otra columna de identidad y usó la función de ventana número_ fila

from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)]) zipped_rdd = **original_dataframe**.rdd.zipWithIndex() indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))


No pude reproducir esto. Sin embargo, estoy usando Spark 2.0, así que tal vez el comportamiento ha cambiado o no estoy haciendo lo mismo que tú.

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true)) .toDF("name", "value","flag") .withColumn("rowd", monotonically_increasing_id()) df.show val df2 = df.filter(col("flag")=== true) df2.show df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields] +-----+-----+-----+----+ | name|value| flag|rowd| +-----+-----+-----+----+ | one| 1| true| 0| | two| 2|false| 1| |three| 3| true| 2| | four| 4| true| 3| +-----+-----+-----+----+ df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields] +-----+-----+----+----+ | name|value|flag|rowd| +-----+-----+----+----+ | one| 1|true| 0| |three| 3|true| 2| | four| 4|true| 3| +-----+-----+----+----+


Para evitar la evaluación cambiante de monotonically_increasing_id (), puede intentar escribir el marco de datos en el disco y volver a leerlo. Entonces la columna de identificación ahora es simplemente un campo de datos que se está leyendo, en lugar de calcularse dinámicamente en algún punto de la tubería. Aunque es una solución bastante fea, funcionó cuando hice una prueba rápida.



Recientemente estuve trabajando en un problema similar. Aunque monotonically_increasing_id() es muy rápido, no es confiable y no le dará números de fila consecutivos, solo aumentará los enteros únicos.

Crear una partición de Windows y luego usar row_number().over(some_windows_partition) consume mucho tiempo.

La mejor solución hasta ahora es usar un archivo comprimido con índice y luego convertir el archivo comprimido nuevamente al marco de datos original, con el nuevo esquema que incluye la columna de índice.

Prueba esto:

row_with_index = Row( "calendar_date" ,"year_week_number" ,"year_period_number" ,"realization" ,"index" )

Donde original_dataframe es el dataframe que tiene que agregar un índice y row_with_index es el nuevo esquema con el índice de columna que puede escribir como

import org.apache.spark.sql.functions.{row_number} import org.apache.spark.sql.expressions.Window val df1: DataFrame = df.withColumn("Id",lit(1)) df1 .select( ..., row_number() .over(Window .partitionBy("Id" .orderBy(col("...").desc)) ) .alias("Row_Nbr") )

Aquí, calendar_date , year_week_number , year_period_number y realization fueron las columnas de mi dataframe original. Puede reemplazar los nombres con los nombres de sus columnas. El índice es el nuevo nombre de columna que tuvo que agregar para los números de fila.

Este proceso es en gran medida más eficiente y más suave en comparación con el row_number().over(some_windows_partition) .

Espero que esto ayude.


Spark 2.0

Spark 1.x

El problema que experimenta es bastante sutil, pero puede reducirse a un hecho simple monotonically_increasing_id es una función extremadamente fea. Claramente no es puro y su valor depende de algo que está completamente fuera de su control.

No toma ningún parámetro, por lo que desde la perspectiva del optimizador no importa cuándo se llama y se puede enviar después de todas las demás operaciones. De ahí el comportamiento que ves.

Si observa el código, descubrirá que esto se marca explícitamente al extender la expresión MonotonicallyIncreasingID con Nondeterministic .

No creo que haya una solución elegante, pero una forma de manejar esto es agregar una dependencia artificial en el valor filtrado. Por ejemplo con un UDF como este:

from pyspark.sql.types import LongType from pyspark.sql.functions import udf bound = udf(lambda _, v: v, LongType()) (df .withColumn("rn", monotonically_increasing_id()) # Due to nondeterministic behavior it has to be a separate step .withColumn("rn", bound("P", "rn")) .where("P"))

En general, podría ser más limpio agregar índices usando zipWithIndex en un RDD y luego convertirlo nuevamente en un DataFrame .

* La solución que se muestra arriba ya no es una solución válida (ni requerida) en Spark 2.x donde las UDF de Python están sujetas a las optimizaciones del plan de ejecución.