PySpark - Guía rápida

En este capítulo, nos familiarizaremos con qué es Apache Spark y cómo se desarrolló PySpark.

Spark - Descripción general

Apache Spark es un marco de procesamiento en tiempo real ultrarrápido. Realiza cálculos en memoria para analizar datos en tiempo real. Entró en la imagen comoApache Hadoop MapReducerealizaba solo procesamiento por lotes y carecía de una función de procesamiento en tiempo real. Por lo tanto, se introdujo Apache Spark, ya que puede realizar el procesamiento de secuencias en tiempo real y también puede encargarse del procesamiento por lotes.

Además del procesamiento por lotes y en tiempo real, Apache Spark también admite consultas interactivas y algoritmos iterativos. Apache Spark tiene su propio administrador de clústeres, donde puede alojar su aplicación. Aprovecha Apache Hadoop tanto para el almacenamiento como para el procesamiento. UsaHDFS (Sistema de archivos distribuidos Hadoop) para almacenamiento y puede ejecutar aplicaciones Spark en YARN también.

PySpark - Descripción general

Apache Spark está escrito en Scala programming language. Para admitir Python con Spark, Apache Spark Community lanzó una herramienta, PySpark. Usando PySpark, puede trabajar conRDDstambién en el lenguaje de programación Python. Es por una biblioteca llamadaPy4j que son capaces de lograrlo.

Ofertas PySpark PySpark Shellque vincula la API de Python al núcleo de Spark e inicializa el contexto de Spark. La mayoría de los científicos de datos y los expertos en análisis utilizan Python debido a su rico conjunto de bibliotecas. Integrar Python con Spark es una bendición para ellos.

En este capítulo, entenderemos la configuración del entorno de PySpark.

Note - Esto es considerando que tiene Java y Scala instalados en su computadora.

Ahora descarguemos y configuremos PySpark con los siguientes pasos.

Step 1- Vaya a la página de descarga oficial de Apache Spark y descargue la última versión de Apache Spark disponible allí. En este tutorial, estamos usandospark-2.1.0-bin-hadoop2.7.

Step 2- Ahora, extraiga el archivo tar de Spark descargado. De forma predeterminada, se descargará en el directorio de Descargas.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

Creará un directorio spark-2.1.0-bin-hadoop2.7. Antes de iniciar PySpark, debe configurar los siguientes entornos para configurar la ruta de Spark y laPy4j path.

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

O, para configurar los entornos anteriores globalmente, colóquelos en el .bashrc file. Luego ejecute el siguiente comando para que los entornos funcionen.

# source .bashrc

Ahora que tenemos todos los entornos configurados, vayamos al directorio Spark e invoquemos el shell PySpark ejecutando el siguiente comando:

# ./bin/pyspark

Esto iniciará su shell de PySpark.

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContext es el punto de entrada a cualquier funcionalidad de Spark. Cuando ejecutamos cualquier aplicación Spark, se inicia un programa controlador, que tiene la función principal y su SparkContext se inicia aquí. Luego, el programa controlador ejecuta las operaciones dentro de los ejecutores en los nodos trabajadores.

SparkContext usa Py4J para lanzar un JVM y crea un JavaSparkContext. Por defecto, PySpark tiene SparkContext disponible como‘sc’, por lo que la creación de un nuevo SparkContext no funcionará.

El siguiente bloque de código tiene los detalles de una clase PySpark y los parámetros que puede tomar SparkContext.

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parámetros

