vectores suma resta metodo matematicamente graficamente con angulos analitico scala apache-spark apache-spark-sql aggregate-functions apache-spark-ml

scala - resta - ¿Cómo definir una función de agregación personalizada para sumar una columna de vectores?



suma de vectores matematicamente (2)

Personalmente, no me molestaría con los UDAF. Hay más que detallado y no exactamente rápido ( Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema ) En su lugar, simplemente usaría reduceByKey / foldByKey :

import org.apache.spark.sql.Row import breeze.linalg.{DenseVector => BDV} import org.apache.spark.ml.linalg.{Vector, Vectors} def dv(values: Double*): Vector = Vectors.dense(values.toArray) val df = spark.createDataFrame(Seq( (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)), (2, dv(7,5,0)), (2, dv(3,3,4)), (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7))) ).toDF("id", "vec") val aggregated = df .rdd .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } .foldByKey(BDV.zeros[Double](3))(_ += _) .mapValues(v => Vectors.dense(v.toArray)) .toDF("id", "vec") aggregated.show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+

Y solo por comparación, un UDAF "simple". Importaciones requeridas:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} import org.apache.spark.sql.Row import scala.collection.mutable.WrappedArray

Definición de clase:

class VectorSum (n: Int) extends UserDefinedAggregateFunction { def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) def dataType = SQLDataTypes.VectorType def deterministic = true def initialize(buffer: MutableAggregationBuffer) = { buffer.update(0, Array.fill(n)(0.0)) } def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) { val buff = buffer.getAs[WrappedArray[Double]](0) val v = input.getAs[Vector](0).toSparse for (i <- v.indices) { buff(i) += v(i) } buffer.update(0, buff) } } def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { val buff1 = buffer1.getAs[WrappedArray[Double]](0) val buff2 = buffer2.getAs[WrappedArray[Double]](0) for ((x, i) <- buff2.zipWithIndex) { buff1(i) += x } buffer1.update(0, buff1) } def evaluate(buffer: Row) = Vectors.dense( buffer.getAs[Seq[Double]](0).toArray) }

Y un ejemplo de uso:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+

Consulte también: ¿Cómo encontrar la media de las columnas vectoriales agrupadas en Spark SQL? .

Tengo un DataFrame de dos columnas, ID de tipo Int y Vec de tipo Vector ( org.apache.spark.mllib.linalg.Vector ).

El DataFrame tiene el siguiente aspecto:

ID,Vec 1,[0,0,5] 1,[4,0,1] 1,[1,2,1] 2,[7,5,0] 2,[3,3,4] 3,[0,8,1] 3,[0,0,1] 3,[7,7,7] ....

Me gustaría hacer un groupBy($"ID") luego aplicar una agregación en las filas dentro de cada grupo sumando los vectores.

El resultado deseado del ejemplo anterior sería:

ID,SumOfVectors 1,[5,2,7] 2,[10,8,4] 3,[7,15,9] ...

Las funciones de agregación disponibles no funcionarán, por ejemplo, df.groupBy($"ID").agg(sum($"Vec") conducirá a una ClassCastException.

¿Cómo implementar una función de agregación personalizada que me permita hacer la suma de vectores o matrices o cualquier otra operación personalizada?


Sugiero lo siguiente (funciona en Spark 2.0.2 en adelante), podría estar optimizado pero es muy bueno, una cosa que debes saber de antemano es el tamaño del vector cuando creas la instancia UDAF

import org.apache.spark.ml.linalg._ import org.apache.spark.mllib.linalg.WeightedSparseVector import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ class VectorAggregate(val numFeatures: Int) extends UserDefinedAggregateFunction { private type B = Map[Int, Double] def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil) def bufferSchema: StructType = StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil) def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map.empty[Int, Double]) def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val zero = buffer.getAs[B](0) input match { case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }} def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val zero = buffer1.getAs[B](0) buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})} def deterministic: Boolean = true def evaluate(buffer: Row): Any = { val Row(agg: B) = buffer val indices = agg.keys.toArray.sorted Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed } def dataType: DataType = new VectorUDT() }