spark introducción ejemplo performance scala apache-spark rdd

performance - ejemplo - introducción a apache spark pdf



Apache Spark: map vs mapPartitions? (3)

¿Cuál es la diferencia entre el mapa de un RDD y el método mapPartitions?

El map métodos convierte cada elemento del RDD fuente en un único elemento del resultado RDD aplicando una función. mapPartitions convierte cada partición del RDD de origen en varios elementos del resultado (posiblemente ninguno).

¿Y FlatMap se comporta como un mapa o como mapPartitions?

Tampoco, flatMap funciona en un solo elemento (como map ) y produce múltiples elementos del resultado (como mapPartitions ).

¿Cuál es la diferencia entre RDD''s map un RDD''s y el método mapPartitions ? ¿Y flatMap comporta como un map o como mapPartitions ? Gracias.

(editar) es decir, ¿cuál es la diferencia (ya sea semánticamente o en términos de ejecución) entre

def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, preservesPartitioning = true) }

Y:

def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.map(fn) }


Diablillo. PROPINA :

Siempre que tenga una inicialización pesada, debe hacerse una vez para muchos elementos RDD lugar de una vez para cada elemento RDD , y si esta inicialización, como la creación de objetos de una biblioteca externa, no se puede serializar (para que Spark pueda transmitirla a través del agrupar en los nodos de trabajador), use mapPartitions() lugar de map() . mapPartitions() permite que la inicialización se realice una vez por tarea de trabajo / subproceso / partición en lugar de una vez por elemento de datos de RDD , por ejemplo: ver a continuación.

val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })

Q2. ¿ flatMap comporta como un mapa o como mapPartitions ?

Sí. por favor, vea el ejemplo 2 de flatmap ... se explica por sí mismo.

Q1. ¿Cuál es la diferencia entre un map de RDD y mapPartitions

map funciona la función que se utiliza a nivel de elemento mientras mapPartitions ejerce la función en el nivel de partición.

Escenario de ejemplo : si tenemos elementos de 100K en una partición RDD particular, dispararemos la función utilizada por la transformación de mapeo 100K veces cuando utilicemos el map .

Por el contrario, si usamos mapPartitions , solo llamaremos a la función particular una vez, pero pasaremos todos los 100K registros y obtendremos todas las respuestas en una llamada a función.

Habrá una ganancia de rendimiento ya que el map funciona en una función particular tantas veces, especialmente si la función está haciendo algo caro cada vez que no tendría que hacer si pasamos todos los elementos a la vez (en el caso de las mappartitions de mappartitions ).

mapa

Aplica una función de transformación en cada elemento del RDD y devuelve el resultado como un nuevo RDD.

Variantes de listado

def map [U: ClassTag] (f: T => U): RDD [U]

Ejemplo:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

mapPartitions

Este es un mapa especializado que se llama solo una vez para cada partición. El contenido completo de las particiones respectivas está disponible como un flujo secuencial de valores a través del argumento de entrada (Iterarator [T]). La función personalizada debe devolver otro iterador [U]. Los iteradores de resultados combinados se convierten automáticamente en un nuevo RDD. Tenga en cuenta que las tuplas (3,4) y (6,7) faltan en el siguiente resultado debido a la partición que elegimos.

preservesPartitioning indica si la función de entrada preserva el particionador, que debe ser false menos que sea un par RDD y la función de entrada no modifique las claves.

Variantes de listado

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], conservaPartitioning: Boolean = false): RDD [U]

Ejemplo 1

val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

Ejemplo 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

El programa anterior también se puede escribir usando flatMap de la siguiente manera.

Ejemplo 2 usando flatmap

val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

Conclusión

mapPartitions transformación es más rápido que el map ya que llama a su función una vez / partición, no una vez / elemento ..


Mapa :

  1. Procesa una fila a la vez, muy similar al método map () de MapReduce.
  2. Regresas de la transformación después de cada fila.

MapPartitions

  1. Procesa la partición completa de una vez.
  2. Puede regresar de la función solo una vez después de procesar toda la partición.
  3. Todos los resultados intermedios deben conservarse en la memoria hasta que procese toda la partición.
  4. Proporciona la función setup () map () y cleanup () de MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/