apache spark - read - Spark-Ventana con recursividad?-Propagando valores de manera condicional en las filas
spark sql java (1)
Las funciones de la ventana no son compatibles con la recursión, pero no es necesaria aquí. Este tipo de sesionización se puede manejar fácilmente con suma acumulativa:
from pyspark.sql.functions import col, sum, when, lag
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy("visit_id")
purch_id = sum(lag(when(
col("revenue") > 0, 1).otherwise(0),
1, 0
).over(w)).over(w) + 1
df.withColumn("purch_id", purch_id).show()
+-------+--------+-------+--------+
|user_id|visit_id|revenue|purch_id|
+-------+--------+-------+--------+
| 1| 1| 0| 1|
| 1| 2| 0| 1|
| 1| 3| 0| 1|
| 1| 4| 100| 1|
| 1| 5| 0| 2|
| 1| 6| 0| 2|
| 1| 7| 200| 2|
| 1| 8| 0| 3|
| 1| 9| 10| 3|
+-------+--------+-------+--------+
Tengo el siguiente marco de datos que muestra los ingresos de las compras.
+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
| 1| 1| 0|
| 1| 2| 0|
| 1| 3| 0|
| 1| 4| 100|
| 1| 5| 0|
| 1| 6| 0|
| 1| 7| 200|
| 1| 8| 0|
| 1| 9| 10|
+-------+--------+-------+
En última instancia, quiero que la nueva columna purch_revenue
muestre los ingresos generados por la compra en cada fila. Como solución alternativa, también he intentado introducir un identificador de compra purch_id
que se incrementa cada vez que se realiza una compra. Entonces esto está listado solo como una referencia.
+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
| 1| 1| 0| 100| 1|
| 1| 2| 0| 100| 1|
| 1| 3| 0| 100| 1|
| 1| 4| 100| 100| 1|
| 1| 5| 0| 100| 2|
| 1| 6| 0| 100| 2|
| 1| 7| 200| 100| 2|
| 1| 8| 0| 100| 3|
| 1| 9| 10| 100| 3|
+-------+--------+-------+-------------+--------+
Intenté usar la función lag/lead
esta manera:
user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))/
.otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)
Esto duplica la columna de ingresos si los revenue > 0
y también lo detiene por una fila. Claramente, puedo encadenar esto para un N finito, pero esa no es una solución.
- ¿Hay alguna forma de aplicar esto recursivamente hasta que los
revenue > 0
? - Alternativamente, ¿hay alguna manera de incrementar un valor basado en una condición? Intenté encontrar una manera de hacerlo, pero tuve problemas para encontrar uno.