scala performance apache-spark apache-spark-sql user-defined-functions

scala - Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema



performance apache-spark (1)

Estoy trabajando en un UDAF que devuelve una variedad de elementos.

La entrada para cada actualización es una tupla de índice y valor.

Lo que hace el UDAF es sumar todos los valores bajo el mismo índice.

Ejemplo:

Para entrada (índice, valor): (2,1), (3,1), (2,3)

debería volver (0,0,4,1, ..., 0)

La lógica funciona bien, pero tengo un problema con el método de actualización , mi implementación solo actualiza 1 celda para cada fila , pero la última asignación en ese método en realidad copia toda la matriz , lo cual es redundante y consume mucho tiempo.

Esta tarea sola es responsable del 98% del tiempo de ejecución de mi consulta .

Mi pregunta es, ¿cómo puedo reducir ese tiempo? ¿Es posible asignar 1 valor en la matriz del búfer sin tener que reemplazar todo el búfer?

PD: Estoy trabajando con Spark 1.6, y no puedo actualizarlo en el corto plazo, por lo tanto, quédese con una solución que funcione con esta versión.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{ val bucketSize = 1000 def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil) def dataType: DataType = ArrayType(LongType) def deterministic: Boolean = true def bufferSchema: StructType = { StructType( StructField("buckets", ArrayType(LongType)) :: Nil ) } override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = new Array[Long](bucketSize) } override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val index = input.getLong(0) val value = input.getLong(1) val arr = buffer.getAs[mutable.WrappedArray[Long]](0) buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell) } override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0) val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0) for(i <- arr1.indices){ arr1.update(i, arr1(i) + arr2(i)) } buffer1(0) = arr1 } override def evaluate(buffer: Row): Any = { buffer.getAs[mutable.WrappedArray[Long]](0) } }


TL; DR No use UDAF o use tipos primitivos en lugar de ArrayType .

Sin función UserDefinedFunction

Ambas soluciones deberían saltear el costoso malabarismo entre la representación interna y externa.

Usando agregados estándar y pivot

Esto utiliza agregaciones SQL estándar. Si bien está optimizado internamente, puede ser costoso cuando aumenta el número de claves y el tamaño de la matriz.

Entrada dada:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

Usted puede:

import org.apache.spark.sql.functions.{array, coalesce, col, lit} val nBuckets = 10 @transient val values = array( 0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _* ) df .groupBy("id") .pivot("index", 0 until nBuckets) .sum("value") .select($"id", values.alias("values"))

+---+--------------------+ | id| values| +---+--------------------+ | 1|[0, 0, 4, 1, 0, 0...| +---+--------------------+

Uso de la API RDD con combineByKey / aggregateByKey .

Agregación byKey simple y antigua con buffer mutable. Sin campanas y silbatos, pero debería funcionar razonablemente bien con una amplia gama de entradas. Si sospecha que la entrada es escasa, puede considerar una representación intermedia más eficiente, como Map mutable.

rdd .aggregateByKey(Array.fill(nBuckets)(0L))( { case (acc, (index, value)) => { acc(index) += value; acc }}, (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1} ).toDF

+---+--------------------+ | _1| _2| +---+--------------------+ | 1|[0, 0, 4, 1, 0, 0...| +---+--------------------+

Uso de la función UserDefinedFunction por el UserDefinedFunction con tipos primitivos

Hasta donde yo entiendo los ArrayConverter.toCatalystImpl internos, el cuello de botella de rendimiento es ArrayConverter.toCatalystImpl .

Parece que se llama para cada llamada MutableAggregationBuffer.update y, a su vez, asigna nuevos GenericArrayData para cada Row .

Si redefinimos bufferSchema como:

def bufferSchema: StructType = { StructType( 0 to nBuckets map (i => StructField(s"x$i", LongType)) ) }

tanto la update como la merge se pueden expresar como reemplazos simples de valores primitivos en el búfer. La cadena de llamadas seguirá siendo bastante larga, pero no requerirá copias / conversiones y asignaciones locas. Omitiendo cheques null , necesitarás algo similar a

val index = input.getLong(0) buffer.update(index, buffer.getLong(index) + input.getLong(1))

y

for(i <- 0 to nBuckets){ buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i)) }

respectivamente.

Finalmente evaluate debería tomar Row y convertirlo a la salida Seq :

for (i <- 0 to nBuckets) yield buffer.getLong(i)

Tenga en cuenta que en esta implementación se merge un posible cuello de botella. Si bien no debería presentar ningún problema de rendimiento nuevo, con M buckets, cada llamada a merge es O (M) .

Con K claves únicas y particiones P , se llamará M * K veces en el peor de los casos, donde cada clave se produce al menos una vez en cada partición. Esto aumenta efectivamente la complicidad del componente de merge a O (M * N * K) .

En general, no hay mucho que pueda hacer al respecto. Sin embargo, si realiza suposiciones específicas sobre la distribución de datos (los datos son escasos, la distribución de claves es uniforme), puede atajar un poco las cosas y mezclar primero:

df .repartition(n, $"key") .groupBy($"key") .agg(SumArrayAtIndexUDAF($"index", $"value"))

Si se cumplen los supuestos, debería:

  • Reduzca de forma contraintuitiva el tamaño aleatorio barajando pares dispersos, en lugar de Rows densas en forma de matriz.
  • Agregue datos usando solo actualizaciones (cada O (1) ) posiblemente tocando solo como un subconjunto de índices.

Sin embargo, si uno o ambos supuestos no se satisfacen, puede esperar que el tamaño aleatorio aumente mientras que el número de actualizaciones se mantendrá igual. Al mismo tiempo, los sesgos de datos pueden empeorar aún más las cosas en el escenario de update , merge shuffle y merge .

Uso del Aggregator con un Dataset con un tipo "fuerte"

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{Encoder, Encoders} class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]] with Serializable { def zero = Array.fill(bucketSize)(0L) def reduce(acc: Array[Long], x: I) = { val (i, v) = f(x) acc(i) += v acc } def merge(acc1: Array[Long], acc2: Array[Long]) = { for { i <- 0 until bucketSize } acc1(i) += acc2(i) acc1 } def finish(acc: Array[Long]) = acc.toSeq def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]] def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder() }

que podría usarse como se muestra a continuación

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS ds .groupByKey(_._1) .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn) .show(false)

+-----+-------------------------------+ |value|SumArrayAtIndex(scala.Tuple2) | +-----+-------------------------------+ |1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] | |2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]| +-----+-------------------------------+