spark read functions example scala apache-spark apache-spark-sql spark-dataframe

scala - read - spark streaming



Spark: Agregar columna al marco de datos condicionalmente (3)

¿Qué tal algo como esto?

val newDF = df.filter($"B" === "").take(1) match { case Array() => df case _ => df.withColumn("D", $"B" === "") }

El uso de take(1) debería tener un impacto mínimo

Estoy tratando de tomar mis datos de entrada:

A B C -------------- 4 blah 2 2 3 56 foo 3

Y agregue una columna al final según si B está vacío o no:

A B C D -------------------- 4 blah 2 1 2 3 0 56 foo 3 1

Puedo hacer esto fácilmente registrando el marco de datos de entrada como una tabla temporal y luego escribiendo una consulta SQL.

Pero realmente me gustaría saber cómo hacer esto con solo los métodos Scala y no tener que escribir una consulta SQL dentro de Scala.

He intentado .withColumn , pero no puedo lograr que haga lo que quiero.


Lo malo es que me había perdido una parte de la pregunta.

La mejor y más limpia forma es usar un UDF . Explicación dentro del código.

// create some example data...BY DataFrame // note, third record has an empty string case class Stuff(a:String,b:Int) val d= sc.parallelize(Seq( ("a",1),("b",2), ("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF // now the good stuff. import org.apache.spark.sql.functions.udf // function that returns 0 is string empty val func = udf( (s:String) => if(s.isEmpty) 0 else 1 ) // create new dataframe with added column named "notempty" val r = d.select( $"a", $"b", func($"a").as("notempty") ) scala> r.show +---+---+--------+ | a| b|notempty| +---+---+--------+ | a| 1| 1111| | b| 2| 1111| | | 3| 0| | d| 4| 1111| +---+---+--------+


Pruebe con la withColumn con la función when indica a continuación:

val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // for `toDF` and $"" import org.apache.spark.sql.functions._ // for `when` val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5))) .toDF("A", "B", "C") val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

newDf.show() muestra

+---+----+---+---+ | A| B| C| D| +---+----+---+---+ | 4|blah| 2| 1| | 2| | 3| 0| | 56| foo| 3| 1| |100|null| 5| 0| +---+----+---+---+

isNull la fila (100, null, 5) para probar el caso isNull .

Probé este código con Spark 1.6.0 pero como se comentó en el código de when , funciona en las versiones posteriores a 1.4.0 .