tutorial transformations textfile spark parallelize español ejemplo and actions scala apache-spark apache-spark-sql aggregate-functions user-defined-functions

scala - transformations - ¿Cómo definir y usar una función agregada definida por el usuario en Spark SQL?



spark transformations and actions (1)

Métodos admitidos

Chispa> = 2.3

Udf vectorizado (solo Python):

from pyspark.sql.functions import pandas_udf from pyspark.sql.functions import PandasUDFType from pyspark.sql.types import * import pandas as pd df = sc.parallelize([ ("a", 0), ("a", 1), ("b", 30), ("b", -50) ]).toDF(["group", "power"]) def below_threshold(threshold, group="group", power="power"): @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP) def below_threshold_(df): df = pd.DataFrame( df.groupby(group).apply(lambda x: (x[power] < threshold).any())) df.reset_index(inplace=True, drop=False) return df return below_threshold_

Ejemplo de uso:

df.groupBy("group").apply(below_threshold(-40)).show() ## +-----+---------------+ ## |group|below_threshold| ## +-----+---------------+ ## | b| true| ## | a| false| ## +-----+---------------+

Consulte también Aplicar UDF en GroupedData en PySpark (con ejemplo de Python en funcionamiento)

Spark> = 2.0 (opcionalmente 1.6 pero con API ligeramente diferente):

Es posible usar Aggregators en Datasets escritos:

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Encoder, Encoders} class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean] with Serializable { def zero = false def reduce(acc: Boolean, x: I) = acc | f(x) def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2 def finish(acc: Boolean) = acc def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean } val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)

Chispa> = 1.5 :

En Spark 1.5 puede crear UDAF de esta manera, aunque probablemente sea una exageración:

import org.apache.spark.sql.expressions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.Row object belowThreshold extends UserDefinedAggregateFunction { // Schema you get as an input def inputSchema = new StructType().add("power", IntegerType) // Schema of the row which is used for aggregation def bufferSchema = new StructType().add("ind", BooleanType) // Returned type def dataType = BooleanType // Self-explaining def deterministic = true // zero value def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false) // Similar to seqOp in aggregate def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40) } // Similar to combOp in aggregate def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0)) } // Called on exit to get return value def evaluate(buffer: Row) = buffer.getBoolean(0) }

Ejemplo de uso:

df .groupBy($"group") .agg(belowThreshold($"power").alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+

Solución de Spark 1.4 :

No estoy seguro de si entiendo correctamente sus requisitos, pero por lo que puedo decir, la agregación antigua debería ser suficiente aquí:

val df = sc.parallelize(Seq( ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power") df .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType)) .groupBy($"group") .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+

Chispa <= 1.4 :

Hasta donde sé, en este momento (Spark 1.4.1), no hay soporte para UDAF, aparte de los de Hive. Debería ser posible con Spark 1.5 (ver SPARK-3947 ).

Métodos internos / no admitidos

Internamente, Spark utiliza una serie de clases que incluyen ImperativeAggregates y DeclarativeAggregates .

Están destinados para uso interno y pueden cambiar sin previo aviso, por lo que probablemente no sea algo que desee usar en su código de producción, pero solo para completar por BelowThreshold con DeclarativeAggregate podría implementarse de esta manera (probado con Spark 2.2-SNAPSHOT):

import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ case class BelowThreshold(child: Expression, threshold: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = Seq(child, threshold) override def nullable: Boolean = false override def dataType: DataType = BooleanType private lazy val belowThreshold = AttributeReference( "belowThreshold", BooleanType, nullable = false )() // Used to derive schema override lazy val aggBufferAttributes = belowThreshold :: Nil override lazy val initialValues = Seq( Literal(false) ) override lazy val updateExpressions = Seq(Or( belowThreshold, If(IsNull(child), Literal(false), LessThan(child, threshold)) )) override lazy val mergeExpressions = Seq( Or(belowThreshold.left, belowThreshold.right) ) override lazy val evaluateExpression = belowThreshold override def defaultResult: Option[Literal] = Option(Literal(false)) }

Se debe envolver con un equivalente de withAggregateFunction .

Sé cómo escribir un UDF en Spark SQL:

def belowThreshold(power: Int): Boolean = { return power < -40 } sqlContext.udf.register("belowThreshold", belowThreshold _)

¿Puedo hacer algo similar para definir una función agregada? ¿Cómo se hace esto?

Para el contexto, quiero ejecutar la siguiente consulta SQL:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp FROM ifDF WHERE opticalReceivePower IS NOT null GROUP BY span, timestamp ORDER BY span""")

Debería devolver algo como

Row(span1, false, T0)

Quiero que la función de agregado me diga si hay algún valor para opticalReceivePower en los grupos definidos por span y timestamp que están por debajo del umbral. ¿Debo escribir mi UDAF de manera diferente al UDF que pegué arriba?