scala - Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema
performance apache-spark (1)
Estoy trabajando en un UDAF que devuelve una variedad de elementos.
La entrada para cada actualización es una tupla de índice y valor.
Lo que hace el UDAF es sumar todos los valores bajo el mismo índice.
Ejemplo:
Para entrada (índice, valor): (2,1), (3,1), (2,3)
debería volver (0,0,4,1, ..., 0)
La lógica funciona bien, pero tengo un problema con el método de actualización , mi implementación solo actualiza 1 celda para cada fila , pero la última asignación en ese método en realidad copia toda la matriz , lo cual es redundante y consume mucho tiempo.
Esta tarea sola es responsable del 98% del tiempo de ejecución de mi consulta .
Mi pregunta es, ¿cómo puedo reducir ese tiempo? ¿Es posible asignar 1 valor en la matriz del búfer sin tener que reemplazar todo el búfer?
PD: Estoy trabajando con Spark 1.6, y no puedo actualizarlo en el corto plazo, por lo tanto, quédese con una solución que funcione con esta versión.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
TL; DR
No use UDAF o use tipos primitivos en lugar de
ArrayType
.
Sin función
UserDefinedFunction
Ambas soluciones deberían saltear el costoso malabarismo entre la representación interna y externa.
Usando agregados estándar y
pivot
Esto utiliza agregaciones SQL estándar. Si bien está optimizado internamente, puede ser costoso cuando aumenta el número de claves y el tamaño de la matriz.
Entrada dada:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
Usted puede:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Uso de la API RDD con
combineByKey
/
aggregateByKey
.
Agregación
byKey
simple y antigua con buffer mutable.
Sin campanas y silbatos, pero debería funcionar razonablemente bien con una amplia gama de entradas.
Si sospecha que la entrada es escasa, puede considerar una representación intermedia más eficiente, como
Map
mutable.
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Uso de la función
UserDefinedFunction
por el
UserDefinedFunction
con tipos primitivos
Hasta donde yo entiendo los
ArrayConverter.toCatalystImpl
internos, el cuello de botella de rendimiento es
ArrayConverter.toCatalystImpl
.
Parece que se llama para cada llamada
MutableAggregationBuffer.update
y, a su vez, asigna nuevos
GenericArrayData
para cada
Row
.
Si redefinimos
bufferSchema
como:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
tanto la
update
como la
merge
se pueden expresar como reemplazos simples de valores primitivos en el búfer.
La cadena de llamadas seguirá siendo bastante larga, pero
no requerirá copias / conversiones
y asignaciones locas.
Omitiendo cheques
null
, necesitarás algo similar a
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
y
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
respectivamente.
Finalmente
evaluate
debería tomar
Row
y convertirlo a la salida
Seq
:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
Tenga en cuenta que en esta implementación se
merge
un posible cuello de botella.
Si bien no debería presentar ningún problema de rendimiento nuevo, con
M
buckets, cada llamada a
merge
es
O (M)
.
Con
K
claves únicas y particiones
P
, se llamará
M * K
veces en el peor de los casos, donde cada clave se produce al menos una vez en cada partición.
Esto aumenta efectivamente la complicidad del componente de
merge
a
O (M * N * K)
.
En general, no hay mucho que pueda hacer al respecto. Sin embargo, si realiza suposiciones específicas sobre la distribución de datos (los datos son escasos, la distribución de claves es uniforme), puede atajar un poco las cosas y mezclar primero:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
Si se cumplen los supuestos, debería:
-
Reduzca de forma contraintuitiva el tamaño aleatorio barajando pares dispersos, en lugar de
Rows
densas en forma de matriz. - Agregue datos usando solo actualizaciones (cada O (1) ) posiblemente tocando solo como un subconjunto de índices.
Sin embargo, si uno o ambos supuestos no se satisfacen, puede esperar que el tamaño aleatorio aumente mientras que el número de actualizaciones se mantendrá igual.
Al mismo tiempo, los sesgos de datos pueden empeorar aún más las cosas en el escenario de
update
,
merge
shuffle
y
merge
.
Uso del
Aggregator
con un
Dataset
con un tipo "fuerte"
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
que podría usarse como se muestra a continuación
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+