scala - transformations - ¿Cómo definir y usar una función agregada definida por el usuario en Spark SQL?
spark transformations and actions (1)
Métodos admitidos
Chispa> = 2.3
Udf vectorizado (solo Python):
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import *
import pandas as pd
df = sc.parallelize([
("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])
def below_threshold(threshold, group="group", power="power"):
@pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
def below_threshold_(df):
df = pd.DataFrame(
df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
df.reset_index(inplace=True, drop=False)
return df
return below_threshold_
Ejemplo de uso:
df.groupBy("group").apply(below_threshold(-40)).show()
## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## | b| true|
## | a| false|
## +-----+---------------+
Consulte también Aplicar UDF en GroupedData en PySpark (con ejemplo de Python en funcionamiento)
Spark> = 2.0 (opcionalmente 1.6 pero con API ligeramente diferente):
Es posible usar
Aggregators
en
Datasets
escritos:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
with Serializable {
def zero = false
def reduce(acc: Boolean, x: I) = acc | f(x)
def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
def finish(acc: Boolean) = acc
def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}
val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
Chispa> = 1.5 :
En Spark 1.5 puede crear UDAF de esta manera, aunque probablemente sea una exageración:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
object belowThreshold extends UserDefinedAggregateFunction {
// Schema you get as an input
def inputSchema = new StructType().add("power", IntegerType)
// Schema of the row which is used for aggregation
def bufferSchema = new StructType().add("ind", BooleanType)
// Returned type
def dataType = BooleanType
// Self-explaining
def deterministic = true
// zero value
def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
// Similar to seqOp in aggregate
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
}
// Similar to combOp in aggregate
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))
}
// Called on exit to get return value
def evaluate(buffer: Row) = buffer.getBoolean(0)
}
Ejemplo de uso:
df
.groupBy($"group")
.agg(belowThreshold($"power").alias("belowThreshold"))
.show
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// | a| false|
// | b| true|
// +-----+--------------+
Solución de Spark 1.4 :
No estoy seguro de si entiendo correctamente sus requisitos, pero por lo que puedo decir, la agregación antigua debería ser suficiente aquí:
val df = sc.parallelize(Seq(
("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")
df
.withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
.groupBy($"group")
.agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
.show
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// | a| false|
// | b| true|
// +-----+--------------+
Chispa <= 1.4 :
Hasta donde sé, en este momento (Spark 1.4.1), no hay soporte para UDAF, aparte de los de Hive. Debería ser posible con Spark 1.5 (ver SPARK-3947 ).
Métodos internos / no admitidos
Internamente, Spark utiliza una serie de clases que incluyen
ImperativeAggregates
y
DeclarativeAggregates
.
Están destinados para uso interno y pueden cambiar sin previo aviso, por lo que probablemente no sea algo que desee usar en su código de producción, pero solo para completar por
BelowThreshold
con
DeclarativeAggregate
podría implementarse de esta manera (probado con Spark 2.2-SNAPSHOT):
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
case class BelowThreshold(child: Expression, threshold: Expression)
extends DeclarativeAggregate {
override def children: Seq[Expression] = Seq(child, threshold)
override def nullable: Boolean = false
override def dataType: DataType = BooleanType
private lazy val belowThreshold = AttributeReference(
"belowThreshold", BooleanType, nullable = false
)()
// Used to derive schema
override lazy val aggBufferAttributes = belowThreshold :: Nil
override lazy val initialValues = Seq(
Literal(false)
)
override lazy val updateExpressions = Seq(Or(
belowThreshold,
If(IsNull(child), Literal(false), LessThan(child, threshold))
))
override lazy val mergeExpressions = Seq(
Or(belowThreshold.left, belowThreshold.right)
)
override lazy val evaluateExpression = belowThreshold
override def defaultResult: Option[Literal] = Option(Literal(false))
}
Se debe envolver con un equivalente de
withAggregateFunction
.
Sé cómo escribir un UDF en Spark SQL:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
¿Puedo hacer algo similar para definir una función agregada? ¿Cómo se hace esto?
Para el contexto, quiero ejecutar la siguiente consulta SQL:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Debería devolver algo como
Row(span1, false, T0)
Quiero que la función de agregado me diga si hay algún valor para
opticalReceivePower
en los grupos definidos por
span
y
timestamp
que están por debajo del umbral.
¿Debo escribir mi UDAF de manera diferente al UDF que pegué arriba?