java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

java - Codificador para el tipo de fila Spark Datasets



apache-spark apache-spark-sql (2)

Me gustaría escribir un codificador para un tipo de Row en DataSet, para una operación de mapa que estoy haciendo. Esencialmente, no entiendo cómo escribir codificadores.

A continuación se muestra un ejemplo de una operación de mapa:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() { @Override public Iterator<String> call(Row row) throws Exception { ArrayList<String> obj = //some map operation return obj.iterator(); } },Encoders.STRING());

Entiendo que en lugar de una cadena, el codificador debe escribirse de la siguiente manera:

Encoder<Row> encoder = new Encoder<Row>() { @Override public StructType schema() { return join.schema(); //return null; } @Override public ClassTag<Row> clsTag() { return null; } };

Sin embargo, no entiendo el clsTag () en el codificador, y estoy tratando de encontrar un ejemplo en ejecución que pueda demostrar algo similar (es decir, un codificador para un tipo de fila)

Editar - Esta no es una copia de la pregunta mencionada: error del codificador al intentar asignar la fila del marco de datos a la fila actualizada, ya que la respuesta habla sobre el uso de Spark 1.x en Spark 2.x (no lo estoy haciendo), también estoy mirando para un codificador para una clase Fila en lugar de resolver un error. Finalmente, estaba buscando una solución en Java, no en Scala.


La respuesta es usar un RowEncoder y el esquema del conjunto de datos usando StructType .

A continuación se muestra un ejemplo práctico de una operación de mapa plano con conjuntos de datos:

StructType structType = new StructType(); structType = structType.add("id1", DataTypes.LongType, false); structType = structType.add("id2", DataTypes.LongType, false); ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { @Override public Iterator<Row> call(Row row) throws Exception { // a static map operation to demonstrate List<Object> data = new ArrayList<>(); data.add(1l); data.add(2l); ArrayList<Row> list = new ArrayList<>(); list.add(RowFactory.create(data.toArray())); return list.iterator(); } }, encoder);


Tuve el mismo problema ... Encoders.kryo(Row.class)) trabajó para mí.

Como beneficio adicional, los documentos de ajuste de Apache Spark se refieren a Kryo it ya que es más rápido en la serialización "a menudo hasta 10x":

https://spark.apache.org/docs/latest/tuning.html