withcolumn tutorial spark org español downloads scala apache-spark spark-dataframe

scala - tutorial - spark sql español



Cambiar la propiedad anulable de la columna en el marco de datos de chispa (5)

Estoy creando manualmente un marco de datos para algunas pruebas. El código para crearlo es:

case class input(id:Long, var1:Int, var2:Int, var3:Double) val inputDF = sqlCtx .createDataFrame(List(input(1110,0,1001,-10.00), input(1111,1,1001,10.00), input(1111,0,1002,10.00)))

Entonces el esquema se ve así:

root |-- id: long (nullable = false) |-- var1: integer (nullable = false) |-- var2: integer (nullable = false) |-- var3: double (nullable = false)

Quiero hacer ''nullable = true'' para cada una de estas variables. ¿Cómo lo declaro desde el principio o lo cambio en un nuevo marco de datos después de haber sido creado?


Esta es una respuesta tardía, pero quería dar una solución alternativa para las personas que vienen aquí. Puede hacer que una Column DataFrame sea DataFrame desde el principio mediante la siguiente modificación de su código:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double) val inputDF = sqlContext .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00), input(Some(1111),Some(1),1001,10.00), input(Some(1111),Some(0),1002,10.00))) inputDF.printSchema

Esto producirá:

root |-- id: long (nullable = true) |-- var1: integer (nullable = true) |-- var2: integer (nullable = false) |-- var3: double (nullable = false) defined class input inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

Esencialmente, si declara un campo como una Option usando Some([element]) o None como las entradas reales, entonces ese campo puede ser anulado. De lo contrario, el campo no será anulable. ¡Espero que esto ayude!


Versión más compacta de la configuración de todas las columnas parámetro nulable

En lugar del case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) se puede usar _.copy(nullable = nullable) . Entonces toda la función se puede escribir como:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = { df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable)))) }


Responder

Con las importaciones

import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext}

puedes usar

/** * Set nullable property of column. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { // get schema val schema = df.schema // modify [[StructField] with name `cn` val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m) case y: StructField => y }) // apply new schema df.sqlContext.createDataFrame( df.rdd, newSchema ) }

directamente.

También puede hacer que el método esté disponible a través del patrón de biblioteca "pimp my library" (consulte mi publicación SO ¿Cuál es la mejor manera de definir métodos personalizados en un DataFrame? ), De modo que pueda llamar

val df = .... val df2 = df.setNullableStateOfColumn( "id", true )

Editar

Solución alternativa 1

Utilice una ligera versión modificada de setNullableStateOfColumn

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = { // get schema val schema = df.schema // modify [[StructField] with name `cn` val newSchema = StructType(schema.map { case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) }) // apply new schema df.sqlContext.createDataFrame( df.rdd, newSchema ) }

Solución alternativa 2

Defina explícitamente el esquema. (Use la reflexión para crear una solución que sea más general)

configuredUnitTest(".") { sparkContext => case class Input(id:Long, var1:Int, var2:Int, var3:Double) val sqlContext = new SQLContext(sparkContext) import sqlContext.implicits._ // use this to set the schema explicitly or // use refelection on the case class member to construct the schema val schema = StructType( Seq ( StructField( "id", LongType, true), StructField( "var1", IntegerType, true), StructField( "var2", IntegerType, true), StructField( "var3", DoubleType, true) )) val is: List[Input] = List( Input(1110, 0, 1001,-10.00), Input(1111, 1, 1001, 10.00), Input(1111, 0, 1002, 10.00) ) val rdd: RDD[Input] = sparkContext.parallelize( is ) val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3)) val inputDF = sqlContext.createDataFrame( rowRDD, schema ) inputDF.printSchema inputDF.show() }


Solo use java.lang.Integer en lugar de scala.Int en su clase de caso.

case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)


Otra opción, si necesita cambiar el marco de datos en el lugar, y la recreación es imposible, puede hacer algo como esto:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

Spark pensará que esta columna puede contener null , y la nulabilidad se establecerá en true . Además, puede usar udf para ajustar sus valores en Option . Funciona bien incluso para casos de transmisión.