scala - org - Spark RDD''s-¿Cómo funcionan?
filter scala spark (2)
val rdd = sc.textFile(file)
¿Eso significa que el archivo ahora está particionado en los nodos?
El archivo permanece donde estaba. Los elementos del RDD[String]
resultante son las líneas del archivo. El RDD está particionado para que coincida con la partición natural del sistema de archivos subyacente. La cantidad de particiones no depende de la cantidad de nodos que tenga.
Es importante comprender que cuando se ejecuta esta línea , no lee el archivo (s). El RDD es un objeto perezoso y solo hará algo cuando sea necesario. Esto es genial porque evita el uso innecesario de la memoria.
Por ejemplo, si escribe val errors = rdd.filter(line => line.startsWith("error"))
, todavía no sucede nada. Si luego escribe val errorCount = errors.count
ahora su secuencia de operaciones deberá ser ejecutada porque el resultado del count
es un número entero. Lo que cada núcleo de trabajador (hilo ejecutor) hará en paralelo, se leerá un archivo (o una porción de archivo), repetirá a través de sus líneas y contará las líneas que comiencen con "error". Dejando de lado el búfer y el GC, solo habrá una línea por núcleo en la memoria a la vez. Esto hace posible trabajar con datos muy grandes sin usar mucha memoria.
Quiero contar el número de objetos en el RDD, sin embargo, necesito usar ese número en un cálculo que debe aplicarse a objetos en el RDD, un ejemplo de pseudocódigo:
rdd.map(x => x / rdd.size)
No hay rdd.size
método rdd.size
. Hay rdd.count
, que cuenta la cantidad de elementos en el RDD. rdd.map(x => x / rdd.count)
no funcionará. El código intentará enviar la variable rdd
a todos los trabajadores y fallará con una NotSerializableException
. Lo que puedes hacer es:
val count = rdd.count
val normalized = rdd.map(x => x / count)
Esto funciona, porque count
es un Int
y se puede serializar.
Si realizo una transformación al RDD, por ejemplo,
rdd.map(_.split("-"))
, y luego deseo el nuevo tamaño del RDD, ¿necesito realizar una acción en el RDD, como elcount()
, por lo que toda la información se envía de vuelta al nodo del controlador?
map
no cambia la cantidad de elementos. No sé a qué te refieres con "talla". Pero sí, debe realizar una acción, como count
para sacar algo del RDD. Verá, no se realiza ningún trabajo hasta que realice una acción. (Cuando realiza el count
, solo el count
por partición se enviará de vuelta al controlador, por supuesto, no "toda la información").
Tengo un pequeño programa Scala que funciona bien en un solo nodo. Sin embargo, estoy escalando para que se ejecute en múltiples nodos. Este es mi primer intento de este tipo. Solo intento entender cómo funcionan los RDD en Spark, por lo que esta pregunta se basa en la teoría y puede no ser 100% correcta.
Digamos que creo un RDD: val rdd = sc.textFile(file)
Ahora, una vez que he hecho eso, ¿eso significa que el archivo en el file
ahora está particionado a través de los nodos (suponiendo que todos los nodos tengan acceso a la ruta del archivo)?
En segundo lugar, quiero contar el número de objetos en el RDD (lo suficientemente simple), sin embargo, necesito usar ese número en un cálculo que debe aplicarse a objetos en el RDD, un ejemplo de pseudocódigo:
rdd.map(x => x / rdd.size)
Digamos que hay 100 objetos en rdd
, y digamos que hay 10 nodos, por lo tanto, un recuento de 10 objetos por nodo (suponiendo que así es como funciona el concepto de RDD), ahora cuando llamo al método, cada nodo va a realizar el cálculo con rdd.size
como 10
o 100
? Porque, en general, el RDD es del tamaño 100
pero localmente en cada nodo es solo 10
. ¿Tengo que hacer una variable de transmisión antes de hacer el cálculo? Esta pregunta está vinculada a la pregunta a continuación.
Finalmente, si realizo una transformación al RDD, por ejemplo, rdd.map(_.split("-"))
, y luego quería el nuevo size
del RDD, ¿necesito realizar una acción en el RDD, como count()
, ¿entonces toda la información se envía de vuelta al nodo del controlador?
Por lo general, el archivo (o partes del archivo, si es demasiado grande) se replica en N nodos en el clúster (de forma predeterminada N = 3 en HDFS). No es una intención dividir todos los archivos entre todos los nodos disponibles.
Sin embargo, para usted (es decir, el cliente) que trabaja con el archivo utilizando Spark, debe ser transparente: no debe ver ninguna diferencia en rdd.size
, sin importar en cuántos nodos se haya dividido y / o replicado. Existen métodos (al menos, en Hadoop) para averiguar qué nodos (partes del archivo) se pueden ubicar en el momento. Sin embargo, en casos simples, lo más probable es que no necesite usar esta funcionalidad.
ACTUALIZACIÓN: un artículo que describe las funciones internas de RDD: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf