tutorial spark software learn for data big scala apache-spark dataframe apache-spark-sql

software - spark scala tutorial



¿Cómo detecto si un Spark DataFrame tiene una columna? (8)

En realidad, ni siquiera necesita llamar a select para usar columnas, solo puede llamarlo en el marco de datos en sí

// define test data case class Test(a: Int, b: Int) val testList = List(Test(1,2), Test(3,4)) val testDF = sqlContext.createDataFrame(testList) // define the hasColumn function def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) // then you can just use it on the DF with a given column name hasColumn(testDF, "a") // <-- true hasColumn(testDF, "c") // <-- false

Alternativamente, puede definir una clase implícita utilizando el patrón pimp my library para que el método hasColumn esté disponible en sus marcos de datos directamente

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { def hasColumn(colName: String) = df.columns.contains(colName) }

Entonces puedes usarlo como:

testDF.hasColumn("a") // <-- true testDF.hasColumn("c") // <-- false

Cuando creo un DataFrame partir de un archivo JSON en Spark SQL, ¿cómo puedo saber si existe una columna determinada antes de llamar a .select

Ejemplo de esquema JSON:

{ "a": { "b": 1, "c": 2 } }

Esto es lo que quiero hacer:

potential_columns = Seq("b", "c", "d") df = sqlContext.read.json(filename) potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

pero no puedo encontrar una buena función para hasColumn . Lo más cerca que he llegado es probar si la columna está en esta matriz algo incómoda:

scala> df.select("a.*").columns res17: Array[String] = Array(b, c)


Otra opción que normalmente uso es

df.columns.contains("column-name-to-check")

Esto devuelve un valor booleano.


Para aquellos que se encuentran con esto en busca de una solución de Python, uso:

if ''column_name_to_check'' in df.columns: # do something

Cuando probé la respuesta de @Jai Prakash de df.columns.contains(''column-name-to-check'') usando Python, obtuve AttributeError: ''list'' object has no attribute ''contains'' .


Si destruye su json usando una definición de esquema cuando lo carga, entonces no necesita verificar la columna. si no está en la fuente json, aparecerá como una columna nula.

val schemaJson = """ { "type": "struct", "fields": [ { "name": field1 "type": "string", "nullable": true, "metadata": {} }, { "name": field2 "type": "string", "nullable": true, "metadata": {} } ] } """ val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType] val djson = sqlContext.read .schema(schema ) .option("badRecordsPath", readExceptionPath) .json(dataPath)


Simplemente asuma que existe y deje que falle con Try . Simple y llanamente y soporta un anidamiento arbitrario:

import scala.util.Try import org.apache.spark.sql.DataFrame def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess val df = sqlContext.read.json(sc.parallelize( """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) hasColumn(df, "foobar") // Boolean = false hasColumn(df, "foo") // Boolean = true hasColumn(df, "foo.bar") // Boolean = true hasColumn(df, "foo.bar.foobar") // Boolean = true hasColumn(df, "foo.bar.foobaz") // Boolean = false

O incluso más simple:

val columns = Seq( "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") columns.flatMap(c => Try(df(c)).toOption) // Seq[org.apache.spark.sql.Column] = List( // foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)

Python equivalente:

from pyspark.sql.utils import AnalysisException from pyspark.sql import Row def has_column(df, col): try: df[col] return True except AnalysisException: return False df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() has_column(df, "foobar") ## False has_column(df, "foo") ## True has_column(df, "foo.bar") ## True has_column(df, "foo.bar.foobar") ## True has_column(df, "foo.bar.foobaz") ## False


Su otra opción para esto sería hacer alguna manipulación de matriz (en este caso, una intersect ) en las df.columns y sus df.columns potential_columns .

val potential_columns = Seq("a.b", "a.c", "a.d") // Our object model case class Document( a: String, b: String, c: String) case class Document2( a: Document, b: String, c: String) // And some data... val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF // We go through each of the fields in the schema. // For StructTypes we return an array of parentName.fieldName // For everything else we return an array containing just the field name // We then flatten the complete list of field names // Then we intersect that with our potential_columns leaving us just a list of column we want // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show

Por desgracia, esto no funcionará para el escenario de objeto interno anterior. Tendrá que mirar el esquema para eso.

Voy a cambiar sus columnas_ potential_columns a nombres de columna totalmente calificados

df.schema.fieldNames.contains("column_name")

Esto solo tiene un nivel de profundidad, por lo que para hacerlo genérico, tendría que hacer más trabajo.


Try no es óptimo ya que evaluará la expresión dentro de Try antes de tomar la decisión.

Para conjuntos de datos grandes, use lo siguiente en Scala :

// Loading some data (so you can just copy & paste right into spark-shell) case class Document( a: String, b: String, c: String) val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF // The columns we want to extract val potential_columns = Seq("b", "c", "d") // Get the intersect of the potential columns and the actual columns, // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show


def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = Try(df.select(colName)).isSuccess

Use la función mencionada anteriormente para verificar la existencia de una columna, incluido el nombre de columna anidada.