apache-spark - ejemplo - flatmap spark
¿Cuál es la diferencia entre map y flatMap y un buen caso de uso para cada uno? (12)
¿Puede alguien explicarme la diferencia entre map y flatMap y cuál es un buen caso de uso para cada uno?
¿Qué significa "aplanar los resultados"? ¿Para que sirve?
Aquí hay un ejemplo de la diferencia, como una sesión de spark-shell
:
Primero, algunos datos: dos líneas de texto:
val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines
rdd.collect
res0: Array[String] = Array("Roses are red", "Violets are blue")
Ahora, el map
transforma un RDD de longitud N en otro RDD de longitud N.
Por ejemplo, mapea desde dos líneas en dos longitudes de línea:
rdd.map(_.length).collect
res1: Array[Int] = Array(13, 16)
Pero flatMap
(en términos generales) transforma un RDD de longitud N en una colección de N colecciones, luego las aplana en un solo RDD de resultados.
rdd.flatMap(_.split(" ")).collect
res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")
Tenemos varias palabras por línea y varias líneas, pero terminamos con una sola matriz de palabras de salida
Solo para ilustrar eso, flatMapping de una colección de líneas a una colección de palabras se ve así:
["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]
Por lo tanto, los RDD de entrada y salida generalmente serán de diferentes tamaños para flatMap
.
Si hubiéramos intentado utilizar el map
con nuestra función split
, habríamos terminado con estructuras anidadas (un RDD de matrices de palabras, con tipo RDD[Array[String]]
) porque tenemos que tener exactamente un resultado por entrada:
rdd.map(_.split(" ")).collect
res3: Array[Array[String]] = Array(
Array(Roses, are, red),
Array(Violets, are, blue)
)
Finalmente, un caso especial útil es el mapeo con una función que podría no devolver una respuesta, y entonces devuelve una Option
. Podemos usar flatMap
para filtrar los elementos que devuelven None
y extraer los valores de aquellos que devuelven Some
:
val rdd = sc.parallelize(Seq(1,2,3,4))
def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None
rdd.flatMap(myfn).collect
res3: Array[Int] = Array(10,20)
(señalando aquí que una Opción se comporta más bien como una lista que tiene un elemento o cero elementos)
Diferencia en la salida de mapa y flatMap:
1. flatMap
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect()
Salida:
1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
2. map
:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length).collect()
Salida:
3 6 6 3 8
En general usamos el ejemplo del conteo de palabras en hadoop. Tomaré el mismo caso de uso y flatMap
map
y flatMap
y veremos la diferencia en cómo está procesando los datos.
A continuación se muestra el archivo de datos de muestra.
hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome
El archivo anterior se analizará usando map
y flatMap
.
Usando el map
>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u''hadoop is fast'', u''hive is sql on hdfs'', u''spark is superfast'', u''spark is awesome'']
La entrada tiene 4 líneas y el tamaño de salida también es 4, es decir, N elementos ==> N elementos.
Usando flatMap
>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u''hadoop'', u''is'', u''fast'', u''hive'', u''is'', u''sql'', u''on'', u''hdfs'', u''spark'', u''is'', u''superfast'', u''spark'', u''is'', u''awesome'']
La salida es diferente del mapa.
Asigne 1 como valor para cada tecla para obtener el conteo de palabras.
-
fm
: RDD creado mediante el uso deflatMap
-
wc
: RDD creado usando unmap
>>> fm.map(lambda word : (word,1)).collect()
[(u''hadoop'', 1), (u''is'', 1), (u''fast'', 1), (u''hive'', 1), (u''is'', 1), (u''sql'', 1), (u''on'', 1), (u''hdfs'', 1), (u''spark'', 1), (u''is'', 1), (u''superfast'', 1), (u''spark'', 1), (u''is'', 1), (u''awesome'', 1)]
Mientras que el map
en RDD wc
dará el siguiente resultado no deseado:
>>> wc.flatMap(lambda word : (word,1)).collect()
[[u''hadoop'', u''is'', u''fast''], 1, [u''hive'', u''is'', u''sql'', u''on'', u''hdfs''], 1, [u''spark'', u''is'', u''superfast''], 1, [u''spark'', u''is'', u''awesome''], 1]
No se puede obtener el conteo de palabras si se utiliza el map
lugar de flatMap
.
Según la definición, la diferencia entre el map
y flatMap
es:
map
: Devuelve un nuevo RDD aplicando una función dada a cada elemento del RDD. La función en elmap
devuelve solo un elemento.
flatMap
: similar almap
, devuelve un nuevo RDD aplicando una función a cada elemento del RDD, pero la salida se aplana.
Flatmap y Map transforman la colección.
Diferencia:
mapa (func)
Devuelve un nuevo conjunto de datos distribuidos formado al pasar cada elemento de la fuente a través de una función func.
flatMap (func)
Similar al mapa, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida (por lo que func debería devolver un Seq en lugar de un solo elemento).
La función de transformación:
mapa : un elemento en -> un elemento fuera.
flatMap : un elemento en -> 0 o más elementos eliminados (una colección).
La diferencia puede verse a partir del código pyspark de muestra siguiente:
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]
rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
Para todos aquellos que han querido relacionados con PySpark:
Ejemplo de transformación: flatMap
>>> a="hello what are you doing"
>>> a.split()
[''Hola Qué estás haciendo'']
>>> b=["hello what are you doing","this is rak"]
>>> b.split()
Rastreo (llamada más reciente): Archivo "", línea 1, en AttributeError: el objeto ''lista'' no tiene atributo ''división''
>>> rline=sc.parallelize(b)
>>> type(rline)
>>> def fwords(x):
... return x.split()
>>> rword=rline.map(fwords)
>>> rword.collect()
[[''hola'', ''qué'', ''son'', ''usted'', ''haciendo''], [''esto'', ''es'', ''rak'']]
>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()
[''hola'', ''qué'', ''son'', ''usted'', ''hacer'', ''esto'', ''es'', ''rak'']
Espero eso ayude :)
Si está preguntando la diferencia entre RDD.map y RDD.flatMap en Spark, el mapa transforma un RDD de tamaño N en otro de tamaño N. p.ej.
myRDD.map(x => x*2)
por ejemplo, si myRDD está compuesto de Dobles.
Mientras que flatMap puede transformar el RDD en antera, uno de un tamaño diferente: ej .:
myRDD.flatMap(x =>new Seq(2*x,3*x))
que devolverá un RDD de tamaño 2 * N o
myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
Todo se reduce a su pregunta inicial: ¿qué quiere decir con aplanamiento ?
Cuando usa flatMap, una colección "multidimensional" se convierte en una colección "unidimensional" .
val array1d = Array ("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings
val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )
val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)
Desea utilizar un flatMap cuando,
- la función de tu mapa da como resultado la creación de estructuras multicapa
- pero todo lo que quiere es una estructura simple, plana y unidimensional, eliminando TODAS las agrupaciones internas
Use test.md
como ejemplo:
➜ spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.
scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3
scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15
scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))
scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
Si usa el método del map
, obtendrá las líneas de test.md
, para el método flatMap
, obtendrá el número de palabras.
El método de map
es similar a flatMap
, todos devuelven un nuevo RDD. map
método de map
menudo para usar devuelve un nuevo RDD, el método flatMap
menudo para usar palabras divididas.
map y flatMap son similares, en el sentido de que toman una línea del RDD de entrada y le aplican una función. La forma en que difieren es que la función en el mapa devuelve solo un elemento, mientras que la función en flatMap puede devolver una lista de elementos (0 o más) como un iterador.
Además, la salida de flatMap se aplana. Aunque la función en flatMap devuelve una lista de elementos, el flatMap devuelve un RDD que tiene todos los elementos de la lista de una manera plana (no una lista).
map
devuelve RDD de igual número de elementos, mientras que flatMap
no.
Un caso de uso de ejemplo para flatMap
Filtra los datos faltantes o incorrectos.
Un caso de uso de ejemplo para el map
utiliza en una amplia variedad de casos donde la cantidad de elementos de entrada y salida es la misma.
number.csv
1
2
3
-
4
-
5
map.py agrega todos los números en add.csv.
from operator import *
def f(row):
try:
return float(row)
except Exception:
return 0
rdd = sc.textFile(''a.csv'').map(f)
print(rdd.count()) # 7
print(rdd.reduce(add)) # 15.0
flatMap.py usa flatMap
para filtrar los datos faltantes antes de flatMap
. Se agregan menos números en comparación con la versión anterior.
from operator import *
def f(row):
try:
return [float(row)]
except Exception:
return []
rdd = sc.textFile(''a.csv'').flatMap(f)
print(rdd.count()) # 5
print(rdd.reduce(add)) # 15.0
- map (func) Devuelve un nuevo conjunto de datos distribuidos formado al pasar cada elemento de la fuente a través de una función declarada func.so map () es un término único
whiles
- flatMap (func) Similar al mapa, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida para que func devuelva una secuencia en lugar de un solo elemento.