
scala - Dropping a nested column from a Spark DataFrame




Extending spektom's answer, with support for array types:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                }): _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              // Caveat: getField on an array column projects the field across all
              // elements, so rebuilding with struct() yields a struct of arrays
              // rather than an array of structs.
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  }): _*))
            // Arrays of non-struct elements cannot contain the target field;
            // keep them as-is (the original code threw a MatchError here)
            case _ => Some(col)
          }
        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
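A quick usage sketch (the data and case class names are made up; assumes a spark-shell style session where spark.implicits._ is in scope):

import spark.implicits._
import DataFrameUtils._

case class Features(feat1: String, feat2: String, feat3: String)
case class Record(label: String, features: Features)

val df = Seq(Record("a_label", Features("f1", "f2", "f3"))).toDF()

// features.feat1 is removed and `features` is rebuilt from the surviving fields
df.dropNestedColumn("features.feat1").printSchema()
// root                           (nullability flags omitted)
//  |-- label: string
//  |-- features: struct
//  |    |-- feat2: string
//  |    |-- feat3: string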

I have a DataFrame with the schema

root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)

While I can filter the DataFrame using

val data = rawData.filter( !(rawData("features.feat1") <=> "100") )

I cannot drop the column using

val data = rawData.drop("features.feat1")

Is there something I am doing wrong here? I also tried (without success) drop(rawData("features.feat1")), although it does not make much sense to do so. (Why drop fails on nested fields is sketched in the note below.)

Thanks in advance,

Nikhil
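For context, drop resolves only top-level column names, so drop("features.feat1") silently matches nothing. Before reaching for the helpers in the answers, a minimal hand-rolled workaround for the schema above (a sketch) is to rebuild the struct without the unwanted field:

import org.apache.spark.sql.functions.struct

// Replace `features` with a struct containing only the fields to keep;
// nested fields cannot be dropped directly by name.
val data = rawData.withColumn("features", struct(
  rawData("features.feat2").alias("feat2"),
  rawData("features.feat3").alias("feat3")
))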


It is just a programming exercise, but you can try something like this:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {

  // Find the top-level field with the given name
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  // The field must be a struct for nested dropping to make sense
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  // Rebuild the struct from the surviving field names
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  // Drop the given nested fields from `source`; on any failure (missing
  // column, non-struct type), return the DataFrame unchanged
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}

Example usage:

scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
|  label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+

scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

Add an implicit conversion and you're done.
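The implicit conversion itself is not shown in the answer; a minimal sketch (the wrapper object name is made up) could be:

import scala.language.implicitConversions
import org.apache.spark.sql.DataFrame

object DFWithDropFromSyntax {
  // With this conversion in scope, dropFrom is available on any DataFrame
  implicit def toDFWithDropFrom(df: DataFrame): DFWithDropFrom = DFWithDropFrom(df)
}

// import DFWithDropFromSyntax._
// df.dropFrom("features", Array("feat1"))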


This version lets you remove nested columns at any level:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}

/**
 * Various Spark utilities and extensions of DataFrame
 */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  }): _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}

Usage:

import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
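To illustrate the "any level" claim, a hypothetical two-level example (the case class names are made up; assumes spark.implicits._ is in scope):

import spark.implicits._
import DataFrameUtils._

case class Meta(source: String, ts: Long)
case class Outer(feat1: String, meta: Meta)

val nested = Seq(("a_label", Outer("f1", Meta("sensor", 1L)))).toDF("label", "features")

// Drops a field two levels down; the parent structs are rebuilt around it
nested.dropNestedColumn("features.meta.source").printSchema()
// root                           (nullability flags omitted)
//  |-- label: string
//  |-- features: struct
//  |    |-- feat1: string
//  |    |-- meta: struct
//  |    |    |-- ts: long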


Following spektom's Scala snippet, I wrote similar code in Java. Since Java 8 does not have foldLeft, I used forEachOrdered instead. This code is suitable for Spark 2.x (I am using 2.1). I also noticed that dropping a column and re-adding it as a Column with the same name does not work, so I simply replace the column, and that seems to work.

The code is not fully tested; I hope it works :-)

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;

public class DataFrameUtils {

    public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
        final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
        Arrays.stream(dataFrame.schema().fields())
            .flatMap(f -> {
                if (columnName.startsWith(f.name() + ".")) {
                    final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(),
                            f.name(), columnName);
                    if (column.isPresent()) {
                        return Stream.of(new Tuple2<>(f.name(), column));
                    } else {
                        return Stream.empty();
                    }
                } else {
                    return Stream.empty();
                }
            })
            .forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));

        return dataFrameFolder.getDF();
    }

    private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName,
                                                  String dropColumnName) {
        Optional<Column> column = Optional.empty();
        if (!fullColumnName.equals(dropColumnName)) {
            if (colType instanceof StructType) {
                if (dropColumnName.startsWith(fullColumnName + ".")) {
                    column = Optional.of(struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
                } else {
                    // Keep struct fields that are not on the drop path; without this
                    // branch (as in the Scala original's else case), sibling structs
                    // would be silently removed
                    column = Optional.of(col);
                }
            } else {
                column = Optional.of(col);
            }
        }
        return column;
    }

    private static Column[] getColumns(Column col, StructType colType, String fullColumnName,
                                       String dropColumnName) {
        return Arrays.stream(colType.fields())
            .flatMap(f -> {
                final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                        fullColumnName + "." + f.name(), dropColumnName);
                if (column.isPresent()) {
                    return Stream.of(column.get().alias(f.name()));
                } else {
                    return Stream.empty();
                }
            })
            .toArray(Column[]::new);
    }

    // Replaces foldLeft: mutates the wrapped Dataset as tuples stream past
    private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
        private Dataset<Row> df;

        public DataFrameFolder(Dataset<Row> df) {
            this.df = df;
        }

        public Dataset<Row> getDF() {
            return df;
        }

        @Override
        public void accept(Tuple2<String, Optional<Column>> colTuple) {
            if (!colTuple._2().isPresent()) {
                df = df.drop(colTuple._1());
            } else {
                df = df.withColumn(colTuple._1(), colTuple._2().get());
            }
        }
    }
}

Example usage:

private class Pojo {
    private String str;
    private Integer number;
    private List<String> strList;
    private Pojo2 pojo2;

    public String getStr() { return str; }
    public Integer getNumber() { return number; }
    public List<String> getStrList() { return strList; }
    public Pojo2 getPojo2() { return pojo2; }
}

private class Pojo2 {
    private String str;
    private Integer number;
    private List<String> strList;

    public String getStr() { return str; }
    public Integer getNumber() { return number; }
    public List<String> getStrList() { return strList; }
}

SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");

Original structure:

root
 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = true)
 |    |-- number: integer (nullable = true)
 |    |-- str: string (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)

After the drop:

root
 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = false)
 |    |-- number: integer (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)