scala apache-spark dataframe apache-spark-sql

scala - Cómo columnas de "selección negativa" en el marco de datos de spark



apache-spark dataframe (7)

No puedo entenderlo, pero supongo que es simple. Tengo una base de datos de chispa df. Este df tiene columnas "A", "B" y "C". Ahora digamos que tengo una matriz que contiene el nombre de las columnas de este df:

column_names = Array("A","B","C")

Me gustaría hacer un df.select() de tal manera que pueda especificar qué columnas no seleccionar. Ejemplo: digamos que no quiero seleccionar las columnas "B". Lo intenté

df.select(column_names.filter(_!="B"))

pero esto no funciona, como

org.apache.spark.sql.DataFrame no se puede aplicar a (Array [String])

Entonces, here dice que debería funcionar con un Seq en su lugar. Sin embargo, intentando

df.select(column_names.filter(_!="B").toSeq)

resultados en

org.apache.spark.sql.DataFrame no se puede aplicar a (Seq [String]).

¿Qué estoy haciendo mal?


// selectWithout le permite especificar qué columnas omitir:

df.selectWithout("B")


En pyspark puedes hacer

df.select(list(set(df.columns) - set(["B"])))

Usando más de una línea también puedes hacer

cols = df.columns cols.remove("B") df.select(cols)


Está bien, es feo, pero esta rápida sesión de shell de chispa muestra algo que funciona:

scala> val myRDD = sc.parallelize(List.range(1,10)) myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21 scala> val myDF = myRDD.toDF("a") myDF: org.apache.spark.sql.DataFrame = [a: int] scala> val myOtherRDD = sc.parallelize(List.range(1,10)) myOtherRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:21 scala> val myotherDF = myRDD.toDF("b") myotherDF: org.apache.spark.sql.DataFrame = [b: int] scala> myDF.unionAll(myotherDF) res2: org.apache.spark.sql.DataFrame = [a: int] scala> myDF.join(myotherDF) res3: org.apache.spark.sql.DataFrame = [a: int, b: int] scala> val twocol = myDF.join(myotherDF) twocol: org.apache.spark.sql.DataFrame = [a: int, b: int] scala> val cols = Array("a", "b") cols: Array[String] = Array(a, b) scala> val selectedCols = cols.filter(_!="b") selectedCols: Array[String] = Array(a) scala> twocol.select(selectedCols.head, selectedCols.tail: _*) res4: org.apache.spark.sql.DataFrame = [a: int]

Las disposiciones de varargs para una función que requieren uno se tratan en other preguntas de SO . La firma de select está ahí para garantizar que su lista de columnas seleccionadas no esté vacía, lo que hace que la conversión de la lista de columnas seleccionadas a varargs un poco más compleja.




Desde Spark 1.4 puedes usar el método drop :

Scala :

case class Point(x: Int, y: Int) val df = sqlContext.createDataFrame(Point(0, 0) :: Point(1, 2) :: Nil) df.drop("y")

Python :

df = sc.parallelize([(0, 0), (1, 2)]).toDF(["x", "y"]) df.drop("y") ## DataFrame[x: bigint]


val columns = Seq("A","B","C") df.select(columns.diff(Seq("B")))