A continuación se muestran los parámetros de SparkContext.

  • Master - Es la URL del clúster al que se conecta.

  • appName - Nombre de su trabajo.

  • sparkHome - Directorio de instalación de Spark.

  • pyFiles - Los archivos .zip o .py para enviar al clúster y agregar a PYTHONPATH.

  • Environment - Variables de entorno de los nodos trabajadores.

  • batchSize- El número de objetos Python representados como un solo objeto Java. Configure 1 para deshabilitar el procesamiento por lotes, 0 para elegir automáticamente el tamaño del lote según el tamaño de los objetos o -1 para usar un tamaño de lote ilimitado.

  • Serializer - Serializador RDD.

  • Conf - Un objeto de L {SparkConf} para establecer todas las propiedades de Spark.

  • Gateway - Utilice una puerta de enlace y una JVM existentes, de lo contrario inicialice una nueva JVM.

  • JSC - La instancia de JavaSparkContext.

  • profiler_cls - Una clase de Profiler personalizado que se utiliza para crear perfiles (el valor predeterminado es pyspark.profiler.BasicProfiler).

Entre los parámetros anteriores, master y appnamese utilizan principalmente. Las dos primeras líneas de cualquier programa PySpark se ven como se muestra a continuación:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

Ejemplo de SparkContext: PySpark Shell

Ahora que sabe lo suficiente sobre SparkContext, ejecutemos un ejemplo simple en el shell de PySpark. En este ejemplo, contaremos el número de líneas con el carácter 'a' o 'b' en elREADME.mdarchivo. Entonces, digamos que si hay 5 líneas en un archivo y 3 líneas tienen el carácter 'a', entonces la salida será →Line with a: 3. Lo mismo se hará con el carácter 'b'.

Note- No estamos creando ningún objeto SparkContext en el siguiente ejemplo porque, de forma predeterminada, Spark crea automáticamente el objeto SparkContext llamado sc, cuando se inicia PySpark shell. En caso de que intente crear otro objeto SparkContext, obtendrá el siguiente error:"ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

Ejemplo de SparkContext: programa Python

Ejecutemos el mismo ejemplo usando un programa de Python. Crea un archivo de Python llamadofirstapp.py e ingrese el siguiente código en ese archivo.

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

Luego ejecutaremos el siguiente comando en la terminal para ejecutar este archivo de Python. Obtendremos el mismo resultado que el anterior.

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

Ahora que hemos instalado y configurado PySpark en nuestro sistema, podemos programar en Python en Apache Spark. Sin embargo, antes de hacerlo, comprendamos un concepto fundamental en Spark: RDD.

RDD significa Resilient Distributed Dataset, estos son los elementos que se ejecutan y operan en varios nodos para realizar un procesamiento paralelo en un clúster. Los RDD son elementos inmutables, lo que significa que una vez que crea un RDD no puede cambiarlo. Los RDD también son tolerantes a fallas, por lo tanto, en caso de falla, se recuperan automáticamente. Puede aplicar varias operaciones en estos RDD para lograr una determinada tarea.

Para aplicar operaciones en estos RDD, hay dos formas:

  • Transformación y
  • Action

Entendamos estas dos formas en detalle.

Transformation- Estas son las operaciones, que se aplican en un RDD para crear un nuevo RDD. Filter, groupBy y map son ejemplos de transformaciones.

Action - Estas son las operaciones que se aplican en RDD, lo que le indica a Spark que realice el cálculo y envíe el resultado al controlador.

Para aplicar cualquier operación en PySpark, necesitamos crear un PySpark RDDprimero. El siguiente bloque de código tiene el detalle de una clase RDD de PySpark:

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Veamos cómo ejecutar algunas operaciones básicas usando PySpark. El siguiente código en un archivo Python crea palabras RDD, que almacena un conjunto de palabras mencionadas.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Ahora ejecutaremos algunas operaciones con palabras.

contar()

Se devuelve el número de elementos del RDD.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - El comando para count () es -

$SPARK_HOME/bin/spark-submit count.py

Output - La salida para el comando anterior es -

Number of elements in RDD → 8

recoger()

Se devuelven todos los elementos del RDD.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - El comando para recolectar () es -

$SPARK_HOME/bin/spark-submit collect.py

Output - La salida para el comando anterior es -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Devuelve solo aquellos elementos que cumplen la condición de la función dentro de foreach. En el siguiente ejemplo, llamamos a una función de impresión en foreach, que imprime todos los elementos en el RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - El comando para foreach (f) es -

