Apache Spark - Programación central
Spark Core es la base de todo el proyecto. Proporciona despacho de tareas distribuidas, programación y funcionalidades básicas de E / S. Spark usa una estructura de datos fundamental especializada conocida como RDD (Resilient Distributed Datasets) que es una colección lógica de datos particionados entre máquinas. Los RDD se pueden crear de dos formas; una es hacer referencia a conjuntos de datos en sistemas de almacenamiento externos y la segunda es aplicar transformaciones (por ejemplo, mapa, filtro, reductor, unión) en los RDD existentes.
La abstracción RDD se expone a través de una API integrada en el lenguaje. Esto simplifica la complejidad de la programación porque la forma en que las aplicaciones manipulan los RDD es similar a la manipulación de colecciones locales de datos.
Spark Shell
Spark proporciona un shell interactivo, una poderosa herramienta para analizar datos de forma interactiva. Está disponible en lenguaje Scala o Python. La abstracción principal de Spark es una colección distribuida de elementos denominada Resilient Distributed Dataset (RDD). Los RDD se pueden crear a partir de formatos de entrada de Hadoop (como archivos HDFS) o transformando otros RDD.
Abrir Spark Shell
El siguiente comando se usa para abrir Spark shell.
$ spark-shell
Crear RDD simple
Creemos un RDD simple a partir del archivo de texto. Utilice el siguiente comando para crear un RDD simple.
scala> val inputfile = sc.textFile(“input.txt”)
La salida para el comando anterior es
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
La API Spark RDD presenta algunas Transformations y pocos Actions manipular RDD.
Transformaciones RDD
Las transformaciones RDD devuelven el puntero a un RDD nuevo y le permiten crear dependencias entre RDD. Cada RDD en la cadena de dependencia (Cadena de dependencias) tiene una función para calcular sus datos y tiene un puntero (dependencia) a su RDD principal.
Spark es perezoso, por lo que no se ejecutará nada a menos que llame a alguna transformación o acción que desencadene la creación y ejecución de trabajos. Mire el siguiente fragmento del ejemplo de recuento de palabras.
Por lo tanto, la transformación RDD no es un conjunto de datos, sino un paso en un programa (podría ser el único paso) que le dice a Spark cómo obtener datos y qué hacer con ellos.
A continuación se muestra una lista de transformaciones RDD.
S. No | Transformaciones y significado |
---|---|
1 |
map(func) Devuelve un nuevo conjunto de datos distribuido, formado al pasar cada elemento de la fuente a través de una función. func. |
2 |
filter(func) Devuelve un nuevo conjunto de datos formado seleccionando aquellos elementos de la fuente en los que func devuelve verdadero. |
3 |
flatMap(func) Similar al mapa, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida (por lo que func debería devolver una secuencia en lugar de un solo elemento). |
4 |
mapPartitions(func) Similar al mapa, pero se ejecuta por separado en cada partición (bloque) del RDD, por lo que func debe ser de tipo Iterator <T> ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T. |
5 |
mapPartitionsWithIndex(func) Similar a las particiones de mapa, pero también proporciona func con un valor entero que representa el índice de la partición, entonces func debe ser de tipo (Int, Iterator <T>) ⇒ Iterator <U> cuando se ejecuta en un RDD de tipo T. |
6 |
sample(withReplacement, fraction, seed) Muestra un fraction de los datos, con o sin reemplazo, utilizando una semilla generadora de números aleatorios dada. |
7 |
union(otherDataset) Devuelve un nuevo conjunto de datos que contiene la unión de los elementos del conjunto de datos de origen y el argumento. |
8 |
intersection(otherDataset) Devuelve un nuevo RDD que contiene la intersección de elementos en el conjunto de datos de origen y el argumento. |
9 |
distinct([numTasks]) Devuelve un nuevo conjunto de datos que contiene los distintos elementos del conjunto de datos de origen. |
10 |
groupByKey([numTasks]) Cuando se llama a un conjunto de datos de pares (K, V), devuelve un conjunto de datos de pares (K, Iterable <V>). Note - Si está agrupando para realizar una agregación (como una suma o un promedio) sobre cada clave, el uso de reduceByKey o aggregateByKey producirá un rendimiento mucho mejor. |
11 |
reduceByKey(func, [numTasks]) Cuando se llama en un conjunto de datos de pares (K, V), devuelve un conjunto de datos de pares (K, V), donde los valores para cada tecla se agregan utilizando la función dada reducir func , que debe ser de tipo (V, V) ⇒ V Como en groupByKey, la cantidad de tareas de reducción se puede configurar mediante un segundo argumento opcional. |
12 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Cuando se invoca en un conjunto de datos de pares (K, V), devuelve un conjunto de datos de pares (K, U) donde los valores de cada clave se agregan utilizando las funciones combinadas dadas y un valor "cero" neutral. Permite un tipo de valor agregado que es diferente del tipo de valor de entrada, al tiempo que evita asignaciones innecesarias. Como en groupByKey, el número de tareas de reducción se puede configurar mediante un segundo argumento opcional. |
13 |
sortByKey([ascending], [numTasks]) Cuando se llama a un conjunto de datos de pares (K, V) donde K implementa Ordered, devuelve un conjunto de datos de pares (K, V) ordenados por claves en orden ascendente o descendente, como se especifica en el argumento ascendente booleano. |
14 |
join(otherDataset, [numTasks]) Cuando se llama a conjuntos de datos de tipo (K, V) y (K, W), devuelve un conjunto de datos de pares (K, (V, W)) con todos los pares de elementos para cada clave. Las combinaciones externas se admiten a través de leftOuterJoin, rightOuterJoin y fullOuterJoin. |
15 |
cogroup(otherDataset, [numTasks]) Cuando se invoca en conjuntos de datos de tipo (K, V) y (K, W), devuelve un conjunto de datos de (K, (Iterable <V>, Iterable <W>)) tuplas. Esta operación también se denomina grupo con. |
dieciséis |
cartesian(otherDataset) Cuando se invoca en conjuntos de datos de tipos T y U, devuelve un conjunto de datos de pares (T, U) (todos los pares de elementos). |
17 |
pipe(command, [envVars]) Canalice cada partición del RDD a través de un comando de shell, por ejemplo, un script en Perl o bash. Los elementos RDD se escriben en el stdin del proceso y las líneas de salida a su stdout se devuelven como un RDD de cadenas. |
18 |
coalesce(numPartitions) Disminuya el número de particiones en el RDD a numPartitions. Útil para ejecutar operaciones de manera más eficiente después de filtrar un gran conjunto de datos. |
19 |
repartition(numPartitions) Reorganice los datos en el RDD de forma aleatoria para crear más o menos particiones y equilibrarlas entre ellas. Esto siempre baraja todos los datos de la red. |
20 |
repartitionAndSortWithinPartitions(partitioner) Reparta el RDD de acuerdo con el particionador dado y, dentro de cada partición resultante, ordena los registros por sus claves. Esto es más eficiente que llamar a repartición y luego ordenar dentro de cada partición porque puede empujar la ordenación hacia abajo en la maquinaria de reproducción aleatoria. |
Comportamiento
La siguiente tabla proporciona una lista de acciones, que devuelven valores.
S. No | Acción y significado |
---|---|
1 |
reduce(func) Agregue los elementos del conjunto de datos usando una función func(que toma dos argumentos y devuelve uno). La función debe ser conmutativa y asociativa para que se pueda calcular correctamente en paralelo. |
2 |
collect() Devuelve todos los elementos del conjunto de datos como una matriz en el programa controlador. Esto suele ser útil después de un filtro u otra operación que devuelve un subconjunto suficientemente pequeño de datos. |
3 |
count() Devuelve el número de elementos del conjunto de datos. |
4 |
first() Devuelve el primer elemento del conjunto de datos (similar a take (1)). |
5 |
take(n) Devuelve una matriz con la primera n elementos del conjunto de datos. |
6 |
takeSample (withReplacement,num, [seed]) Devuelve una matriz con una muestra aleatoria de num elementos del conjunto de datos, con o sin reemplazo, opcionalmente pre-especificando una semilla generadora de números aleatorios. |
7 |
takeOrdered(n, [ordering]) Devuelve el primero n elementos del RDD utilizando su orden natural o un comparador personalizado. |
8 |
saveAsTextFile(path) Escribe los elementos del conjunto de datos como un archivo de texto (o un conjunto de archivos de texto) en un directorio dado en el sistema de archivos local, HDFS o cualquier otro sistema de archivos compatible con Hadoop. Spark llama a toString en cada elemento para convertirlo en una línea de texto en el archivo. |
9 |
saveAsSequenceFile(path) (Java and Scala) Escribe los elementos del conjunto de datos como un archivo de secuencia de Hadoop en una ruta determinada en el sistema de archivos local, HDFS o cualquier otro sistema de archivos compatible con Hadoop. Está disponible en RDD de pares clave-valor que implementan la interfaz de escritura de Hadoop. En Scala, también está disponible en tipos que son implícitamente convertibles a Writable (Spark incluye conversiones para tipos básicos como Int, Double, String, etc.). |
10 |
saveAsObjectFile(path) (Java and Scala) Escribe los elementos del conjunto de datos en un formato simple usando la serialización de Java, que luego se puede cargar usando SparkContext.objectFile (). |
11 |
countByKey() Solo disponible en RDD de tipo (K, V). Devuelve un mapa hash de pares (K, Int) con el recuento de cada clave. |
12 |
foreach(func) Ejecuta una función funcen cada elemento del conjunto de datos. Por lo general, esto se hace por efectos secundarios como actualizar un acumulador o interactuar con sistemas de almacenamiento externos. Note- La modificación de variables distintas de los acumuladores fuera de foreach () puede dar como resultado un comportamiento indefinido. Consulte Comprensión de los cierres para obtener más detalles. |
Programando con RDD
Veamos las implementaciones de algunas transformaciones y acciones de RDD en la programación de RDD con la ayuda de un ejemplo.
Ejemplo
Considere un ejemplo de recuento de palabras: cuenta cada palabra que aparece en un documento. Considere el siguiente texto como entrada y se guarda comoinput.txt archivo en un directorio de inicio.
input.txt - archivo de entrada.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Siga el procedimiento que se indica a continuación para ejecutar el ejemplo dado.
Abrir Spark-Shell
El siguiente comando se usa para abrir Spark Shell. Generalmente, Spark se construye con Scala. Por lo tanto, un programa Spark se ejecuta en un entorno Scala.
$ spark-shell
Si Spark shell se abre correctamente, encontrará el siguiente resultado. Mire la última línea de la salida "Contexto Spark disponible como sc" significa que el contenedor Spark se crea automáticamente como objeto de contexto Spark con el nombresc. Antes de comenzar el primer paso de un programa, se debe crear el objeto SparkContext.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Crea un RDD
Primero, tenemos que leer el archivo de entrada usando Spark-Scala API y crear un RDD.
El siguiente comando se utiliza para leer un archivo desde una ubicación determinada. Aquí, se crea un nuevo RDD con el nombre de inputfile. La cadena que se proporciona como argumento en el método textFile (“”) es la ruta absoluta para el nombre del archivo de entrada. Sin embargo, si solo se proporciona el nombre del archivo, significa que el archivo de entrada está en la ubicación actual.
scala> val inputfile = sc.textFile("input.txt")
Ejecutar transformación de recuento de palabras
Nuestro objetivo es contar las palabras de un archivo. Cree un mapa plano para dividir cada línea en palabras (flatMap(line ⇒ line.split(“ ”)).
A continuación, lea cada palabra como una clave con un valor ‘1’ (<clave, valor> = <palabra, 1>) usando la función de mapa (map(word ⇒ (word, 1)).
Finalmente, reduzca esas claves agregando valores de claves similares (reduceByKey(_+_)).
El siguiente comando se utiliza para ejecutar la lógica de recuento de palabras. Después de ejecutar esto, no encontrará ninguna salida porque esto no es una acción, es una transformación; apuntando un nuevo RDD o decirle a Spark qué hacer con los datos dados)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
RDD actual
Mientras trabaja con el RDD, si desea conocer el RDD actual, utilice el siguiente comando. Le mostrará la descripción sobre RDD actual y sus dependencias para la depuración.
scala> counts.toDebugString
Almacenamiento en caché de las transformaciones
Puede marcar un RDD para que se conserve utilizando los métodos persist () o cache () en él. La primera vez que se calcula en una acción, se mantendrá en la memoria de los nodos. Utilice el siguiente comando para almacenar las transformaciones intermedias en la memoria.
scala> counts.cache()
Aplicar la acción
La aplicación de una acción, como almacenar todas las transformaciones, da como resultado un archivo de texto. El argumento String para el método saveAsTextFile (“”) es la ruta absoluta de la carpeta de salida. Pruebe el siguiente comando para guardar la salida en un archivo de texto. En el siguiente ejemplo, la carpeta 'salida' está en la ubicación actual.
scala> counts.saveAsTextFile("output")
Comprobación de la salida
Abra otra terminal para ir al directorio de inicio (donde se ejecuta Spark en la otra terminal). Utilice los siguientes comandos para verificar el directorio de salida.
[[email protected] ~]$ cd output/
[[email protected] output]$ ls -1
part-00000
part-00001
_SUCCESS
El siguiente comando se usa para ver la salida de Part-00000 archivos.
[[email protected] output]$ cat part-00000
Salida
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
El siguiente comando se usa para ver la salida de Part-00001 archivos.
[[email protected] output]$ cat part-00001
Salida
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
ONU persiste el almacenamiento
Antes de UN-persistente, si desea ver el espacio de almacenamiento que se utiliza para esta aplicación, utilice la siguiente URL en su navegador.
http://localhost:4040
Verá la siguiente pantalla, que muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.
Si desea anular la persistencia del espacio de almacenamiento de un RDD en particular, utilice el siguiente comando.
Scala> counts.unpersist()
Verá la salida de la siguiente manera:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Para verificar el espacio de almacenamiento en el navegador, use la siguiente URL.
http://localhost:4040/
Verá la siguiente pantalla. Muestra el espacio de almacenamiento utilizado para la aplicación, que se ejecuta en el shell Spark.