tutorial spark read example espaƱol apache-spark apache-spark-sql spark-dataframe rdd apache-spark-dataset

apache-spark - read - spark sql example



Diferencia entre DataSet API y DataFrame API (2)

Apache Spark proporciona tres tipos de API

  1. RDD
  2. Marco de datos
  3. Dataset

Aquí está la comparación de las API entre RDD, Dataframe y Dataset.

RDD

La abstracción principal que proporciona Spark es un conjunto de datos distribuidos (RDD) resiliente, que es una colección de elementos divididos en los nodos del clúster que pueden operarse en paralelo.

Características de RDD:

  • Colección distribuida:
    El RDD utiliza las operaciones de MapReduce, que se adoptan ampliamente para procesar y generar grandes conjuntos de datos con un algoritmo paralelo distribuido en un clúster. Permite a los usuarios escribir cómputos en paralelo, utilizando un conjunto de operadores de alto nivel, sin tener que preocuparse por la distribución del trabajo y la tolerancia a fallas.

  • Inmutables: RDD compuestos por una colección de registros que están particionados. Una partición es una unidad básica de paralelismo en un RDD, y cada partición es una división lógica de datos que es inmutable y se crea a través de algunas transformaciones en particiones existentes. La imutabilidad ayuda a lograr coherencia en los cálculos.

  • Tolerante a fallas: en caso de que perdamos alguna partición de RDD, podemos reproducir la transformación en esa partición en linaje para lograr el mismo cálculo, en lugar de replicar datos en múltiples nodos. Esta característica es el mayor beneficio de RDD porque ahorra una gran cantidad de esfuerzos en la gestión y replicación de datos y, por lo tanto, logra cálculos más rápidos.

  • Evaluaciones vagas: todas las transformaciones en Spark son flojas, ya que no calculan sus resultados de inmediato. En su lugar, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base. Las transformaciones solo se computan cuando una acción requiere que se devuelva un resultado al programa del controlador.

  • Transformaciones funcionales: los RDD admiten dos tipos de operaciones: transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y acciones, que devuelven un valor al programa del controlador después de ejecutar un cálculo en el conjunto de datos.

  • Formatos de procesamiento de datos:
    Puede procesar de manera fácil y eficiente los datos que están estructurados, así como los datos no estructurados.

  • Lenguajes de programación compatibles:
    La API de RDD está disponible en Java, Scala, Python y R.

Limitaciones de RDD: -

  • Sin motor de optimización incorporado: cuando se trabaja con datos estructurados, los RDD no pueden aprovechar las ventajas de los optimizadores avanzados de Spark, incluidos el optimizador de catalizador y el motor de ejecución de Tungsten. Los desarrolladores necesitan optimizar cada RDD en función de sus atributos.

  • Manejo de datos estructurados: a diferencia de Dataframe y datasets, los RDD no infieren el esquema de los datos ingeridos y requieren que el usuario lo especifique.

Dataframes

Spark introdujo Dataframes en la versión Spark 1.3. Dataframe supera los principales desafíos que tenían los RDD.

Un DataFrame es una colección distribuida de datos organizados en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional o un Dataframe de R / Python. Junto con Dataframe, Spark también introdujo el optimizador de catalizador, que aprovecha las funciones avanzadas de programación para crear un optimizador de consultas extensible.

Características de Dataframe:

  • Colección distribuida de Row Object: Un DataFrame es una colección distribuida de datos organizados en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional, pero con optimizaciones más ricas bajo el capó.

  • Procesamiento de datos: Procesamiento de formatos de datos estructurados y no estructurados (Avro, CSV, búsqueda elástica y Cassandra) y sistemas de almacenamiento (HDFS, tablas HIVE, MySQL, etc.). Puede leer y escribir desde todos estos diversos orígenes de datos.

  • Optimización utilizando el optimizador de catalizador: potencia las consultas SQL y la API de DataFrame. Marco de datos utiliza el marco de transformación del árbol catalizador en cuatro fases,

    1.Analyzing a logical plan to resolve references 2.Logical plan optimization 3.Physical planning 4.Code generation to compile parts of the query to Java bytecode.

  • Compatibilidad con Hive: con Spark SQL, puede ejecutar consultas Hive no modificadas en sus almacenes Hive existentes. Reutiliza Hive frontend y MetaStore y te ofrece compatibilidad total con datos, consultas y UDF existentes de Hive.

  • Tungsteno: tungsteno proporciona un backend de ejecución física que administra explícitamente la memoria y genera código de bytes dinámicamente para la evaluación de la expresión.

  • Lenguajes de programación compatibles:
    Dataframe API está disponible en Java, Scala, Python y R.

Limitaciones del marco de datos: -

  • Seguridad del tipo de tiempo de compilación: como se discutió, Dataframe API no es compatible con la seguridad del tiempo de compilación, que le impide manipular datos cuando no se conoce la estructura. El siguiente ejemplo funciona durante el tiempo de compilación. Sin embargo, obtendrá una excepción Runtime cuando ejecute este código.

Ejemplo:

