apache spark - Reemplazo SPARK SQL para la función agregada mysql GROUP_CONCAT
apache-spark aggregate-functions (6)
Tengo una tabla de dos columnas de tipo cadena (nombre de usuario, amigo) y para cada nombre de usuario, quiero recopilar todos sus amigos en una fila, concatenados como cadenas (''nombre de usuario1'', ''amigos1, amigos2, amigos3''). Sé que MySql hace esto por GROUP_CONCAT, ¿hay alguna forma de hacer esto con SPARK SQL?
Gracias
Antes de continuar: Esta operación es otro grupo más por
groupByKey
.
Si bien tiene múltiples aplicaciones legítimas, es relativamente costoso, así que asegúrese de usarlo solo cuando sea necesario.
No es una solución exactamente concisa o eficiente, pero puede usar la función
UserDefinedAggregateFunction
introducida en Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Ejemplo de uso:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
También puede crear un contenedor de Python como se muestra en Spark: ¿Cómo mapear Python con Scala o las funciones definidas por el usuario de Java?
En la práctica, puede ser más rápido extraer RDD,
groupByKey
,
mkString
y reconstruir DataFrame.
Puede obtener un efecto similar combinando la función
collect_list
(Spark> = 1.6.0) con
concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
Aquí hay una función que puede usar en PySpark:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep='',''):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby(''username'').agg(F.group_concat(''friends'').alias(''friends''))
En SQL:
select username, concat_ws('','', collect_list(friends)) as friends
from table
group by username
Debajo del código basado en python que logra la funcionalidad group_concat.
Datos de entrada:
Cust_No, Cust_Cars
1, Toyota
2, BMW
1, Audi
2, Hyundai
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
spark = SparkSession.builder.master(''yarn'').getOrCreate()
# Udf to join all list elements with "|"
def combine_cars(car_list,sep=''|''):
collect = sep.join(car_list)
return collect
test_udf = udf(combine_cars,StringType())
car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)
Datos de salida: Cust_No, Final_List
1, Toyota | Audi
2, BMW | Hyundai
Puedes probar la función collect_list
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
O puede registrar un UDF como
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
y puedes usar esta función en la consulta
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
Una forma de hacerlo con pyspark <1.6, que desafortunadamente no admite la función agregada definida por el usuario:
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
y si quieres convertirlo nuevamente en un marco de datos:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
A partir de 1.6, puede usar collect_list y luego unirse a la lista creada:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
Idioma : versión de Scala Spark : 1.5.2
Tuve el mismo problema y también traté de resolverlo usando
udfs
pero, desafortunadamente, esto ha provocado más problemas más adelante en el código debido a inconsistencias de tipo.
Pude solucionar este problema al convertir primero el
DF
a un
RDD
luego
agrupar
y manipular los datos de la manera deseada y luego convertir el
RDD
a un
DF
siguiente manera:
val df = sc
.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")))
.toDF("username", "friend")
+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+
val dfGRPD = df.map(Row => (Row(0), Row(1)))
.groupByKey()
.map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
.toDF("username", "groupOfFriends")
+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+