apache-spark - example - spark sql java
¿Cómo agrego una columna persistente de identificadores de fila a Spark DataFrame? (6)
Esta pregunta no es nueva, sin embargo, encuentro un comportamiento sorprendente en Spark. Necesito agregar una columna de ID de fila a un DataFrame. Utilicé el método DataFrame monotonically_increasing_id () y me da una columna adicional de ID de fila únicos (que por cierto NO son consecutivos, pero son únicos).
El problema que tengo es que cuando filtro el DataFrame, los ID de fila en el DataFrame resultante se reasignan. Los dos marcos de datos se muestran a continuación.
-
el primero es el DataFrame inicial con ID de fila agregados de la siguiente manera:
df.withColumn("rowId", monotonically_increasing_id())
-
el segundo DataFrame es el obtenido después de filtrar en el col P a través de
df.filter(col("P"))
.
El problema se ilustra en el rowId para custId 169, que era 5 en el DataFrame inicial, pero después de filtrar ese rowId (5) se reasignó a custmId 773 cuando se eliminó custId 169. No sé por qué este es el comportamiento predeterminado.
Me gustaría que los
rowIds
sean "adhesivos";
si elimino las filas del DataFrame, no quiero que sus ID sean "reutilizadas", quiero que también desaparezcan junto con sus filas.
¿Es posible hacer eso?
No veo ningún indicador para solicitar este comportamiento del método
monotonically_increasing_id
.
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Esto funcionó para mí. Creó otra columna de identidad y usó la función de ventana número_ fila
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
No pude reproducir esto. Sin embargo, estoy usando Spark 2.0, así que tal vez el comportamiento ha cambiado o no estoy haciendo lo mismo que tú.
val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag")=== true)
df2.show
df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
| one| 1| true| 0|
| two| 2|false| 1|
|three| 3| true| 2|
| four| 4| true| 3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
| one| 1|true| 0|
|three| 3|true| 2|
| four| 4|true| 3|
+-----+-----+----+----+
Para evitar la evaluación cambiante de monotonically_increasing_id (), puede intentar escribir el marco de datos en el disco y volver a leerlo. Entonces la columna de identificación ahora es simplemente un campo de datos que se está leyendo, en lugar de calcularse dinámicamente en algún punto de la tubería. Aunque es una solución bastante fea, funcionó cuando hice una prueba rápida.
Para obtener un mejor rendimiento con la solución Chris T, puede intentar escribir en un marco de datos compartido apache ignite en lugar de escribir en el disco. https://ignite.apache.org/use-cases/spark/shared-memory-layer.html
Recientemente estuve trabajando en un problema similar.
Aunque
monotonically_increasing_id()
es muy rápido, no es confiable y no le dará números de fila consecutivos, solo aumentará los enteros únicos.
Crear una partición de Windows y luego usar
row_number().over(some_windows_partition)
consume mucho tiempo.
La mejor solución hasta ahora es usar un archivo comprimido con índice y luego convertir el archivo comprimido nuevamente al marco de datos original, con el nuevo esquema que incluye la columna de índice.
Prueba esto:
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
Donde
original_dataframe
es el
dataframe
que tiene que agregar un índice y
row_with_index
es el nuevo esquema con el índice de columna que puede escribir como
import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
val df1: DataFrame = df.withColumn("Id",lit(1))
df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
Aquí,
calendar_date
,
year_week_number
,
year_period_number
y
realization
fueron las columnas de mi
dataframe
original.
Puede reemplazar los nombres con los nombres de sus columnas.
El índice es el nuevo nombre de columna que tuvo que agregar para los números de fila.
Este proceso es en gran medida más eficiente y más suave en comparación con el
row_number().over(some_windows_partition)
.
Espero que esto ayude.
Spark 2.0
-
Este problema se ha resuelto en Spark 2.0 con SPARK-14241 .
-
Otro problema similar se ha resuelto en Spark 2.1 con SPARK-14393
Spark 1.x
El problema que experimenta es bastante sutil, pero puede reducirse a un hecho simple
monotonically_increasing_id
es una función extremadamente fea.
Claramente no es puro y su valor depende de algo que está completamente fuera de su control.
No toma ningún parámetro, por lo que desde la perspectiva del optimizador no importa cuándo se llama y se puede enviar después de todas las demás operaciones. De ahí el comportamiento que ves.
Si observa el código, descubrirá que esto se marca explícitamente al extender la expresión
MonotonicallyIncreasingID
con
Nondeterministic
.
No creo que haya una solución elegante, pero una forma de manejar esto es agregar una dependencia artificial en el valor filtrado.
Por ejemplo con un UDF como este:
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
bound = udf(lambda _, v: v, LongType())
(df
.withColumn("rn", monotonically_increasing_id())
# Due to nondeterministic behavior it has to be a separate step
.withColumn("rn", bound("P", "rn"))
.where("P"))
En general, podría ser más limpio agregar índices usando
zipWithIndex
en un
RDD
y luego convertirlo nuevamente en un
DataFrame
.
* La solución que se muestra arriba ya no es una solución válida (ni requerida) en Spark 2.x donde las UDF de Python están sujetas a las optimizaciones del plan de ejecución.