elasticsearch - español - que es spark
Guardar Spark Dataframe en Elasticsearch: no se puede manejar la excepción de tipo (1)
He diseñado un trabajo simple para leer datos de MySQL y guardarlos en Elasticsearch con Spark.
Aquí está el código:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Puedes ver que el código es muy sencillo.
Lee los datos en un DataFrame, selecciona algunas columnas y luego realiza un
count
como una acción básica en el Dataframe.
Todo funciona bien hasta este punto.
Luego intenta guardar los datos en Elasticsearch, pero falla porque no puede manejar algún tipo. Puede ver el registro de errores here .
No estoy seguro de por qué no puede manejar ese tipo. ¿Alguien sabe por qué ocurre esto?
Estoy usando Apache Spark 1.5.0, Elasticsearch 1.4.4 y elaticsearch-hadoop 2.1.1
EDITAR:
- He actualizado el enlace esencial con un conjunto de datos de muestra junto con el código fuente.
- También he intentado usar las compilaciones de desarrollo elasticsearch-hadoop como se menciona en @costin en la lista de correo.
La respuesta a esta pregunta fue complicada, pero gracias a samklr , me las arreglé para averiguar cuál era el problema.
Sin embargo, la solución no es sencilla y podría considerar algunas transformaciones "innecesarias".
Primero hablemos de la serialización .
Hay dos aspectos de la serialización a considerar en la serialización de datos de Spark y la serialización de funciones. En este caso, se trata de la serialización de datos y, por lo tanto, de la deserialización.
Desde la perspectiva de Spark, lo único que se requiere es configurar la serialización: Spark se basa por defecto en la serialización de Java, que es conveniente pero bastante ineficiente.
Esta es la razón por la cual Hadoop introdujo su propio mecanismo de serialización y sus propios tipos, es decir,
Writables
.
Como tal,
InputFormat
y
OutputFormats
son necesarios para devolver
Writables
que, fuera de la caja, Spark no entiende.
Con el conector elasticsearch-spark uno debe habilitar una serialización diferente (Kryo) que maneja la conversión automáticamente y también lo hace de manera bastante eficiente.
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
Incluso dado que Kryo no requiere que una clase implemente una interfaz particular para ser serializada, lo que significa que los POJO se pueden usar en RDD sin ningún trabajo adicional más allá de habilitar la serialización de Kryo.
Dicho esto, @samklr me señaló que Kryo necesita registrar clases antes de usarlas.
Esto se debe a que Kryo escribe una referencia a la clase del objeto que se está serializando (se escribe una referencia para cada objeto escrito), que es solo un identificador entero si la clase se ha registrado, pero de lo contrario es el nombre completo de la clase. Spark registra las clases Scala y muchas otras clases de framework (como Avro Generic o Thrift) en su nombre.
Registrar clases con Kryo es sencillo.
Cree una subclase de KryoRegistrator y anule el método
registerClasses()
:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
Finalmente, en su programa de controlador, establezca la propiedad spark.kryo.registrator en el nombre de clase completo de su implementación de KryoRegistrator:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
En segundo lugar, incluso aunque el serializador Kryo esté configurado y la clase registrada, con los cambios realizados en Spark 1.5, y por alguna razón Elasticsearch no pudo
des serializar
el Dataframe porque no puede inferir el
SchemaType
del Dataframe en el conector.
Entonces tuve que convertir el Dataframe a un JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
Ahora los datos están listos para escribirse en elasticsearch:
JavaEsSpark.saveToEs(products, "test/test");
Referencias
- documentation soporte Apache Spark de Elasticsearch.
- Guía definitiva de Hadoop, Capítulo 19. Spark, ed. 4 - Tom White.
- Usuario samklr .