tutorial spark que español elasticsearch apache-spark elasticsearch-hadoop apache-spark-1.5

elasticsearch - español - que es spark



Guardar Spark Dataframe en Elasticsearch: no se puede manejar la excepción de tipo (1)

He diseñado un trabajo simple para leer datos de MySQL y guardarlos en Elasticsearch con Spark.

Aquí está el código:

JavaSparkContext sc = new JavaSparkContext( new SparkConf().setAppName("MySQLtoEs") .set("es.index.auto.create", "true") .set("es.nodes", "127.0.0.1:9200") .set("es.mapping.id", "id") .set("spark.serializer", KryoSerializer.class.getName())); SQLContext sqlContext = new SQLContext(sc); // Data source options Map<String, String> options = new HashMap<>(); options.put("driver", MYSQL_DRIVER); options.put("url", MYSQL_CONNECTION_URL); options.put("dbtable", "OFFERS"); options.put("partitionColumn", "id"); options.put("lowerBound", "10001"); options.put("upperBound", "499999"); options.put("numPartitions", "10"); // Load MySQL query result as DataFrame LOGGER.info("Loading DataFrame"); DataFrame jdbcDF = sqlContext.load("jdbc", options); DataFrame df = jdbcDF.select("id", "title", "description", "merchantId", "price", "keywords", "brandId", "categoryId"); df.show(); LOGGER.info("df.count : " + df.count()); EsSparkSQL.saveToEs(df, "offers/product");

Puedes ver que el código es muy sencillo. Lee los datos en un DataFrame, selecciona algunas columnas y luego realiza un count como una acción básica en el Dataframe. Todo funciona bien hasta este punto.

Luego intenta guardar los datos en Elasticsearch, pero falla porque no puede manejar algún tipo. Puede ver el registro de errores here .

No estoy seguro de por qué no puede manejar ese tipo. ¿Alguien sabe por qué ocurre esto?

Estoy usando Apache Spark 1.5.0, Elasticsearch 1.4.4 y elaticsearch-hadoop 2.1.1

EDITAR:

  • He actualizado el enlace esencial con un conjunto de datos de muestra junto con el código fuente.
  • También he intentado usar las compilaciones de desarrollo elasticsearch-hadoop como se menciona en @costin en la lista de correo.

La respuesta a esta pregunta fue complicada, pero gracias a samklr , me las arreglé para averiguar cuál era el problema.

Sin embargo, la solución no es sencilla y podría considerar algunas transformaciones "innecesarias".

Primero hablemos de la serialización .

Hay dos aspectos de la serialización a considerar en la serialización de datos de Spark y la serialización de funciones. En este caso, se trata de la serialización de datos y, por lo tanto, de la deserialización.

Desde la perspectiva de Spark, lo único que se requiere es configurar la serialización: Spark se basa por defecto en la serialización de Java, que es conveniente pero bastante ineficiente. Esta es la razón por la cual Hadoop introdujo su propio mecanismo de serialización y sus propios tipos, es decir, Writables . Como tal, InputFormat y OutputFormats son necesarios para devolver Writables que, fuera de la caja, Spark no entiende.

Con el conector elasticsearch-spark uno debe habilitar una serialización diferente (Kryo) que maneja la conversión automáticamente y también lo hace de manera bastante eficiente.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Incluso dado que Kryo no requiere que una clase implemente una interfaz particular para ser serializada, lo que significa que los POJO se pueden usar en RDD sin ningún trabajo adicional más allá de habilitar la serialización de Kryo.

Dicho esto, @samklr me señaló que Kryo necesita registrar clases antes de usarlas.

Esto se debe a que Kryo escribe una referencia a la clase del objeto que se está serializando (se escribe una referencia para cada objeto escrito), que es solo un identificador entero si la clase se ha registrado, pero de lo contrario es el nombre completo de la clase. Spark registra las clases Scala y muchas otras clases de framework (como Avro Generic o Thrift) en su nombre.

Registrar clases con Kryo es sencillo. Cree una subclase de KryoRegistrator y anule el método registerClasses() :

public class MyKryoRegistrator implements KryoRegistrator, Serializable { @Override public void registerClasses(Kryo kryo) { // Product POJO associated to a product Row from the DataFrame kryo.register(Product.class); } }

Finalmente, en su programa de controlador, establezca la propiedad spark.kryo.registrator en el nombre de clase completo de su implementación de KryoRegistrator:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

En segundo lugar, incluso aunque el serializador Kryo esté configurado y la clase registrada, con los cambios realizados en Spark 1.5, y por alguna razón Elasticsearch no pudo des serializar el Dataframe porque no puede inferir el SchemaType del Dataframe en el conector.

Entonces tuve que convertir el Dataframe a un JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() { public Product call(Row row) throws Exception { long id = row.getLong(0); String title = row.getString(1); String description = row.getString(2); int merchantId = row.getInt(3); double price = row.getDecimal(4).doubleValue(); String keywords = row.getString(5); long brandId = row.getLong(6); int categoryId = row.getInt(7); return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId); } });

Ahora los datos están listos para escribirse en elasticsearch:

JavaEsSpark.saveToEs(products, "test/test");

Referencias

  • documentation soporte Apache Spark de Elasticsearch.
  • Guía definitiva de Hadoop, Capítulo 19. Spark, ed. 4 - Tom White.
  • Usuario samklr .