scala apache-spark apache-kafka apache-spark-sql spark-structured-streaming

scala - ¿Cómo leer registros en formato JSON de Kafka usando Structured Streaming?



apache-spark apache-kafka (1)

Desde la perspectiva de Spark, el value es solo una secuencia de bytes. No tiene conocimiento sobre el formato o contenido de serialización. Para poder extraer el archivo, primero debe analizarlo.

Si los datos se serializan como una cadena JSON, tiene dos opciones. Puede cast value a StringType y usar from_json y proporcionar un esquema:

import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.from_json val schema: StructType = StructType(Seq( StructField("column1", ???), StructField("column2", ???) )) rawKafkaDF.select(from_json($"value".cast(StringType), schema))

o StringType a StringType , extraer campos por ruta usando get_json_object :

import org.apache.spark.sql.functions.get_json_object val columns: Seq[String] = ??? val exprs = columns.map(c => get_json_object($"value", s"$$.$c")) rawKafkaDF.select(exprs: _*)

y luego cast a los tipos deseados.

Estoy tratando de usar el enfoque de transmisión estructurada usando Spark-Streaming basado en DataFrame / Dataset API para cargar una transmisión de datos desde Kafka.

Yo suelo:

  • Spark 2.10
  • Kafka 0.10
  • chispa-sql-kafka-0-10

Spark Kafka DataSource ha definido un esquema subyacente:

|key|value|topic|partition|offset|timestamp|timestampType|

Mis datos vienen en formato json y se almacenan en la columna de valor . Estoy buscando una manera de cómo extraer el esquema subyacente de la columna de valor y actualizar el marco de datos recibido a las columnas almacenadas en el valor . Intenté el siguiente enfoque pero no funciona:

val columns = Array("column1", "column2") // column names val rawKafkaDF = sparkSession.sqlContext.readStream .format("kafka") .option("kafka.bootstrap.servers","localhost:9092") .option("subscribe",topic) .load() val columnsToSelect = columns.map( x => new Column("value." + x)) val kafkaDF = rawKafkaDF.select(columnsToSelect:_*) // some analytics using stream dataframe kafkaDF val query = kafkaDF.writeStream.format("console").start() query.awaitTermination()

Aquí org.apache.spark.sql.AnalysisException: Can''t extract value from value#337; excepción org.apache.spark.sql.AnalysisException: Can''t extract value from value#337; porque en el momento de la creación de la secuencia, no se conocen los valores internos ...

¿Tienes alguna sugerencia?