software - spark scala tutorial
¿Cómo detecto si un Spark DataFrame tiene una columna? (8)
En realidad, ni siquiera necesita llamar a select para usar columnas, solo puede llamarlo en el marco de datos en sí
// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)
// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
// then you can just use it on the DF with a given column name
hasColumn(testDF, "a") // <-- true
hasColumn(testDF, "c") // <-- false
Alternativamente, puede definir una clase implícita utilizando el patrón pimp my library para que el método hasColumn esté disponible en sus marcos de datos directamente
implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
def hasColumn(colName: String) = df.columns.contains(colName)
}
Entonces puedes usarlo como:
testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
Cuando creo un
DataFrame
partir de un archivo JSON en Spark SQL, ¿cómo puedo saber si existe una columna determinada antes de llamar a
.select
Ejemplo de esquema JSON:
{
"a": {
"b": 1,
"c": 2
}
}
Esto es lo que quiero hacer:
potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
pero no puedo encontrar una buena función para
hasColumn
.
Lo más cerca que he llegado es probar si la columna está en esta matriz algo incómoda:
scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Otra opción que normalmente uso es
df.columns.contains("column-name-to-check")
Esto devuelve un valor booleano.
Para aquellos que se encuentran con esto en busca de una solución de Python, uso:
if ''column_name_to_check'' in df.columns:
# do something
Cuando probé la respuesta de @Jai Prakash de
df.columns.contains(''column-name-to-check'')
usando Python, obtuve
AttributeError: ''list'' object has no attribute ''contains''
.
Si destruye su json usando una definición de esquema cuando lo carga, entonces no necesita verificar la columna. si no está en la fuente json, aparecerá como una columna nula.
val schemaJson = """
{
"type": "struct",
"fields": [
{
"name": field1
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": field2
"type": "string",
"nullable": true,
"metadata": {}
}
]
}
"""
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
val djson = sqlContext.read
.schema(schema )
.option("badRecordsPath", readExceptionPath)
.json(dataPath)
Simplemente asuma que existe y deje que falle con
Try
.
Simple y llanamente y soporta un anidamiento arbitrario:
import scala.util.Try
import org.apache.spark.sql.DataFrame
def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
val df = sqlContext.read.json(sc.parallelize(
"""{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
hasColumn(df, "foobar")
// Boolean = false
hasColumn(df, "foo")
// Boolean = true
hasColumn(df, "foo.bar")
// Boolean = true
hasColumn(df, "foo.bar.foobar")
// Boolean = true
hasColumn(df, "foo.bar.foobaz")
// Boolean = false
O incluso más simple:
val columns = Seq(
"foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
Python equivalente:
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
has_column(df, "foobar")
## False
has_column(df, "foo")
## True
has_column(df, "foo.bar")
## True
has_column(df, "foo.bar.foobar")
## True
has_column(df, "foo.bar.foobaz")
## False
Su otra opción para esto sería hacer alguna manipulación de matriz (en este caso, una
intersect
) en las
df.columns
y sus
df.columns
potential_columns
.
val potential_columns = Seq("a.b", "a.c", "a.d")
// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)
// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
Por desgracia, esto no funcionará para el escenario de objeto interno anterior. Tendrá que mirar el esquema para eso.
Voy a cambiar sus columnas_
potential_columns
a nombres de columna totalmente calificados
df.schema.fieldNames.contains("column_name")
Esto solo tiene un nivel de profundidad, por lo que para hacerlo genérico, tendría que hacer más trabajo.
Try
no es óptimo ya que evaluará la expresión dentro de
Try
antes de tomar la decisión.
Para conjuntos de datos grandes, use lo siguiente en
Scala
:
// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
// The columns we want to extract
val potential_columns = Seq("b", "c", "d")
// Get the intersect of the potential columns and the actual columns,
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
Try(df.select(colName)).isSuccess
Use la función mencionada anteriormente para verificar la existencia de una columna, incluido el nombre de columna anidada.