sql - Funciones de ventana de chispa-rango entre fechas
apache-spark pyspark (1)
Tengo un Spark SQL
DataFrame
con datos y lo que intento obtener son todas las filas que preceden a la fila actual en un rango de fechas determinado.
Entonces, por ejemplo, quiero tener todas las filas de 7 días anteriores a la fila dada.
Descubrí que necesito usar una
Window Function
como:
Window /
.partitionBy(''id'') /
.orderBy(''start'')
Y aquí viene el problema.
Quiero tener un
rangeBetween
7 días, pero no hay nada en los documentos de Spark que pueda encontrar sobre esto.
¿Spark incluso ofrece esa opción?
Por ahora solo obtengo todas las filas anteriores con:
.rowsBetween(-sys.maxsize, 0)
pero me gustaría lograr algo como:
.rangeBetween("7 days", 0)
Si alguien pudiera ayudarme con esto, estaría muy agradecido. ¡Gracias por adelantado!
Chispa> = 2.3
Desde Spark 2.3 es posible usar objetos de intervalo usando la API SQL, pero el
DataFrame
API
DataFrame
todavía
está
en progreso
.
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, mean(some_value) OVER (
PARTITION BY id
ORDER BY CAST(start AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM df""").show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Chispa <2.3
Hasta donde yo sé, no es posible directamente ni en Spark ni en Hive.
Ambos requieren que la cláusula
ORDER BY
utilizada con
RANGE
sea numérica.
Lo más parecido que encontré es la conversión a marca de tiempo y operación en segundos.
Suponiendo que la columna de
start
contiene el tipo de
date
:
from pyspark.sql import Row
row = Row("id", "start", "some_value")
df = sc.parallelize([
row(1, "2015-01-01", 20.0),
row(1, "2015-01-06", 10.0),
row(1, "2015-01-07", 25.0),
row(1, "2015-01-12", 30.0),
row(2, "2015-01-01", 5.0),
row(2, "2015-01-03", 30.0),
row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
Una pequeña ayuda y definición de ventana:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
Finalmente consulta:
w = (Window()
.partitionBy(col("id"))
.orderBy(col("start").cast("timestamp").cast("long"))
.rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Lejos de ser bonita pero funciona.