scala - Cómo columnas de "selección negativa" en el marco de datos de spark
apache-spark dataframe (7)
No puedo entenderlo, pero supongo que es simple. Tengo una base de datos de chispa df. Este df tiene columnas "A", "B" y "C". Ahora digamos que tengo una matriz que contiene el nombre de las columnas de este df:
column_names = Array("A","B","C")
Me gustaría hacer un df.select()
de tal manera que pueda especificar qué columnas no seleccionar. Ejemplo: digamos que no quiero seleccionar las columnas "B". Lo intenté
df.select(column_names.filter(_!="B"))
pero esto no funciona, como
org.apache.spark.sql.DataFrame no se puede aplicar a (Array [String])
Entonces, here dice que debería funcionar con un Seq en su lugar. Sin embargo, intentando
df.select(column_names.filter(_!="B").toSeq)
resultados en
org.apache.spark.sql.DataFrame no se puede aplicar a (Seq [String]).
¿Qué estoy haciendo mal?
// selectWithout le permite especificar qué columnas omitir:
df.selectWithout("B")
En pyspark puedes hacer
df.select(list(set(df.columns) - set(["B"])))
Usando más de una línea también puedes hacer
cols = df.columns
cols.remove("B")
df.select(cols)
Está bien, es feo, pero esta rápida sesión de shell de chispa muestra algo que funciona:
scala> val myRDD = sc.parallelize(List.range(1,10))
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21
scala> val myDF = myRDD.toDF("a")
myDF: org.apache.spark.sql.DataFrame = [a: int]
scala> val myOtherRDD = sc.parallelize(List.range(1,10))
myOtherRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:21
scala> val myotherDF = myRDD.toDF("b")
myotherDF: org.apache.spark.sql.DataFrame = [b: int]
scala> myDF.unionAll(myotherDF)
res2: org.apache.spark.sql.DataFrame = [a: int]
scala> myDF.join(myotherDF)
res3: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala> val twocol = myDF.join(myotherDF)
twocol: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala> val cols = Array("a", "b")
cols: Array[String] = Array(a, b)
scala> val selectedCols = cols.filter(_!="b")
selectedCols: Array[String] = Array(a)
scala> twocol.select(selectedCols.head, selectedCols.tail: _*)
res4: org.apache.spark.sql.DataFrame = [a: int]
Las disposiciones de varargs para una función que requieren uno se tratan en other preguntas de SO . La firma de select está ahí para garantizar que su lista de columnas seleccionadas no esté vacía, lo que hace que la conversión de la lista de columnas seleccionadas a varargs un poco más compleja.
Se podrá hacer a través de la especificación de columna REGEX [SPARK-12139] para consultas de Hive
Tuve el mismo problema y lo resolví de esta manera (oaffdf es un marco de datos):
val dropColNames = Seq("col7","col121")
val featColNames = oaffdf.columns.diff(dropColNames)
val featCols = featColNames.map(cn => org.apache.spark.sql.functions.col(cn))
val featsdf = oaffdf.select(featCols: _*)
https://forums.databricks.com/questions/2808/select-dataframe-columns-from-a-sequence-of-string.html
Desde Spark 1.4 puedes usar el método drop
:
Scala :
case class Point(x: Int, y: Int)
val df = sqlContext.createDataFrame(Point(0, 0) :: Point(1, 2) :: Nil)
df.drop("y")
Python :
df = sc.parallelize([(0, 0), (1, 2)]).toDF(["x", "y"])
df.drop("y")
## DataFrame[x: bigint]
val columns = Seq("A","B","C")
df.select(columns.diff(Seq("B")))