GroupByKey con datasets en Spark 2.0 usando Java
apache-spark group-by (3)
Tengo un conjunto de datos que contiene datos como los siguientes:
|c1| c2|
---------
| 1 | a |
| 1 | b |
| 1 | c |
| 2 | a |
| 2 | b |
...
Ahora, quiero agrupar los datos de la siguiente manera (col1: String Key, col2: List) :
| c1| c2 |
-----------
| 1 |a,b,c|
| 2 | a, b|
...
Pensé que usar goupByKey sería una solución suficiente, pero no puedo encontrar ningún ejemplo de cómo usarlo.
¿Alguien puede ayudarme a encontrar una solución usando groupByKey o usando cualquier otra combinación de transformaciones y acciones para obtener esta salida utilizando datasets, no RDD?
Con un DataFrame en Spark 2.0:
scala> val data = List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDF("c1", "c2")
data: org.apache.spark.sql.DataFrame = [c1: int, c2: string]
scala> data.groupBy("c1").agg(collect_list("c2")).collect.foreach(println)
[1,WrappedArray(a, b, c)]
[2,WrappedArray(a, b)]
Aquí está Spark 2.0 y el ejemplo de Java con Dataset.
public class SparkSample {
public static void main(String[] args) {
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("SparkSample")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.master("local")
.getOrCreate();
//input data
List<Tuple2<Integer,String>> inputList = new ArrayList<Tuple2<Integer,String>>();
inputList.add(new Tuple2<Integer,String>(1, "a"));
inputList.add(new Tuple2<Integer,String>(1, "b"));
inputList.add(new Tuple2<Integer,String>(1, "c"));
inputList.add(new Tuple2<Integer,String>(2, "a"));
inputList.add(new Tuple2<Integer,String>(2, "b"));
//dataset
Dataset<Row> dataSet = spark.createDataset(inputList, Encoders.tuple(Encoders.INT(), Encoders.STRING())).toDF("c1","c2");
dataSet.show();
//groupBy and aggregate
Dataset<Row> dataSet1 = dataSet.groupBy("c1").agg(org.apache.spark.sql.functions.collect_list("c2")).toDF("c1","c2");
dataSet1.show();
//stop
spark.stop();
}
}
Esto leerá la tabla en la variable del conjunto de datos
Dataset<Row> datasetNew = dataset.groupBy("c1").agg(functions.collect_list("c2"));
datasetNew.show()