Especificar el esquema mediante programación

El segundo método para crear DataFrame es a través de una interfaz programática que le permite construir un esquema y luego aplicarlo a un RDD existente. Podemos crear un DataFrame mediante programación usando los siguientes tres pasos.

  • Cree un RDD de filas a partir de un RDD original.

  • Cree el esquema representado por un StructType que coincida con la estructura de Rows en el RDD creado en el Paso 1.

  • Aplique el esquema al RDD de filas a través del método createDataFrame proporcionado por SQLContext.

Ejemplo

Consideremos un ejemplo de registros de empleados en un archivo de texto llamado employee.txt. Cree un esquema usando DataFrame directamente leyendo los datos del archivo de texto.

Given Data - Mira los siguientes datos de un archivo llamado employee.txt colocado en el directorio actual respectivo donde se está ejecutando el punto de chispa.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Siga los pasos que se indican a continuación para generar un esquema mediante programación.

Abrir Spark Shell

Inicie el shell Spark con el siguiente ejemplo.

$ spark-shell

Crear objeto SQLContext

Genere SQLContext usando el siguiente comando. Aquí,sc significa objeto SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Leer entrada de archivo de texto

Cree un marco de datos RDD leyendo un dato del archivo de texto llamado employee.txt usando el siguiente comando.

scala> val employee = sc.textFile("employee.txt")

Crear un esquema codificado en formato de cadena

Utilice el siguiente comando para crear un esquema codificado en formato de cadena. Eso significa, asuma la estructura de campo de una tabla y pase los nombres de campo usando algún delimitador.

scala> val schemaString = "id name age"

Salida

schemaString: String = id name age

Importar API respectivas

Utilice el siguiente comando para importar funciones de fila y tipos de datos SQL.

scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};

Generar esquema

El siguiente comando se usa para generar un esquema leyendo el schemaStringvariable. Significa que debe leer cada campo dividiendo la cadena completa con un espacio como delimitador y tomar cada tipo de campo como tipo Cadena, por defecto.

scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))

Aplicar transformación para leer datos de un archivo de texto

Utilice el siguiente comando para convertir un RDD (empleado) en Filas. Significa que aquí estamos especificando la lógica para leer los datos RDD y almacenarlos en rowRDD. Aquí estamos usando dos funciones de mapa: una es un delimitador para dividir la cadena de registro (.map(_.split(","))) y la segunda función de mapa para definir una Fila con el valor del índice de campo (.map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))).

scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))

Aplicar RowRDD en datos de fila según el esquema

Use la siguiente declaración para crear un DataFrame usando rowRDD datos y schema (ESQUEMA) variable.

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

Salida

employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]

Almacenar datos de DataFrame en la tabla

Use el siguiente comando para almacenar el DataFrame en una tabla llamada employee.

scala> employeeDF.registerTempTable("employee")

los employeela mesa ya está lista. Pasemos algunas consultas SQL a la tabla usando el métodoSQLContext.sql().

Seleccione Consulta en DataFrame

Utilice la siguiente declaración para seleccionar todos los registros del employeemesa. Aquí usamos la variableallrecordspara capturar todos los datos de los registros. Para mostrar esos registros, llameshow() método en él.

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")

Para ver los datos de resultados de allrecords DataFrame, use el siguiente comando.

scala> allrecords.show()

Salida

+------+--------+----+
|  id  | name   |age |
+------+--------+----+
| 1201 | satish | 25 |
| 1202 | krishna| 28 |
| 1203 | amith  | 39 |
| 1204 | javed  | 23 |
| 1205 | prudvi | 23 |
+------+--------+----+

El método sqlContext.sqlle permite construir DataFrames cuando las columnas y sus tipos no se conocen hasta el tiempo de ejecución. Ahora puede ejecutar diferentes consultas SQL en él.