case class Person(name : String , age : Int) val dataframe = sqlContext.read.json("people.json") dataframe.filter("salary > 10000").show => throws Exception : cannot resolve ''salary'' given input age , name

Esto es un desafío especialmente cuando se trabaja con varios pasos de transformación y agregación.

  • No se puede operar en el objeto de dominio (objeto de dominio perdido): una vez que haya transformado un objeto de dominio en un marco de datos, no podrá regenerarlo desde él. En el siguiente ejemplo, una vez que hemos creado personDF desde personRDD, no recuperaremos el RDD original de la clase Person (RDD [Person]).

Ejemplo:

case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContext.createDataframe(personRDD) personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

API de conjuntos de datos

Dataset API es una extensión de DataFrames que proporciona una interfaz de programación orientada a objetos segura para tipos. Es una colección inmutable de objetos fuertemente tipados que se asignan a un esquema relacional.

En el núcleo del Dataset, API es un nuevo concepto llamado codificador, que se encarga de convertir objetos JVM y representación tabular. La representación tabular se almacena utilizando el formato binario de tungsteno interno de Spark, lo que permite operaciones en datos serializados y una mejor utilización de la memoria. Spark 1.6 viene con soporte para la generación automática de codificadores para una amplia variedad de tipos, incluidos los tipos primitivos (por ejemplo, String, Integer, Long), clases de caso Scala y Java Beans.

Características del conjunto de datos:

  • Proporciona lo mejor de RDD y Dataframe: RDD (programación funcional, tipo seguro), DataFrame (modelo relacional, optimización de consultas, ejecución de tungsteno, clasificación y mezcla)

  • Encoders: con el uso de Encoders, es fácil convertir cualquier objeto JVM en un Dataset, lo que permite a los usuarios trabajar con datos estructurados y no estructurados a diferencia de Dataframe.

  • Lenguajes de programación admitidos: la API de conjuntos de datos actualmente solo está disponible en Scala y Java. Python y R actualmente no son compatibles con la versión 1.6. El soporte de Python está programado para la versión 2.0.

  • Tipo de seguridad: la API de conjuntos de datos proporciona seguridad de tiempo de compilación que no estaba disponible en Dataframes. En el siguiente ejemplo, podemos ver cómo Dataset puede operar en objetos de dominio con funciones de compilación lambda.

Ejemplo:

case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContext.createDataframe(personRDD) val ds:Dataset[Person] = personDF.as[Person] ds.filter(p => p.age > 25) ds.filter(p => p.salary > 25) // error : value salary is not a member of person ds.rdd // returns RDD[Person]

  • Interoperable: Datasets le permite convertir fácilmente sus RDD y Dataframes existentes en conjuntos de datos sin un código repetitivo.

Limitación de la API de conjuntos de datos: -

  • Requiere conversión de tipo a Cadena: consultar los datos de datasets actualmente nos exige especificar los campos de la clase como una cadena. Una vez que hemos consultado los datos, nos vemos obligados a enviar la columna al tipo de datos requerido. Por otro lado, si usamos la operación de mapa en Datasets, no usará el optimizador Catalyst.

Ejemplo:

ds.select(col("name").as[String], $"age".as[Int]).collect()

No es compatible con Python y R: a partir del release 1.6, los conjuntos de datos solo son compatibles con Scala y Java. El soporte de Python se introducirá en Spark 2.0.

La API Datasets ofrece varias ventajas con respecto a la API existente de RDD y Dataframe con mejor seguridad y programación funcional. Con el desafío de los requisitos de conversión de tipo en la API, aún no se requiere la seguridad de tipo requerida y hará que su código sea frágil.

Esta pregunta ya tiene una respuesta aquí:

¿Alguien puede ayudarme a entender la diferencia entre la API de DataSet y la API de DataFrame con el ejemplo? ¿Por qué era necesario introducir esta API?


Debido a que DataFrame está débilmente tipado y los desarrolladores no obtienen los beneficios del sistema de tipos. Por ejemplo, supongamos que quiere leer algo de SQL y ejecutar una agregación en él:

val people = sqlContext.read.parquet("...") val department = sqlContext.read.parquet("...") people.filter("age > 30") .join(department, people("deptId") === department("id")) .groupBy(department("name"), "gender") .agg(avg(people("salary")), max(people("age")))

Cuando dices people("deptId") , no people("deptId") un Int , o un Long , people("deptId") un objeto Column que debes operar. En los lenguajes con sistemas de tipo enriquecido como Scala, termina perdiendo todo tipo de seguridad, lo que aumenta el número de errores de tiempo de ejecución para las cosas que podrían descubrirse en tiempo de compilación.

Por el contrario, DataSet[T] está escrito. Cuando tu lo hagas:

val people: People = val people = sqlContext.read.parquet("...").as[People]

En realidad, está recuperando un objeto People , donde deptId es un tipo integral real y no un tipo de columna, aprovechando así el sistema de tipos.

A partir de Spark 2.0, las API de DataFrame y DataSet estarán unificadas, donde DataFrame será un alias de tipo para DataSet[Row] .