PySpark - RDD

Ahora que hemos instalado y configurado PySpark en nuestro sistema, podemos programar en Python en Apache Spark. Sin embargo, antes de hacerlo, comprendamos un concepto fundamental en Spark: RDD.

RDD significa Resilient Distributed Dataset, estos son los elementos que se ejecutan y operan en varios nodos para realizar un procesamiento paralelo en un clúster. Los RDD son elementos inmutables, lo que significa que una vez que crea un RDD no puede cambiarlo. Los RDD también son tolerantes a fallas, por lo tanto, en caso de falla, se recuperan automáticamente. Puede aplicar varias operaciones en estos RDD para lograr una determinada tarea.

Para aplicar operaciones en estos RDD, hay dos formas:

  • Transformación y
  • Action

Entendamos estas dos formas en detalle.

Transformation- Estas son las operaciones, que se aplican en un RDD para crear un nuevo RDD. Filter, groupBy y map son ejemplos de transformaciones.

Action - Estas son las operaciones que se aplican en RDD, lo que indica a Spark que realice el cálculo y envíe el resultado al controlador.

Para aplicar cualquier operación en PySpark, necesitamos crear un PySpark RDDprimero. El siguiente bloque de código tiene el detalle de una clase RDD de PySpark:

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Veamos cómo ejecutar algunas operaciones básicas usando PySpark. El siguiente código en un archivo Python crea palabras RDD, que almacena un conjunto de palabras mencionadas.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Ahora ejecutaremos algunas operaciones con palabras.

contar()

Se devuelve el número de elementos del RDD.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - El comando para count () es -

$SPARK_HOME/bin/spark-submit count.py

Output - La salida para el comando anterior es -

Number of elements in RDD → 8

recoger()

Se devuelven todos los elementos del RDD.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - El comando para recolectar () es -

$SPARK_HOME/bin/spark-submit collect.py

Output - La salida para el comando anterior es -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Devuelve solo aquellos elementos que cumplen la condición de la función dentro de foreach. En el siguiente ejemplo, llamamos a una función de impresión en foreach, que imprime todos los elementos en el RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - El comando para foreach (f) es -

$SPARK_HOME/bin/spark-submit foreach.py

Output - La salida para el comando anterior es -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtro (f)

Se devuelve un nuevo RDD que contiene los elementos, que satisface la función dentro del filtro. En el siguiente ejemplo, filtramos las cadenas que contienen "chispa".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - El comando para el filtro (f) es -

$SPARK_HOME/bin/spark-submit filter.py

Output - La salida para el comando anterior es -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

Se devuelve un nuevo RDD aplicando una función a cada elemento del RDD. En el siguiente ejemplo, formamos un par clave-valor y asignamos cada cadena con un valor de 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - El comando para map (f, preservesPartitioning = False) es -

$SPARK_HOME/bin/spark-submit map.py

Output - La salida del comando anterior es -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reducir (f)

Después de realizar la operación binaria conmutativa y asociativa especificada, se devuelve el elemento en el RDD. En el siguiente ejemplo, estamos importando add package del operador y aplicándolo en 'num' para realizar una simple operación de suma.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - El comando para reducir (f) es -

$SPARK_HOME/bin/spark-submit reduce.py

Output - La salida del comando anterior es -

Adding all the elements -> 15

unirse (otro, numPartitions = None)

Devuelve RDD con un par de elementos con las claves coincidentes y todos los valores para esa clave en particular. En el siguiente ejemplo, hay dos pares de elementos en dos RDD diferentes. Después de unir estos dos RDD, obtenemos un RDD con elementos que tienen claves coincidentes y sus valores.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - El comando para unirse (otro, numPartitions = None) es -

$SPARK_HOME/bin/spark-submit join.py

Output - La salida para el comando anterior es -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache()

Conserve este RDD con el nivel de almacenamiento predeterminado (MEMORY_ONLY). También puede verificar si el RDD está en caché o no.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - El comando para cache () es -

$SPARK_HOME/bin/spark-submit cache.py

Output - La salida para el programa anterior es -

Words got cached -> True

Estas fueron algunas de las operaciones más importantes que se realizan en PySpark RDD.