scala apache-spark dataframe apache-spark-sql spark-dataframe user-defined-functions

scala - Derivar mĂșltiples columnas de una sola columna en un Spark DataFrame



apache-spark apache-spark-sql (5)

En general, lo que quieres no es directamente posible. UDF puede devolver solo una columna a la vez. Hay dos formas diferentes de superar esta limitación:

  1. Devuelve una columna de tipo complejo. La solución más general es un StructType pero también puede considerar ArrayType o MapType .

    import org.apache.spark.sql.functions.udf val df = Seq( (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c") ).toDF("x", "y", "z") case class Foobar(foo: Double, bar: Double) val foobarUdf = udf((x: Long, y: Double, z: String) => Foobar(x * y, z.head.toInt * y)) val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z")) df1.show // +---+----+---+------------+ // | x| y| z| foobar| // +---+----+---+------------+ // | 1| 3.0| a| [3.0,291.0]| // | 2|-1.0| b|[-2.0,-98.0]| // | 3| 0.0| c| [0.0,0.0]| // +---+----+---+------------+ df1.printSchema // root // |-- x: long (nullable = false) // |-- y: double (nullable = false) // |-- z: string (nullable = true) // |-- foobar: struct (nullable = true) // | |-- foo: double (nullable = false) // | |-- bar: double (nullable = false)

    Esto se puede aplanar fácilmente más tarde, pero generalmente no hay necesidad de eso.

  2. Cambie a RDD, remodele y reconstruya DF:

    import org.apache.spark.sql.types._ import org.apache.spark.sql.Row def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = Seq(x * y, z.head.toInt * y) val schema = StructType(df.schema.fields ++ Array(StructField("foo", DoubleType), StructField("bar", DoubleType))) val rows = df.rdd.map(r => Row.fromSeq( r.toSeq ++ foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z")))) val df2 = sqlContext.createDataFrame(rows, schema) df2.show // +---+----+---+----+-----+ // | x| y| z| foo| bar| // +---+----+---+----+-----+ // | 1| 3.0| a| 3.0|291.0| // | 2|-1.0| b|-2.0|-98.0| // | 3| 0.0| c| 0.0| 0.0| // +---+----+---+----+-----+

Tengo un DF con una gran cantidad de metadatos analizables como una sola columna de cadena en un Dataframe, llamémoslo DFA, con ColmnA.

Me gustaría dividir esta columna, ColmnA en varias columnas a través de una función, ClassXYZ = Func1 (ColmnA). Esta función devuelve una clase ClassXYZ, con múltiples variables, y cada una de estas variables ahora debe asignarse a una nueva Columna, como ColmnA1, ColmnA2, etc.

¿Cómo haría una transformación de 1 Dataframe a otra con estas columnas adicionales al llamar a este Func1 solo una vez, y no tener que repetirlo para crear todas las columnas?

Es fácil de resolver si tuviera que llamar a esta gran función cada vez para agregar una nueva columna, pero eso es lo que deseo evitar.

Por favor avise con un código de trabajo o seudocódigo.

Gracias

Sanjay


Esto se puede lograr fácilmente utilizando la función pivote

df4.groupBy("year").pivot("course").sum("earnings").collect()


Opté por crear una función para aplanar una columna y luego simplemente llamarla simultáneamente con el udf.

Primero defina esto:

implicit class DfOperations(df: DataFrame) { def flattenColumn(col: String) = { def addColumns(df: DataFrame, cols: Array[String]): DataFrame = { if (cols.isEmpty) df else addColumns( df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)), cols.tail ) } val field = df.select(col).schema.fields(0) val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name) addColumns(df, newCols).drop(col) } def withColumnMany(colName: String, col: Column) = { df.withColumn(colName, col).flattenColumn(colName) } }

Entonces el uso es muy simple:

case class MyClass(a: Int, b: Int) val df = sc.parallelize(Seq( (0), (1) )).toDF("x") val f = udf((x: Int) => MyClass(x*2,x*3)) df.withColumnMany("test", f($"x")).show() // +---+------+------+ // | x|test_a|test_b| // +---+------+------+ // | 0| 0| 0| // | 1| 2| 3| // +---+------+------+


Si las columnas resultantes tendrán la misma longitud que la original, puede crear columnas nuevas con la función withColumn y aplicando un udf. Después de esto, puede soltar su columna original, por ejemplo:

val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn"))) .withColumn("newCol2", myFun2(myDf("originalColumn")) .drop(myDf("originalColumn"))

donde myFun es un udf definido así:

def myFun= udf( (originalColumnContent : String) => { // do something with your original column content and return a new one } )


Suponga que después de su función habrá una secuencia de elementos, dando un ejemplo como el siguiente:

val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age") df.show +------------------+---+ | infoComb|age| +------------------+---+ |Mike,1986,Toronto| 30| | Andre,1980,Ottawa| 36| | jill,1989,London| 27| +------------------+---+

ahora lo que puedes hacer con esta infoComb es que puedes comenzar a dividir la cadena y obtener más columnas con:

df.select(expr("(split(infoComb, '',''))[0]").cast("string").as("name"), expr("(split(infoComb, '',''))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, '',''))[2]").cast("string").as("city"), $"age").show +-----+----------+-------+---+ | name|yearOfBorn| city|age| +-----+----------+-------+---+ |Mike| 1986|Toronto| 30| |Andre| 1980| Ottawa| 36| | jill| 1989| London| 27| +-----+----------+-------+---+

Espero que esto ayude.