spark last example core_2 apache-spark apache-spark-sql window-functions

apache-spark - last - spark dependencies



FunciĆ³n de ventana Spark SQL con condiciĆ³n compleja (1)

Aquí está el truco. Importar un conjunto de funciones:

import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

Definir ventanas:

val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")

Encuentre los puntos donde comienzan nuevas sesiones:

val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint") val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

Encuentre la fecha más temprana por sesión:

val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")

Con el conjunto de datos definido como:

val df = Seq( ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11") ).toDF("user_name", "login_date")

El resultado es:

+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user |SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+

Esto es probablemente más fácil de explicar a través del ejemplo. Supongamos que tengo un DataFrame de inicios de sesión de usuario en un sitio web, por ejemplo:

scala> df.show(5) +----------------+----------+ | user_name|login_date| +----------------+----------+ |SirChillingtonIV|2012-01-04| |Booooooo99900098|2012-01-04| |Booooooo99900098|2012-01-06| | OprahWinfreyJr|2012-01-10| |SirChillingtonIV|2012-01-11| +----------------+----------+ only showing top 5 rows

Me gustaría agregar a esto una columna que indica cuándo se convirtieron en usuarios activos en el sitio. Pero hay una advertencia: hay un período de tiempo durante el cual un usuario se considera activo, y después de este período, si vuelve a iniciar sesión, se restablece su fecha de became_active . Supongamos que este período es de 5 días . Entonces la tabla deseada derivada de la tabla anterior sería algo como esto:

+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ |SirChillingtonIV|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-11| 2012-01-11| +----------------+----------+-------------+

Así que, en particular, la fecha de was_active de became_active se restableció porque su segundo inicio de sesión se produjo después de que expiró el período activo, pero la fecha de inicio de la became_active de became_active no se restableció la segunda vez que se conectó, porque cayó dentro del período activo.

Mi idea inicial era usar funciones de ventana con lag , y luego usar los valores de lag para llenar la columna de became_active ; por ejemplo, algo que empieza aproximadamente así:

import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val window = Window.partitionBy("user_name").orderBy("login_date") val df2 = df.withColumn("tmp", lag("login_date", 1).over(window)

Entonces, la regla para completar la fecha de became_active sería, si tmp es null (es decir, si es el primer inicio de sesión) o si login_date - tmp >= 5 then became_active = login_date ; de lo contrario, vaya al siguiente valor más reciente en tmp y aplique la misma regla. Esto sugiere un enfoque recursivo, que tengo problemas para imaginar una forma de implementarlo.

Mis preguntas: ¿Es este un enfoque viable, y si es así, cómo puedo "volver" y mirar los valores anteriores de tmp hasta que encuentre uno donde pare? No puedo, a mi conocimiento, iterar a través de los valores de una Column Spark SQL. ¿Hay alguna otra forma de lograr este resultado?