textfile sparkcontext spark parallelize ejemplo scala apache-spark rdd

scala - sparkcontext - (¿Por qué?) Necesitamos llamar a la memoria caché o persistir en un RDD



sparkcontext text file (5)

A continuación se muestran las tres situaciones en las que debe almacenar en caché sus RDD:

usando un RDD muchas veces

realizar múltiples acciones en el mismo RDD

para largas cadenas de transformaciones (o muy caras)

Cuando se crea un conjunto de datos distribuido (RDD) resistente a partir de un archivo de texto o colección (o de otro RDD), ¿necesitamos llamar explícitamente "caché" o "persistir" para almacenar los datos RDD en la memoria? ¿O los datos RDD se almacenan de forma distribuida en la memoria de forma predeterminada?

val textFile = sc.textFile("/user/emp.txt")

Según tengo entendido, después del paso anterior, textFile es un RDD y está disponible en toda / parte de la memoria del nodo.

Si es así, ¿por qué necesitamos llamar "caché" o "persistir" en textFile RDD entonces?


Agregar otra razón para agregar (o agregar temporalmente) llamada al método de cache .

para problemas de memoria de depuración

con el método de cache , spark dará información de depuración sobre el tamaño del RDD. así que en la interfaz de usuario integrada de spark, obtendrá información de consumo de memoria RDD. y esto resultó muy útil para diagnosticar problemas de memoria.


Creo que la pregunta se formularía mejor como:

¿Cuándo necesitamos llamar al caché o persistir en un RDD?

Los procesos de chispa son flojos, es decir, nada sucederá hasta que se requiera. Para responder rápidamente a la pregunta, después de val textFile = sc.textFile("/user/emp.txt") , no sucede nada con los datos, solo se construye un HadoopRDD , utilizando el archivo como fuente.

Digamos que transformamos esos datos un poco:

val wordsRDD = textFile.flatMap(line => line.split("//W"))

Nuevamente, nada le sucede a los datos. Ahora hay una nueva RDD wordsRDD que contiene una referencia a testFile y una función que se aplicará cuando sea necesario.

Solo cuando se ejecuta una acción sobre un RDD, como wordsRDD.count , se wordsRDD.count la cadena RDD, llamada linaje . Es decir, los datos, desglosados ​​en particiones, serán cargados por los ejecutores del clúster Spark, se flatMap función flatMap y se calculará el resultado.

En un linaje lineal, como el de este ejemplo, no se necesita cache() . Los datos se cargarán a los ejecutores, se aplicarán todas las transformaciones y finalmente se computará el count , todo en la memoria, si los datos se ajustan en la memoria.

cache es útil cuando el linaje del RDD se ramifica. Digamos que desea filtrar las palabras del ejemplo anterior en un recuento de palabras positivas y negativas. Podrías hacer esto así:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Aquí, cada rama emite una recarga de los datos. Agregar una declaración de cache explícita asegurará que el procesamiento realizado previamente se conserve y reutilice. El trabajo se verá así:

val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("//W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Por esa razón, se dice que la cache ''rompe el linaje'' ya que crea un punto de control que puede reutilizarse para su posterior procesamiento.

Regla general: use cache cuando el linaje de su RDD se ramifique o cuando un RDD se use varias veces como en un bucle.


La mayoría de las operaciones RDD son perezosas. Piense en un RDD como una descripción de una serie de operaciones. Un RDD no son datos. Entonces esta línea:

val textFile = sc.textFile("/user/emp.txt")

No hace nada Crea un RDD que dice "necesitaremos cargar este archivo". El archivo no está cargado en este momento.

Las operaciones RDD que requieren observar el contenido de los datos no pueden ser perezosas. (Estas se denominan acciones ). Un ejemplo es RDD.count : para indicarle el número de líneas en el archivo, el archivo debe leerse. Entonces, si escribe textFile.count , en este punto se leerá el archivo, se contarán las líneas y se devolverá el recuento.

¿Qué textFile.count vuelve a llamar a textFile.count ? Lo mismo: el archivo será leído y contado nuevamente. Nada es almacenado. Un RDD no son datos.

Entonces, ¿qué hace RDD.cache ? Si agrega textFile.cache al código anterior:

val textFile = sc.textFile("/user/emp.txt") textFile.cache

No hace nada RDD.cache también es una operación perezosa. El archivo aún no se lee. Pero ahora el RDD dice "lea este archivo y luego guarde en caché el contenido". Si luego ejecuta textFile.count la primera vez, el archivo se cargará, se almacenará en caché y se contará. Si llama a textFile.count por segunda vez, la operación usará el caché. Solo tomará los datos del caché y contará las líneas.

El comportamiento de la memoria caché depende de la memoria disponible. Si el archivo no cabe en la memoria, por ejemplo, textFile.count volverá al comportamiento habitual y volverá a leer el archivo.


¿Necesitamos llamar "caché" o "persistir" explícitamente para almacenar los datos RDD en la memoria?

Sí, solo si es necesario.

¿Los datos RDD almacenados de forma distribuida en la memoria por defecto?

¡No!

Y estas son las razones por las cuales:

  • Spark admite dos tipos de variables compartidas: variables de difusión, que se pueden usar para almacenar en caché un valor en la memoria en todos los nodos, y acumuladores, que son variables que solo se "agregan", como contadores y sumas.

  • Los RDD admiten dos tipos de operaciones: transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y acciones, que devuelven un valor al programa del controlador después de ejecutar un cálculo en el conjunto de datos. Por ejemplo, el mapa es una transformación que pasa cada elemento del conjunto de datos a través de una función y devuelve un nuevo RDD que representa los resultados. Por otro lado, reducir es una acción que agrega todos los elementos del RDD usando alguna función y devuelve el resultado final al programa del controlador (aunque también hay un reduceByKey paralelo que devuelve un conjunto de datos distribuido).

  • Todas las transformaciones en Spark son perezosas, ya que no calculan sus resultados de inmediato. En cambio, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base (por ejemplo, un archivo). Las transformaciones solo se calculan cuando una acción requiere que se devuelva un resultado al programa controlador. Este diseño permite que Spark se ejecute de manera más eficiente; por ejemplo, podemos darnos cuenta de que un conjunto de datos creado a través del mapa se usará en una reducción y devolverá solo el resultado de la reducción al controlador, en lugar del conjunto de datos mapeado más grande.

  • Por defecto, cada RDD transformado puede ser recalculado cada vez que ejecuta una acción en él. Sin embargo, también puede conservar un RDD en la memoria utilizando el método de persistencia (o caché), en cuyo caso Spark mantendrá los elementos en el clúster para un acceso mucho más rápido la próxima vez que lo consulte. También hay soporte para RDDs persistentes en disco, o replicados en múltiples nodos.

Para más detalles, consulte la guía de programación de Spark .