apache-spark aggregate-functions apache-spark-sql

apache spark - Reemplazo SPARK SQL para la función agregada mysql GROUP_CONCAT



apache-spark aggregate-functions (6)

Tengo una tabla de dos columnas de tipo cadena (nombre de usuario, amigo) y para cada nombre de usuario, quiero recopilar todos sus amigos en una fila, concatenados como cadenas (''nombre de usuario1'', ''amigos1, amigos2, amigos3''). Sé que MySql hace esto por GROUP_CONCAT, ¿hay alguna forma de hacer esto con SPARK SQL?

Gracias


Antes de continuar: Esta operación es otro grupo más por groupByKey . Si bien tiene múltiples aplicaciones legítimas, es relativamente costoso, así que asegúrese de usarlo solo cuando sea necesario.

No es una solución exactamente concisa o eficiente, pero puede usar la función UserDefinedAggregateFunction introducida en Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction { def inputSchema = new StructType().add("x", StringType) def bufferSchema = new StructType().add("buff", ArrayType(StringType)) def dataType = StringType def deterministic = true def initialize(buffer: MutableAggregationBuffer) = { buffer.update(0, ArrayBuffer.empty[String]) } def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) } def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) } def evaluate(buffer: Row) = UTF8String.fromString( buffer.getSeq[String](0).mkString(",")) }

Ejemplo de uso:

val df = sc.parallelize(Seq( ("username1", "friend1"), ("username1", "friend2"), ("username2", "friend1"), ("username2", "friend3") )).toDF("username", "friend") df.groupBy($"username").agg(GroupConcat($"friend")).show ## +---------+---------------+ ## | username| friends| ## +---------+---------------+ ## |username1|friend1,friend2| ## |username2|friend1,friend3| ## +---------+---------------+

También puede crear un contenedor de Python como se muestra en Spark: ¿Cómo mapear Python con Scala o las funciones definidas por el usuario de Java?

En la práctica, puede ser más rápido extraer RDD, groupByKey , mkString y reconstruir DataFrame.

Puede obtener un efecto similar combinando la función collect_list (Spark> = 1.6.0) con concat_ws :

import org.apache.spark.sql.functions.{collect_list, udf, lit} df.groupBy($"username") .agg(concat_ws(",", collect_list($"friend")).alias("friends"))


Aquí hay una función que puede usar en PySpark:

import pyspark.sql.functions as F def group_concat(col, distinct=False, sep='',''): if distinct: collect = F.collect_set(col.cast(StringType())) else: collect = F.collect_list(col.cast(StringType())) return F.concat_ws(sep, collect) table.groupby(''username'').agg(F.group_concat(''friends'').alias(''friends''))

En SQL:

select username, concat_ws('','', collect_list(friends)) as friends from table group by username


Debajo del código basado en python que logra la funcionalidad group_concat.

Datos de entrada:

Cust_No, Cust_Cars

1, Toyota

2, BMW

1, Audi

2, Hyundai

from pyspark.sql import SparkSession from pyspark.sql.types import StringType from pyspark.sql.functions import udf import pyspark.sql.functions as F spark = SparkSession.builder.master(''yarn'').getOrCreate() # Udf to join all list elements with "|" def combine_cars(car_list,sep=''|''): collect = sep.join(car_list) return collect test_udf = udf(combine_cars,StringType()) car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)

Datos de salida: Cust_No, Final_List

1, Toyota | Audi

2, BMW | Hyundai


Puedes probar la función collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A

O puede registrar un UDF como

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))

y puedes usar esta función en la consulta

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")


Una forma de hacerlo con pyspark <1.6, que desafortunadamente no admite la función agregada definida por el usuario:

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)

y si quieres convertirlo nuevamente en un marco de datos:

sqlContext.createDataFrame(byUsername, ["username", "friends"])

A partir de 1.6, puede usar collect_list y luego unirse a la lista creada:

from pyspark.sql import functions as F from pyspark.sql.types import StringType join_ = F.udf(lambda x: ", ".join(x), StringType()) df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))


Idioma : versión de Scala Spark : 1.5.2

Tuve el mismo problema y también traté de resolverlo usando udfs pero, desafortunadamente, esto ha provocado más problemas más adelante en el código debido a inconsistencias de tipo. Pude solucionar este problema al convertir primero el DF a un RDD luego agrupar y manipular los datos de la manera deseada y luego convertir el RDD a un DF siguiente manera:

val df = sc .parallelize(Seq( ("username1", "friend1"), ("username1", "friend2"), ("username2", "friend1"), ("username2", "friend3"))) .toDF("username", "friend") +---------+-------+ | username| friend| +---------+-------+ |username1|friend1| |username1|friend2| |username2|friend1| |username2|friend3| +---------+-------+ val dfGRPD = df.map(Row => (Row(0), Row(1))) .groupByKey() .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))} .toDF("username", "groupOfFriends") +---------+---------------+ | username| groupOfFriends| +---------+---------------+ |username1|friend2,friend1| |username2|friend3,friend1| +---------+---------------+