performance - ejemplo - introducción a apache spark pdf
Apache Spark: map vs mapPartitions? (3)
¿Cuál es la diferencia entre el mapa de un RDD y el método mapPartitions?
El map métodos convierte cada elemento del RDD fuente en un único elemento del resultado RDD aplicando una función. mapPartitions convierte cada partición del RDD de origen en varios elementos del resultado (posiblemente ninguno).
¿Y FlatMap se comporta como un mapa o como mapPartitions?
Tampoco, flatMap funciona en un solo elemento (como map
) y produce múltiples elementos del resultado (como mapPartitions
).
¿Cuál es la diferencia entre RDD''s map
un RDD''s y el método mapPartitions
? ¿Y flatMap
comporta como un map
o como mapPartitions
? Gracias.
(editar) es decir, ¿cuál es la diferencia (ya sea semánticamente o en términos de ejecución) entre
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
Y:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
Diablillo. PROPINA :
Siempre que tenga una inicialización pesada, debe hacerse una vez para muchos elementos
RDD
lugar de una vez para cada elementoRDD
, y si esta inicialización, como la creación de objetos de una biblioteca externa, no se puede serializar (para que Spark pueda transmitirla a través del agrupar en los nodos de trabajador), usemapPartitions()
lugar demap()
.mapPartitions()
permite que la inicialización se realice una vez por tarea de trabajo / subproceso / partición en lugar de una vez por elemento de datos deRDD
, por ejemplo: ver a continuación.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2. ¿
flatMap
comporta como un mapa o comomapPartitions
?
Sí. por favor, vea el ejemplo 2 de flatmap
... se explica por sí mismo.
Q1. ¿Cuál es la diferencia entre un
map
de RDD ymapPartitions
map
funciona la función que se utiliza a nivel de elemento mientrasmapPartitions
ejerce la función en el nivel de partición.
Escenario de ejemplo : si tenemos elementos de 100K en una partición RDD
particular, dispararemos la función utilizada por la transformación de mapeo 100K veces cuando utilicemos el map
.
Por el contrario, si usamos mapPartitions
, solo llamaremos a la función particular una vez, pero pasaremos todos los 100K registros y obtendremos todas las respuestas en una llamada a función.
Habrá una ganancia de rendimiento ya que el map
funciona en una función particular tantas veces, especialmente si la función está haciendo algo caro cada vez que no tendría que hacer si pasamos todos los elementos a la vez (en el caso de las mappartitions
de mappartitions
).
mapa
Aplica una función de transformación en cada elemento del RDD y devuelve el resultado como un nuevo RDD.
Variantes de listado
def map [U: ClassTag] (f: T => U): RDD [U]
Ejemplo:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Este es un mapa especializado que se llama solo una vez para cada partición. El contenido completo de las particiones respectivas está disponible como un flujo secuencial de valores a través del argumento de entrada (Iterarator [T]). La función personalizada debe devolver otro iterador [U]. Los iteradores de resultados combinados se convierten automáticamente en un nuevo RDD. Tenga en cuenta que las tuplas (3,4) y (6,7) faltan en el siguiente resultado debido a la partición que elegimos.
preservesPartitioning
indica si la función de entrada preserva el particionador, que debe serfalse
menos que sea un par RDD y la función de entrada no modifique las claves.Variantes de listado
def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], conservaPartitioning: Boolean = false): RDD [U]
Ejemplo 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Ejemplo 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
El programa anterior también se puede escribir usando flatMap de la siguiente manera.
Ejemplo 2 usando flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Conclusión
mapPartitions
transformación es más rápido que el map
ya que llama a su función una vez / partición, no una vez / elemento ..
Mapa :
- Procesa una fila a la vez, muy similar al método map () de MapReduce.
- Regresas de la transformación después de cada fila.
MapPartitions
- Procesa la partición completa de una vez.
- Puede regresar de la función solo una vez después de procesar toda la partición.
- Todos los resultados intermedios deben conservarse en la memoria hasta que procese toda la partición.
- Proporciona la función setup () map () y cleanup () de MapReduce
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/