scala apache-spark dataframe spark-dataframe apache-spark-sql spark-cassandra-connector

scala - ¿Cómo consultar la columna de datos JSON usando Spark DataFrames?



apache-spark spark-dataframe (4)

Tengo una mesa Cassandra que por simplicidad se parece a:

key: text jsonData: text blobData: blob

Puedo crear un marco de datos básico para esto usando spark y el conector spark-cassandra usando:

val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load()

Sin embargo, estoy luchando por expandir los datos JSON en su estructura subyacente. En última instancia, quiero poder filtrar en función de los atributos dentro de la cadena json y devolver los datos del blob. Algo así como jsonData.foo = "bar" y devuelve blobData. ¿Es esto posible actualmente?


La cadena JSON subyacente es

"{ /"column_name1/":/"value1/",/"column_name2/":/"value2/",/"column_name3/":/"value3/",/"column_name5/":/"value5/"}";

A continuación se muestra el script para filtrar el JSON y cargar los datos necesarios en Cassandra.

sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") .write.format("org.apache.spark.sql.cassandra") .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) .mode(SaveMode.Append) .save()


La función from_json es exactamente lo que estás buscando. Su código se verá más o menos así:

val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() //You can define whatever struct type that your json states val schema = StructType(Seq( StructField("key", StringType, true), StructField("value", DoubleType, true) )) df.withColumn("jsonData", from_json(col("jsonData"), schema))


Yo uso lo siguiente

(disponible desde 2.2.0, y supongo que su columna de cadena json está en el índice de columna 0)

def parse(df: DataFrame, spark: SparkSession): DataFrame = { val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING) spark.read.json(stringDf) }

Inferirá automáticamente el esquema en su JSON. Documentado aquí: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html


Chispa> = 2.4

Si es necesario, el esquema se puede determinar utilizando la función schema_of_json (tenga en cuenta que esto supone que una fila arbitraria es un representante válido del esquema).

import org.apache.spark.sql.functions.{lit, schema_of_json} val schema = schema_of_json(lit(df.select($"jsonData").as[String].first)) df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))

Chispa> = 2.1

Puede usar la función from_json :

import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("k", StringType, true), StructField("v", DoubleType, true) )) df.withColumn("jsonData", from_json($"jsonData", schema))

Chispa> = 1.6

Puede usar get_json_object que toma una columna y una ruta:

import org.apache.spark.sql.functions.get_json_object val exprs = Seq("k", "v").map( c => get_json_object($"jsonData", s"$$.$c").alias(c)) df.select($"*" +: exprs: _*)

y extrae campos a cadenas individuales que se pueden convertir a los tipos esperados.

El argumento de la path se expresa utilizando la sintaxis de puntos, con los principales $. denotando la raíz del documento (dado que el código anterior usa la interpolación de cadenas $ tiene que escaparse, por lo tanto $$. .

Chispa <= 1.5 :

¿Es esto posible actualmente?

Hasta donde yo sé, no es directamente posible. Puedes probar algo similar a esto:

val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData")

Supongo que el campo de blob no se puede representar en JSON. De lo contrario, puede omitir la división y la unión:

import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true)

Un enfoque alternativo (más barato, aunque más complejo) es usar un UDF para analizar JSON y generar una struct o columna de map . Por ejemplo algo como esto:

import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false)