scala - ¿Cómo consultar la columna de datos JSON usando Spark DataFrames?
apache-spark spark-dataframe (4)
Tengo una mesa Cassandra que por simplicidad se parece a:
key: text
jsonData: text
blobData: blob
Puedo crear un marco de datos básico para esto usando spark y el conector spark-cassandra usando:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
Sin embargo, estoy luchando por expandir los datos JSON en su estructura subyacente. En última instancia, quiero poder filtrar en función de los atributos dentro de la cadena json y devolver los datos del blob. Algo así como jsonData.foo = "bar" y devuelve blobData. ¿Es esto posible actualmente?
La cadena JSON subyacente es
"{ /"column_name1/":/"value1/",/"column_name2/":/"value2/",/"column_name3/":/"value3/",/"column_name5/":/"value5/"}";
A continuación se muestra el script para filtrar el JSON y cargar los datos necesarios en Cassandra.
sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
.mode(SaveMode.Append)
.save()
La función
from_json
es exactamente lo que estás buscando.
Su código se verá más o menos así:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
//You can define whatever struct type that your json states
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, true)
))
df.withColumn("jsonData", from_json(col("jsonData"), schema))
Yo uso lo siguiente
(disponible desde 2.2.0, y supongo que su columna de cadena json está en el índice de columna 0)
def parse(df: DataFrame, spark: SparkSession): DataFrame = {
val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
spark.read.json(stringDf)
}
Inferirá automáticamente el esquema en su JSON. Documentado aquí: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
Chispa> = 2.4
Si es necesario, el esquema se puede determinar utilizando la función
schema_of_json
(tenga en cuenta que esto supone que una fila arbitraria es un representante válido del esquema).
import org.apache.spark.sql.functions.{lit, schema_of_json}
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))
Chispa> = 2.1
Puede usar la función
from_json
:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Chispa> = 1.6
Puede usar
get_json_object
que toma una columna y una ruta:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
y extrae campos a cadenas individuales que se pueden convertir a los tipos esperados.
El argumento de la
path
se expresa utilizando la sintaxis de puntos, con los principales
$.
denotando la raíz del documento (dado que el código anterior usa la interpolación de cadenas
$
tiene que escaparse, por lo tanto
$$.
.
Chispa <= 1.5 :
¿Es esto posible actualmente?
Hasta donde yo sé, no es directamente posible. Puedes probar algo similar a esto:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
Supongo que el campo de
blob
no se puede representar en JSON.
De lo contrario, puede omitir la división y la unión:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
Un enfoque alternativo (más barato, aunque más complejo) es usar un UDF para analizar JSON y generar una
struct
o columna de
map
.
Por ejemplo algo como esto:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)