$SPARK_HOME/bin/spark-submit foreach.py

Output - La salida para el comando anterior es -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtro (f)

Se devuelve un nuevo RDD que contiene los elementos, que satisface la función dentro del filtro. En el siguiente ejemplo, filtramos las cadenas que contienen "chispa".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - El comando para el filtro (f) es -

$SPARK_HOME/bin/spark-submit filter.py

Output - La salida para el comando anterior es -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

Se devuelve un nuevo RDD aplicando una función a cada elemento del RDD. En el siguiente ejemplo, formamos un par clave-valor y asignamos cada cadena con un valor de 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - El comando para map (f, preservesPartitioning = False) es -

$SPARK_HOME/bin/spark-submit map.py

Output - La salida del comando anterior es -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reducir (f)

Después de realizar la operación binaria conmutativa y asociativa especificada, se devuelve el elemento en el RDD. En el siguiente ejemplo, estamos importando add package del operador y aplicándolo en 'num' para realizar una operación de suma simple.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - El comando para reducir (f) es -

$SPARK_HOME/bin/spark-submit reduce.py

Output - La salida del comando anterior es -

Adding all the elements -> 15

unirse (otro, numPartitions = None)

Devuelve RDD con un par de elementos con las claves coincidentes y todos los valores para esa clave en particular. En el siguiente ejemplo, hay dos pares de elementos en dos RDD diferentes. Después de unir estos dos RDD, obtenemos un RDD con elementos que tienen claves coincidentes y sus valores.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - El comando para unirse (otro, numPartitions = None) es -

$SPARK_HOME/bin/spark-submit join.py

Output - La salida para el comando anterior es -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache()

Conserve este RDD con el nivel de almacenamiento predeterminado (MEMORY_ONLY). También puede verificar si el RDD está en caché o no.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - El comando para cache () es -

$SPARK_HOME/bin/spark-submit cache.py

Output - La salida para el programa anterior es -

Words got cached -> True

Estas fueron algunas de las operaciones más importantes que se realizan en PySpark RDD.

Para el procesamiento paralelo, Apache Spark usa variables compartidas. Una copia de la variable compartida va en cada nodo del clúster cuando el controlador envía una tarea al ejecutor en el clúster, para que pueda usarse para realizar tareas.

Apache Spark admite dos tipos de variables compartidas:

  • Broadcast
  • Accumulator

Entendamos en detalle.

Transmitir

Las variables de difusión se utilizan para guardar la copia de datos en todos los nodos. Esta variable se almacena en caché en todas las máquinas y no se envía a las máquinas con tareas. El siguiente bloque de código tiene los detalles de una clase Broadcast para PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

El siguiente ejemplo muestra cómo utilizar una variable de difusión. Una variable de difusión tiene un atributo llamado valor, que almacena los datos y se utiliza para devolver un valor de difusión.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - El comando para una variable de difusión es el siguiente:

$SPARK_HOME/bin/spark-submit broadcast.py

Output - El resultado del siguiente comando se muestra a continuación.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Acumulador

Las variables acumuladoras se utilizan para agregar la información mediante operaciones asociativas y conmutativas. Por ejemplo, puede usar un acumulador para una operación de suma o contadores (en MapReduce). El siguiente bloque de código tiene los detalles de una clase Accumulator para PySpark.

class pyspark.Accumulator(aid, value, accum_param)

El siguiente ejemplo muestra cómo utilizar una variable de acumulador. Una variable acumuladora tiene un atributo llamado valor que es similar a lo que tiene una variable de difusión. Almacena los datos y se usa para devolver el valor del acumulador, pero solo se puede usar en un programa controlador.

En este ejemplo, varios trabajadores utilizan una variable acumuladora y devuelve un valor acumulado.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - El comando para una variable de acumulador es el siguiente -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - El resultado del comando anterior se muestra a continuación.

Accumulated value is -> 150

