tutorial spark reducebykey java scala apache-spark rdd apache-spark-mllib

java - reducebykey - Multiplicación matricial en Apache Spark



reducebykey spark (1)

Todo depende de los datos de entrada y las dimensiones, pero en general lo que desea no es un RDD sino una de las estructuras de datos distribuidas de org.apache.spark.mllib.linalg.distributed . En este momento proporciona cuatro implementaciones diferentes de DistributedMatrix

  • IndexedRowMatrix : se puede crear directamente desde un RDD[IndexedRow] donde IndexedRow consta de índice de fila y org.apache.spark.mllib.linalg.Vector

    import org.apache.spark.mllib.linalg.{Vectors, Matrices} import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, IndexedRow} val rows = sc.parallelize(Seq( (0L, Array(1.0, 0.0, 0.0)), (0L, Array(0.0, 1.0, 0.0)), (0L, Array(0.0, 0.0, 1.0))) ).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))} val indexedRowMatrix = new IndexedRowMatrix(rows)

  • RowMatrix : similar a IndexedRowMatrix pero sin índices de fila significativos. Se puede crear directamente desde RDD[org.apache.spark.mllib.linalg.Vector]

    import org.apache.spark.mllib.linalg.distributed.RowMatrix val rowMatrix = new RowMatrix(rows.map(_.vector))

  • BlockMatrix : se puede crear desde RDD[((Int, Int), Matrix)] donde el primer elemento de la tupla contiene coordenadas del bloque y el segundo es un org.apache.spark.mllib.linalg.Matrix local.

    val eye = Matrices.sparse( 3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1)) val blocks = sc.parallelize(Seq( ((0, 0), eye), ((1, 1), eye), ((2, 2), eye))) val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)

  • CoordinateMatrix : se puede crear desde RDD[MatrixEntry] donde MatrixEntry consta de fila, columna y valor.

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries = sc.parallelize(Seq( (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0), (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0)) ).map{case (i, j, v) => MatrixEntry(i, j, v)} val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)

Las dos primeras implementaciones admiten la multiplicación por una Matrix local:

val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) indexedRowMatrix.multiply(localMatrix).rows.collect // Array(IndexedRow(0,[1.0,4.0]), IndexedRow(0,[2.0,5.0]), // IndexedRow(0,[3.0,6.0]))

y el tercero se puede multiplicar por otro BlockMatrix siempre que el número de columnas por bloque en esta matriz coincida con el número de filas por bloque de la otra matriz. CoordinateMatrix no admite multiplicaciones, pero es bastante fácil de crear y transformar a otros tipos de matrices distribuidas:

blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))

Cada tipo tiene sus propios lados fuertes y débiles y hay algunos factores adicionales a tener en cuenta cuando se utilizan elementos dispersos o densos ( Vectors o Matrices bloques). Por lo general, es preferible multiplicar por una matriz local, ya que no requiere una combinación aleatoria costosa.

Puede encontrar más detalles sobre cada tipo en la guía Tipos de datos MLlib .

Estoy tratando de realizar una multiplicación de matrices usando Apache Spark y Java.

Tengo 2 preguntas principales:

  1. ¿Cómo crear RDD que pueda representar una matriz en Apache Spark?
  2. ¿Cómo multiplicar dos de estos RDD?