scala - read - Chispa: Escribir en un archivo avro
spark avro github (3)
Estoy en Spark, tengo un RDD de un archivo Avro. Ahora quiero hacer algunas transformaciones en ese RDD y guardarlo como un archivo Avro:
val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))
rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[GenericRecord]],
job.getConfiguration)
Al ejecutar este Spark, se queja de que Schema $ recordSchema no es serializable.
Si elimino el comentario de la llamada .map (y solo tengo rdd.saveAsNewAPIHadoopFile), la llamada se realiza correctamente.
¿Qué estoy haciendo mal aquí?
¿Alguna idea?
Con dataframe es muy sencillo crear avro usando la biblioteca databrics.
dataframe.write.format ("com.databricks.spark.avro"). avro ($ hdfs_path)
En su caso, la entrada es avro por lo que tendrá un esquema asociado para que pueda leer directamente avro en el marco de datos y después de su transformación, puede escribir en avro utilizando el código anterior.
Para leer avro en dataframe:
Spark 1.6
val dataframe = sqlContext.read.avro ($ hdfs_path) O val dataframe = sqlContext.read.format ("com.databricks.spark.avro"). load ($ hdfs_path)
Chispa 2.1
val dataframe = sparkSession.read.avro ($ hdfs_path) O val dataframe = sparkSession.read.format ("com.databricks.spark.avro"). load ($ hdfs_path)
El problema aquí está relacionado con la no serializabilidad de la clase avro.Schema utilizada en el Trabajo. La excepción se produce cuando intenta hacer referencia al objeto de esquema desde el código dentro de la función de mapa.
Por ejemplo, si intenta hacer lo siguiente, obtendrá la excepción "Tarea no serializable" :
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})
Puede hacer que todo funcione simplemente creando una nueva instancia del esquema dentro del bloque de funciones:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it''s for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})
Ya que no le gustaría analizar el esquema avro para cada registro que maneje, una mejor solución será analizar el esquema en el nivel de partición. Lo siguiente también funciona:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it''s for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})
El código anterior funciona siempre que proporcione una referencia portátil al archivo jsonSchema, ya que la función de mapa será ejecutada por varios ejecutores remotos. Puede ser una referencia a un archivo en HDFS o se puede empaquetar junto con la aplicación en el JAR (utilizará las funciones del cargador de clases para obtener su contenido en este último caso).
Para aquellos que están tratando de usar Avro con Spark, note que todavía hay algunos problemas de compilación no resueltos y tiene que usar la siguiente importación en Maven POM:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>
Tenga en cuenta el clasificador "hadoop2"
. Puede seguir el problema en https://issues.apache.org/jira/browse/SPARK-3039 .
El serializador predeterminado utilizado por Spark es la serialización de Java. Entonces, para todos los tipos de Java, intentará serializar usando la serialización de Java. AvroKey no es serializable, por lo que está recibiendo errores.
Puede usar KryoSerializer, o plugin en su serialización personalizada (como Avro). Puedes leer más sobre la serialización aquí. http://spark-project.org/docs/latest/tuning.html
También puedes envolver tu objeto con algo que sea externalizable. Echa un vistazo, por ejemplo, al SparkFlumeEvent que contiene AvroFlumeEvent aquí: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala