sqlcontext spark read into scala elasticsearch apache-spark etl apache-spark-sql

scala - spark - Cómo agregar una nueva columna Struct a un DataFrame



spark scala sql (2)

Prueba esto:

import org.apache.spark.sql.functions._ df.registerTempTable("dt") dfres = sql("select struct(lat,lon) as colName from dt")

Actualmente estoy intentando extraer una base de datos de MongoDB y usar Spark para ingerir en ElasticSearch con geo_points .

La base de datos Mongo tiene valores de latitud y longitud, pero ElasticSearch requiere que se conviertan en el tipo geo_point .

¿Hay una manera en Spark para copiar las columnas lat y lon a una nueva columna que es una array o struct ?

Cualquier ayuda es apreciada!


Supongo que empiezas con algún tipo de esquema plano como este:

root |-- lat: double (nullable = false) |-- long: double (nullable = false) |-- key: string (nullable = false)

Primero vamos a crear datos de ejemplo:

import org.apache.spark.sql.Row import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types._ val rdd = sc.parallelize( Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil) val schema = StructType( StructField("lat", DoubleType, false) :: StructField("long", DoubleType, false) :: StructField("key", StringType, false) ::Nil) val df = sqlContext.createDataFrame(rdd, schema)

Una forma fácil es usar una clase udf y case:

case class Location(lat: Double, long: Double) val makeLocation = udf((lat: Double, long: Double) => Location(lat, long)) val dfRes = df. withColumn("location", makeLocation(col("lat"), col("long"))). drop("lat"). drop("long") dfRes.printSchema

y obtenemos

root |-- key: string (nullable = false) |-- location: struct (nullable = true) | |-- lat: double (nullable = false) | |-- long: double (nullable = false)

Una manera difícil es transformar sus datos y aplicar el esquema después:

val rddRes = df. map{case Row(lat, long, key) => Row(key, Row(lat, long))} val schemaRes = StructType( StructField("key", StringType, false) :: StructField("location", StructType( StructField("lat", DoubleType, false) :: StructField("long", DoubleType, false) :: Nil ), true) :: Nil ) sqlContext.createDataFrame(rddRes, schemaRes).show

y obtenemos una salida esperada

+------+-------------+ | key| location| +------+-------------+ |Warsaw|[52.23,21.01]| | Corte| [42.3,9.15]| +------+-------------+

Crear esquemas anidados desde cero puede ser tedioso, así que si puede recomendar el primer enfoque. Se puede extender fácilmente si necesita una estructura más sofisticada:

case class Pin(location: Location) val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)) df. withColumn("pin", makePin(col("lat"), col("long"))). drop("lat"). drop("long"). printSchema

y obtenemos la salida esperada:

root |-- key: string (nullable = false) |-- pin: struct (nullable = true) | |-- location: struct (nullable = true) | | |-- lat: double (nullable = false) | | |-- long: double (nullable = false)

Desafortunadamente, no tiene control sobre el campo que puede contener nullable , por lo que si es importante para su proyecto, tendrá que especificar el esquema.

Finalmente puedes usar la función struct introducida en 1.4:

import org.apache.spark.sql.functions.struct df.select($"key", struct($"lat", $"long").alias("location"))