java - Codificador para el tipo de fila Spark Datasets
apache-spark apache-spark-sql (2)
Me gustaría escribir un codificador para un tipo de Row en DataSet, para una operación de mapa que estoy haciendo. Esencialmente, no entiendo cómo escribir codificadores.
A continuación se muestra un ejemplo de una operación de mapa:
In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
@Override
public Iterator<String> call(Row row) throws Exception {
ArrayList<String> obj = //some map operation
return obj.iterator();
}
},Encoders.STRING());
Entiendo que en lugar de una cadena, el codificador debe escribirse de la siguiente manera:
Encoder<Row> encoder = new Encoder<Row>() {
@Override
public StructType schema() {
return join.schema();
//return null;
}
@Override
public ClassTag<Row> clsTag() {
return null;
}
};
Sin embargo, no entiendo el clsTag () en el codificador, y estoy tratando de encontrar un ejemplo en ejecución que pueda demostrar algo similar (es decir, un codificador para un tipo de fila)
Editar - Esta no es una copia de la pregunta mencionada: error del codificador al intentar asignar la fila del marco de datos a la fila actualizada, ya que la respuesta habla sobre el uso de Spark 1.x en Spark 2.x (no lo estoy haciendo), también estoy mirando para un codificador para una clase Fila en lugar de resolver un error. Finalmente, estaba buscando una solución en Java, no en Scala.
La respuesta es usar un RowEncoder y el esquema del conjunto de datos usando StructType .
A continuación se muestra un ejemplo práctico de una operación de mapa plano con conjuntos de datos:
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
// a static map operation to demonstrate
List<Object> data = new ArrayList<>();
data.add(1l);
data.add(2l);
ArrayList<Row> list = new ArrayList<>();
list.add(RowFactory.create(data.toArray()));
return list.iterator();
}
}, encoder);
Tuve el mismo problema ... Encoders.kryo(Row.class))
trabajó para mí.
Como beneficio adicional, los documentos de ajuste de Apache Spark se refieren a Kryo it ya que es más rápido en la serialización "a menudo hasta 10x":