sql apache-spark pyspark apache-spark-sql window-functions

sql - Funciones de ventana de chispa-rango entre fechas



apache-spark pyspark (1)

Tengo un Spark SQL DataFrame con datos y lo que intento obtener son todas las filas que preceden a la fila actual en un rango de fechas determinado. Entonces, por ejemplo, quiero tener todas las filas de 7 días anteriores a la fila dada. Descubrí que necesito usar una Window Function como:

Window / .partitionBy(''id'') / .orderBy(''start'')

Y aquí viene el problema. Quiero tener un rangeBetween 7 días, pero no hay nada en los documentos de Spark que pueda encontrar sobre esto. ¿Spark incluso ofrece esa opción? Por ahora solo obtengo todas las filas anteriores con:

.rowsBetween(-sys.maxsize, 0)

pero me gustaría lograr algo como:

.rangeBetween("7 days", 0)

Si alguien pudiera ayudarme con esto, estaría muy agradecido. ¡Gracias por adelantado!


Chispa> = 2.3

Desde Spark 2.3 es posible usar objetos de intervalo usando la API SQL, pero el DataFrame API DataFrame todavía está en progreso .

df.createOrReplaceTempView("df") spark.sql( """SELECT *, mean(some_value) OVER ( PARTITION BY id ORDER BY CAST(start AS timestamp) RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW ) AS mean FROM df""").show() ## +---+----------+----------+------------------+ ## | id| start|some_value| mean| ## +---+----------+----------+------------------+ ## | 1|2015-01-01| 20.0| 20.0| ## | 1|2015-01-06| 10.0| 15.0| ## | 1|2015-01-07| 25.0|18.333333333333332| ## | 1|2015-01-12| 30.0|21.666666666666668| ## | 2|2015-01-01| 5.0| 5.0| ## | 2|2015-01-03| 30.0| 17.5| ## | 2|2015-02-01| 20.0| 20.0| ## +---+----------+----------+------------------+

Chispa <2.3

Hasta donde yo sé, no es posible directamente ni en Spark ni en Hive. Ambos requieren que la cláusula ORDER BY utilizada con RANGE sea ​​numérica. Lo más parecido que encontré es la conversión a marca de tiempo y operación en segundos. Suponiendo que la columna de start contiene el tipo de date :

from pyspark.sql import Row row = Row("id", "start", "some_value") df = sc.parallelize([ row(1, "2015-01-01", 20.0), row(1, "2015-01-06", 10.0), row(1, "2015-01-07", 25.0), row(1, "2015-01-12", 30.0), row(2, "2015-01-01", 5.0), row(2, "2015-01-03", 30.0), row(2, "2015-02-01", 20.0) ]).toDF().withColumn("start", col("start").cast("date"))

Una pequeña ayuda y definición de ventana:

from pyspark.sql.window import Window from pyspark.sql.functions import mean, col # Hive timestamp is interpreted as UNIX timestamp in seconds* days = lambda i: i * 86400

Finalmente consulta:

w = (Window() .partitionBy(col("id")) .orderBy(col("start").cast("timestamp").cast("long")) .rangeBetween(-days(7), 0)) df.select(col("*"), mean("some_value").over(w).alias("mean")).show() ## +---+----------+----------+------------------+ ## | id| start|some_value| mean| ## +---+----------+----------+------------------+ ## | 1|2015-01-01| 20.0| 20.0| ## | 1|2015-01-06| 10.0| 15.0| ## | 1|2015-01-07| 25.0|18.333333333333332| ## | 1|2015-01-12| 30.0|21.666666666666668| ## | 2|2015-01-01| 5.0| 5.0| ## | 2|2015-01-03| 30.0| 17.5| ## | 2|2015-02-01| 20.0| 20.0| ## +---+----------+----------+------------------+

Lejos de ser bonita pero funciona.

* Manual de idioma de colmena, tipos