Para ejecutar una aplicación Spark en el clúster / local, debe establecer algunas configuraciones y parámetros, esto es con lo que ayuda SparkConf. Proporciona configuraciones para ejecutar una aplicación Spark. El siguiente bloque de código tiene los detalles de una clase SparkConf para PySpark.

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

Inicialmente, crearemos un objeto SparkConf con SparkConf (), que cargará los valores de spark.*También propiedades del sistema Java. Ahora puede establecer diferentes parámetros usando el objeto SparkConf y sus parámetros tendrán prioridad sobre las propiedades del sistema.

En una clase SparkConf, hay métodos de establecimiento que admiten el encadenamiento. Por ejemplo, puedes escribirconf.setAppName(“PySpark App”).setMaster(“local”). Una vez que pasamos un objeto SparkConf a Apache Spark, ningún usuario no puede modificarlo.

A continuación, se muestran algunos de los atributos más utilizados de SparkConf:

  • set(key, value) - Para establecer una propiedad de configuración.

  • setMaster(value) - Para configurar la URL maestra.

  • setAppName(value) - Para establecer un nombre de aplicación.

  • get(key, defaultValue=None) - Obtener un valor de configuración de una clave.

  • setSparkHome(value) - Para establecer la ruta de instalación de Spark en los nodos trabajadores.

Consideremos el siguiente ejemplo de uso de SparkConf en un programa PySpark. En este ejemplo, configuramos el nombre de la aplicación Spark comoPySpark App y configurar la URL maestra para una aplicación Spark en → spark://master:7077.

El siguiente bloque de código tiene las líneas, cuando se agregan en el archivo Python, establece las configuraciones básicas para ejecutar una aplicación PySpark.

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

En Apache Spark, puede cargar sus archivos usando sc.addFile (sc es su SparkContext predeterminado) y obtenga la ruta en un trabajador usando SparkFiles.get. Por lo tanto, SparkFiles resuelve las rutas a los archivos agregados a través deSparkContext.addFile().

SparkFiles contienen los siguientes métodos de clase:

  • get(filename)
  • getrootdirectory()

Entendamos en detalle.

get (nombre de archivo)

Especifica la ruta del archivo que se agrega a través de SparkContext.addFile ().

getrootdirectory ()

Especifica la ruta al directorio raíz, que contiene el archivo que se agrega a través de SparkContext.addFile ().

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command - El comando es el siguiente -

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output - La salida para el comando anterior es -

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevel decide cómo se debe almacenar RDD. En Apache Spark, StorageLevel decide si RDD debe almacenarse en la memoria o debe almacenarse en el disco, o ambos. También decide si serializar RDD y si replicar particiones RDD.

El siguiente bloque de código tiene la definición de clase de StorageLevel:

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

Ahora, para decidir el almacenamiento de RDD, existen diferentes niveles de almacenamiento, que se dan a continuación:

  • DISK_ONLY = StorageLevel (Verdadero, Falso, Falso, Falso, 1)

  • DISK_ONLY_2 = StorageLevel (Verdadero, Falso, Falso, Falso, 2)

  • MEMORY_AND_DISK = StorageLevel (Verdadero, Verdadero, Falso, Falso, 1)

  • MEMORY_AND_DISK_2 = StorageLevel (Verdadero, Verdadero, Falso, Falso, 2)

  • MEMORY_AND_DISK_SER = StorageLevel (Verdadero, Verdadero, Falso, Falso, 1)

  • MEMORY_AND_DISK_SER_2 = StorageLevel (Verdadero, Verdadero, Falso, Falso, 2)

  • MEMORY_ONLY = StorageLevel (Falso, Verdadero, Falso, Falso, 1)

  • MEMORY_ONLY_2 = StorageLevel (Falso, Verdadero, Falso, Falso, 2)

  • MEMORY_ONLY_SER = StorageLevel (Falso, Verdadero, Falso, Falso, 1)

  • MEMORY_ONLY_SER_2 = StorageLevel (Falso, Verdadero, Falso, Falso, 2)

  • OFF_HEAP = StorageLevel (Verdadero, Verdadero, Verdadero, Falso, 1)

