PySpark - Transmisión y acumulador

Para el procesamiento paralelo, Apache Spark usa variables compartidas. Una copia de la variable compartida va en cada nodo del clúster cuando el controlador envía una tarea al ejecutor en el clúster, para que pueda usarse para realizar tareas.

Apache Spark admite dos tipos de variables compartidas:

  • Broadcast
  • Accumulator

Entendamos en detalle.

Transmitir

Las variables de difusión se utilizan para guardar la copia de datos en todos los nodos. Esta variable se almacena en caché en todas las máquinas y no se envía a las máquinas con tareas. El siguiente bloque de código tiene los detalles de una clase Broadcast para PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

El siguiente ejemplo muestra cómo utilizar una variable de difusión. Una variable de difusión tiene un atributo llamado valor, que almacena los datos y se utiliza para devolver un valor de difusión.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - El comando para una variable de difusión es el siguiente:

$SPARK_HOME/bin/spark-submit broadcast.py

Output - El resultado del siguiente comando se muestra a continuación.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Acumulador

Las variables acumuladoras se utilizan para agregar la información mediante operaciones asociativas y conmutativas. Por ejemplo, puede usar un acumulador para una operación de suma o contadores (en MapReduce). El siguiente bloque de código tiene los detalles de una clase Accumulator para PySpark.

class pyspark.Accumulator(aid, value, accum_param)

El siguiente ejemplo muestra cómo utilizar una variable de acumulador. Una variable acumuladora tiene un atributo llamado valor que es similar a lo que tiene una variable de difusión. Almacena los datos y se usa para devolver el valor del acumulador, pero solo se puede usar en un programa controlador.

En este ejemplo, varios trabajadores utilizan una variable acumuladora y devuelve un valor acumulado.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - El comando para una variable de acumulador es el siguiente -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - El resultado del comando anterior se muestra a continuación.

Accumulated value is -> 150