Inferir el esquema usando la reflexión

Este método utiliza la reflexión para generar el esquema de un RDD que contiene tipos específicos de objetos. La interfaz de Scala para Spark SQL admite la conversión automática de un RDD que contiene clases de casos en un DataFrame. loscase classdefine el esquema de la tabla. Los nombres de los argumentos de la clase de caso se leen mediante reflexión y se convierten en los nombres de las columnas.

Las clases de casos también pueden estar anidadas o contener tipos complejos como secuencias o matrices. Este RDD se puede convertir implícitamente en un DataFrame y luego registrarlo como una tabla. Las tablas se pueden utilizar en sentencias SQL posteriores.

Ejemplo

Consideremos un ejemplo de registros de empleados en un archivo de texto llamado employee.txt. Cree un RDD leyendo los datos del archivo de texto y conviértalo en DataFrame usando las funciones SQL predeterminadas.

Given Data - Eche un vistazo a los siguientes datos de un archivo llamado employee.txt lo colocó en el directorio respectivo actual donde se está ejecutando el punto de chispa.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Los siguientes ejemplos explican cómo generar un esquema utilizando Reflections.

Iniciar Spark Shell

Inicie Spark Shell con el siguiente comando.

$ spark-shell

Crear SQLContext

Genere SQLContext usando el siguiente comando. Aquí,sc significa objeto SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Importar funciones SQL

Utilice el siguiente comando para importar todas las funciones SQL utilizadas para convertir implícitamente un RDD en un DataFrame.

scala> import sqlContext.implicts._

Crear clase de caso

A continuación, tenemos que definir un esquema para los datos del registro de empleados usando una clase de caso. El siguiente comando se utiliza para declarar la clase de caso en función de los datos proporcionados (id, nombre, edad).

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

Cree RDD y aplique transformaciones

Utilice el siguiente comando para generar un RDD llamado empl leyendo los datos de employee.txt y convertirlo en DataFrame, usando las funciones de Mapa.

Aquí, se definen dos funciones de mapa. Uno es para dividir el registro de texto en campos (.map(_.split(“,”))) y la segunda función de mapa para convertir campos individuales (id, nombre, edad) en un objeto de clase de caso (.map(e(0).trim.toInt, e(1), e(2).trim.toInt)).

Al final, toDF() El método se utiliza para convertir el objeto de clase de caso con esquema en un DataFrame.

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

Salida

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

Almacene los datos del DataFrame en una tabla

Use el siguiente comando para almacenar los datos del DataFrame en una tabla llamada employee. Después de este comando, podemos aplicarle todo tipo de sentencias SQL.

scala> empl.registerTempTable("employee")

La mesa de empleados está lista. Pasemos ahora algunas consultas SQL en la tabla usandoSQLContext.sql() método.

Seleccione Consulta en DataFrame

Utilice el siguiente comando para seleccionar todos los registros del employeemesa. Aquí usamos la variableallrecordspara capturar todos los datos de los registros. Para mostrar esos registros, llameshow() método en él.

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")

Para ver los datos de resultados de allrecords DataFrame, use el siguiente comando.

scala> allrecords.show()

Salida

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

Where Cláusula Consulta SQL en DataFrame

Utilice el siguiente comando para aplicar wheredeclaración en una tabla. Aquí, la variableagefilter almacena los registros de los empleados cuyas edades están entre 20 y 35 años.

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")

Para ver los datos de resultados de agefilter DataFrame, use el siguiente comando.

scala> agefilter.show()

Salida

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

Las dos consultas anteriores se pasaron a toda la tabla DataFrame. Ahora intentemos recuperar datos del DataFrame resultante aplicandoTransformations en eso.

Obtener valores de ID de agefilter DataFrame usando el índice de columna

La siguiente declaración se utiliza para obtener los valores de ID de agefilter Resultado RDD, usando índice de campo.

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

Salida

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

Este enfoque basado en la reflexión conduce a un código más conciso y funciona bien cuando ya conoce el esquema mientras escribe su aplicación Spark.