structtype spark read apache-spark pyspark pyspark-sql

apache spark - read - Spark-Ventana con recursividad?-Propagando valores de manera condicional en las filas



spark sql java (1)

Las funciones de la ventana no son compatibles con la recursión, pero no es necesaria aquí. Este tipo de sesionización se puede manejar fácilmente con suma acumulativa:

from pyspark.sql.functions import col, sum, when, lag from pyspark.sql.window import Window w = Window.partitionBy("user_id").orderBy("visit_id") purch_id = sum(lag(when( col("revenue") > 0, 1).otherwise(0), 1, 0 ).over(w)).over(w) + 1 df.withColumn("purch_id", purch_id).show()

+-------+--------+-------+--------+ |user_id|visit_id|revenue|purch_id| +-------+--------+-------+--------+ | 1| 1| 0| 1| | 1| 2| 0| 1| | 1| 3| 0| 1| | 1| 4| 100| 1| | 1| 5| 0| 2| | 1| 6| 0| 2| | 1| 7| 200| 2| | 1| 8| 0| 3| | 1| 9| 10| 3| +-------+--------+-------+--------+

Tengo el siguiente marco de datos que muestra los ingresos de las compras.

+-------+--------+-------+ |user_id|visit_id|revenue| +-------+--------+-------+ | 1| 1| 0| | 1| 2| 0| | 1| 3| 0| | 1| 4| 100| | 1| 5| 0| | 1| 6| 0| | 1| 7| 200| | 1| 8| 0| | 1| 9| 10| +-------+--------+-------+

En última instancia, quiero que la nueva columna purch_revenue muestre los ingresos generados por la compra en cada fila. Como solución alternativa, también he intentado introducir un identificador de compra purch_id que se incrementa cada vez que se realiza una compra. Entonces esto está listado solo como una referencia.

+-------+--------+-------+-------------+--------+ |user_id|visit_id|revenue|purch_revenue|purch_id| +-------+--------+-------+-------------+--------+ | 1| 1| 0| 100| 1| | 1| 2| 0| 100| 1| | 1| 3| 0| 100| 1| | 1| 4| 100| 100| 1| | 1| 5| 0| 100| 2| | 1| 6| 0| 100| 2| | 1| 7| 200| 100| 2| | 1| 8| 0| 100| 3| | 1| 9| 10| 100| 3| +-------+--------+-------+-------------+--------+

Intenté usar la función lag/lead esta manera:

user_timeline = Window.partitionBy("user_id").orderBy("visit_id") find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))/ .otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline)) df.withColumn("purch_revenue", find_rev)

Esto duplica la columna de ingresos si los revenue > 0 y también lo detiene por una fila. Claramente, puedo encadenar esto para un N finito, pero esa no es una solución.

  • ¿Hay alguna forma de aplicar esto recursivamente hasta que los revenue > 0 ?
  • Alternativamente, ¿hay alguna manera de incrementar un valor basado en una condición? Intenté encontrar una manera de hacerlo, pero tuve problemas para encontrar uno.