apache-spark - last - spark dependencies
FunciĆ³n de ventana Spark SQL con condiciĆ³n compleja (1)
Aquí está el truco. Importar un conjunto de funciones:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
Definir ventanas:
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
Encuentre los puntos donde comienzan nuevas sesiones:
val newSession = (coalesce(
datediff($"login_date", lag($"login_date", 1).over(userWindow)),
lit(0)
) > 5).cast("bigint")
val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
Encuentre la fecha más temprana por sesión:
val result = sessionized
.withColumn("became_active", min($"login_date").over(userSessionWindow))
.drop("session")
Con el conjunto de datos definido como:
val df = Seq(
("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")
El resultado es:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14| 2012-01-11|
|SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
+----------------+----------+-------------+
Esto es probablemente más fácil de explicar a través del ejemplo. Supongamos que tengo un DataFrame de inicios de sesión de usuario en un sitio web, por ejemplo:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Me gustaría agregar a esto una columna que indica cuándo se convirtieron en usuarios activos en el sitio. Pero hay una advertencia: hay un período de tiempo durante el cual un usuario se considera activo, y después de este período, si vuelve a iniciar sesión, se restablece su fecha de became_active
. Supongamos que este período es de 5 días . Entonces la tabla deseada derivada de la tabla anterior sería algo como esto:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Así que, en particular, la fecha de was_active de became_active
se restableció porque su segundo inicio de sesión se produjo después de que expiró el período activo, pero la fecha de inicio de la became_active
de became_active
no se restableció la segunda vez que se conectó, porque cayó dentro del período activo.
Mi idea inicial era usar funciones de ventana con lag
, y luego usar los valores de lag
para llenar la columna de became_active
; por ejemplo, algo que empieza aproximadamente así:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window)
Entonces, la regla para completar la fecha de became_active
sería, si tmp
es null
(es decir, si es el primer inicio de sesión) o si login_date - tmp >= 5
then became_active = login_date
; de lo contrario, vaya al siguiente valor más reciente en tmp
y aplique la misma regla. Esto sugiere un enfoque recursivo, que tengo problemas para imaginar una forma de implementarlo.
Mis preguntas: ¿Es este un enfoque viable, y si es así, cómo puedo "volver" y mirar los valores anteriores de tmp
hasta que encuentre uno donde pare? No puedo, a mi conocimiento, iterar a través de los valores de una Column
Spark SQL. ¿Hay alguna otra forma de lograr este resultado?