spark read files apache-spark spark-streaming avro spark-avro

apache-spark - files - read avro pyspark



Manejo de cambios de esquema al ejecutar la aplicaciĆ³n Spark Streaming (1)

Creo que he resuelto esto. Estoy usando el registro de esquema de Confluent y KafkaAvroDecoder. El código simplificado se ve así:

// Get the latest schema here. This schema will be used inside the // closure below to ensure that all executors are using the same // version for this time slice. val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) val m = sr.getLatestSchemaMetadata(subject) val schemaId = m.getId val schemaString = m.getSchema val outRdd = rdd.mapPartitions(partitions => { // Note: we cannot use the schema registry from above because this code // will execute on remote machines, requiring the schema registry to be // serialized. We could use a pool of these. val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry) val parser = new Schema.Parser() val avroSchema = parser.parse(schemaString) val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema) partitions.map(input => { // Decode the message using the latest version of the schema. // This will apply Avro''s standard schema evolution rules // (for compatible schemas) to migrate the message to the // latest version of the schema. val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record] // Convert record into a DataFrame with columns according to the schema avroRecordConverter(record).asInstanceOf[Row] }) }) // Get a Spark StructType representation of the schema to apply // to the DataFrame. val sparkSchema = AvroSchemaConverter.toSqlType( new Schema.Parser().parse(schemaString) ).dataType.asInstanceOf[StructType] sqlContext.createDataFrame(outRdd, sparkSchema)

Estoy buscando construir una aplicación Spark Streaming usando la API DataFrames en Spark 1.6. Antes de llegar demasiado lejos en el agujero del conejo, esperaba que alguien me ayudara a entender cómo trata DataFrames los datos que tienen un esquema diferente.

La idea es que los mensajes fluyan a Kafka con un esquema de Avro. Deberíamos poder evolucionar el esquema de forma compatible hacia atrás sin tener que reiniciar la aplicación de transmisión (la lógica de la aplicación seguirá funcionando).

Parece trivial deserializar nuevas versiones de mensajes utilizando un registro de esquema y el identificador de esquema incrustado en el mensaje usando KafkaUtils para crear una transmisión directa y el AvroKafkaDecoder (de Confluent). Eso me lleva a tener un DStream.

Problema n. ° 1: dentro de ese DStream habrá objetos con diferentes versiones del esquema. Entonces, cuando traduzco cada uno en un objeto Row, debería pasar un esquema de lector que es el más reciente para migrar los datos correctamente, y debo pasar el último esquema a la llamada sqlContext.createDataFrame (rowRdd, schema). Los objetos en DStream son del tipo GenericData.Record, y hasta donde puedo decir, no hay una manera fácil de decir cuál es la versión más reciente. Veo 2 soluciones posibles, una es llamar al registro de esquema para obtener la última versión del esquema en cada microbatch. El otro es modificar el decodificador para adjuntar el identificador de esquema. Podría iterar sobre el rdd para encontrar el ID más alto y obtener el esquema de un caché local.

Esperaba que alguien ya hubiera resuelto esto de forma reutilizable.

Problema / Pregunta n. ° 2: Spark va a tener un ejecutor diferente que saque de Kafka para cada partición. ¿Qué sucede con mi aplicación cuando un ejecutor recibe un esquema diferente "último" que los demás? El DataFrame creado por un ejecutor tendrá un esquema diferente que otro para la misma ventana de tiempo. Realmente no sé si esto es un problema real o no. Tengo problemas para visualizar el flujo de datos y qué tipos de operaciones presentarían problemas. Si es un problema, implicaría que debe haber algún intercambio de datos entre los ejecutores y que suena complicado e ineficiente.

¿Debo preocuparme por esto? Si lo hago, ¿cómo puedo resolver las diferencias de esquema?

Gracias, --Ben