Consideremos el siguiente ejemplo de StorageLevel, donde usamos el nivel de almacenamiento MEMORY_AND_DISK_2, lo que significa que las particiones RDD tendrán la replicación de 2.

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command - El comando es el siguiente -

$SPARK_HOME/bin/spark-submit storagelevel.py

Output - La salida para el comando anterior se da a continuación -

Disk Memory Serialized 2x Replicated

Apache Spark ofrece una API de aprendizaje automático llamada MLlib. PySpark también tiene esta API de aprendizaje automático en Python. Admite diferentes tipos de algoritmos, que se mencionan a continuación:

  • mllib.classification - el spark.mllibEl paquete admite varios métodos de clasificación binaria, clasificación multiclase y análisis de regresión. Algunos de los algoritmos de clasificación más populares sonRandom Forest, Naive Bayes, Decision Treeetc.

  • mllib.clustering - La agrupación en clústeres es un problema de aprendizaje no supervisado, en el que el objetivo es agrupar subconjuntos de entidades entre sí en función de alguna noción de similitud.

  • mllib.fpm- La coincidencia de patrones frecuente consiste en extraer elementos frecuentes, conjuntos de elementos, subsecuencias u otras subestructuras que generalmente se encuentran entre los primeros pasos para analizar un conjunto de datos a gran escala. Este ha sido un tema de investigación activo en la minería de datos durante años.

  • mllib.linalg - Utilidades MLlib para álgebra lineal.

  • mllib.recommendation- El filtrado colaborativo se usa comúnmente para los sistemas de recomendación. Estas técnicas tienen como objetivo completar las entradas faltantes de una matriz de asociación de elementos de usuario.

  • spark.mllib- Actualmente es compatible con el filtrado colaborativo basado en modelos, en el que los usuarios y los productos se describen mediante un pequeño conjunto de factores latentes que se pueden utilizar para predecir las entradas que faltan. spark.mllib utiliza el algoritmo de mínimos cuadrados alternos (ALS) para conocer estos factores latentes.

  • mllib.regression- La regresión lineal pertenece a la familia de los algoritmos de regresión. El objetivo de la regresión es encontrar relaciones y dependencias entre variables. La interfaz para trabajar con modelos de regresión lineal y resúmenes de modelos es similar al caso de regresión logística.

Hay otros algoritmos, clases y funciones también como parte del paquete mllib. A partir de ahora, entendamos una demostración enpyspark.mllib.

El siguiente ejemplo es de filtrado colaborativo utilizando el algoritmo ALS para construir el modelo de recomendación y evaluarlo en datos de entrenamiento.

Dataset used - test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command - El comando será el siguiente -

$SPARK_HOME/bin/spark-submit recommend.py

Output - La salida del comando anterior será -

Mean Squared Error = 1.20536041839e-05

La serialización se usa para ajustar el rendimiento en Apache Spark. Todos los datos que se envían a través de la red o se escriben en el disco o se conservan en la memoria deben serializarse. La serialización juega un papel importante en operaciones costosas.

PySpark admite serializadores personalizados para ajustar el rendimiento. Los siguientes dos serializadores son compatibles con PySpark:

MarshalSerializer

Serializa objetos usando el serializador Marshal de Python. Este serializador es más rápido que PickleSerializer, pero admite menos tipos de datos.

class pyspark.MarshalSerializer

PickleSerializer

Serializa objetos usando el serializador Pickle de Python. Este serializador es compatible con casi cualquier objeto de Python, pero puede que no sea tan rápido como los serializadores más especializados.

class pyspark.PickleSerializer

Veamos un ejemplo sobre la serialización de PySpark. Aquí, serializamos los datos usando MarshalSerializer.

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command - El comando es el siguiente -

$SPARK_HOME/bin/spark-submit serializing.py

Output - La salida del comando anterior es -

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]