tutorial - Chispa/Scala: relleno hacia adelante con última observación
spark sql tutorial (1)
Utilizando Spark 1.4.0, Scala 2.10
He estado tratando de encontrar una manera de reenviar los valores nulos con la última observación conocida, pero no veo una manera fácil. Creo que esto es algo muy común de hacer, pero no puedo encontrar un ejemplo que muestre cómo hacer esto.
Veo funciones para reenviar el relleno NaN con un valor, o funciones de retardo / avance para llenar o cambiar los datos mediante un desplazamiento, pero nada para recoger el último valor conocido.
Al mirar en línea, veo muchas preguntas y respuestas acerca de lo mismo en R, pero no en Spark / Scala.
Estaba pensando en mapear en un rango de fechas, filtrar los NaNs de los resultados y elegir el último elemento, pero creo que estoy confundido acerca de la sintaxis.
usando DataFrames intento algo como
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Pero eso no me lleva a ningún lado.
La parte del filtro no funciona; la función de mapa devuelve una secuencia de spark.sql.Columns, pero la función de filtro espera devolver un valor booleano, por lo que necesito obtener un valor de la columna para probar, pero parece que solo hay métodos de columna que devuelven una columna.
¿Hay alguna manera de hacer esto más ''simplemente'' en Spark?
Gracias por tu contribución
EDITAR:
Ejemplo simple de entrada de ejemplo:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Rendimiento esperado:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
NOTA: 1) Tengo muchas columnas, muchas de las cuales tienen este patrón de datos faltantes, pero no en la misma fecha / hora. Si es necesario, haré la transformación una columna a la vez.
EDITAR :
Siguiendo la respuesta de @ zero323 intenté de esta manera:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
La variable de difusión termina como una lista de valores sin nulos. Eso es progreso pero todavía no puedo hacer que el mapeo funcione. pero no consigo nada, porque el índice i
en el no se asigna a los datos originales, se asigna al subconjunto sin nulo.
¿Que me estoy perdiendo aqui?
EDITAR y solución (como se deduce de la respuesta de @ zero323):
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
Consulte la respuesta de zero323 a continuación para obtener más opciones si está usando RDD en lugar de DataFrames. La solución anterior puede no ser la más eficiente, pero funciona para mí. Si está buscando optimizar, eche un vistazo a la solución RDD.
Respuesta inicial (un solo supuesto de serie de tiempo):
En primer lugar, intente evitar las funciones de la ventana si no puede proporcionar la cláusula PARTITION BY
. Mueve los datos a una sola partición, por lo que la mayoría de las veces simplemente no es factible.
Lo que puede hacer es llenar los vacíos en RDD
utilizando mapPartitionsWithIndex
. Dado que no proporcionó un ejemplo de datos o el resultado esperado, considere que se trata de un pseudocódigo no un programa Scala real:
Primero vamos a ordenar
DataFrame
por fecha y convertir aRDD
import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val rows: RDD[Row] = df.orderBy($"Date").rdd
A continuación, encontremos la última observación no nula por partición.
def notMissing(row: Row): Boolean = ??? val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows .mapPartitionsWithIndex{ case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) } .collectAsMap
y convertir este
Map
para transmitirval toCarryBd = sc.broadcast(toCarry)
Finalmente mapear sobre particiones una vez más llenando los huecos:
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { // If it is the beginning of partition and value is missing // extract value to fill from toCarryBd.value // Remember to correct for empty / only missing partitions // otherwise take last not-null from the current partition } val imputed: RDD[Row] = rows .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
finalmente convertir de nuevo a DataFrame
Editar (particiones / series de tiempo por datos de grupo):
El diablo está en el detalle. Si, después de todo, sus datos están particionados, se puede resolver un problema completo usando groupBy
. Supongamos que simplemente particiona por la columna "v" de tipo T
y la Date
es una marca de tiempo entera:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
De esta manera puedes rellenar todas las columnas al mismo tiempo.
¿Se puede hacer esto con DataFrames en lugar de convertirlos a RDD?
Depende, aunque es poco probable que sea eficiente. Si el espacio máximo es relativamente pequeño, puedes hacer algo como esto:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column
val maxGap: Int = ??? // Maximum gap between observations
val columnsToFill: List[String] = ??? // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed
// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
// Generate lag values between 1 and maxGap
val lags = (1 to maxGap).map(lag(col(c), _)over(w))
// Add current, coalesce and set alias
coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}
// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))
// Finally select
val dfImputed = df.select($"*" :: lags: _*)
Se puede ajustar fácilmente para usar diferentes espacios máximos por columna.
Una forma más sencilla de lograr un resultado similar en la última versión de Spark es usar el last
con ignoreNulls
:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"k").orderBy($"Date")
.rowsBetween(Window.unboundedPreceding, -1)
df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
Si bien es posible eliminar la cláusula partitionBy
y aplicar este método globalmente, sería prohibitivamente costoso con grandes conjuntos de datos.