tutorial started spark getting example scala apache-spark apache-spark-sql

scala - started - ¿Cómo cambiar los tipos de columna en el DataFrame de Spark SQL?



pyspark (19)

Supongamos que estoy haciendo algo como:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) |-- make: string (nullable = true) |-- model: string (nullable = true) |-- comment: string (nullable = true) |-- blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 Ford E350 Go get one now th...

pero realmente quería el year como Int (y tal vez transformar algunas otras columnas).

Lo mejor que se me ocurre es

df.withColumn("year2", ''year.cast("Int")).select(''year2 as ''year, ''make, ''model, ''comment, ''blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

lo cual es un poco complicado.

Vengo de R y estoy acostumbrado a poder escribir, p. Ej.

df2 <- df %>% mutate(year = year %>% as.integer, make = make %>% toupper)

Es probable que me falte algo, ya que debería haber una mejor manera de hacer esto en spark / scala ...


Editar: versión más nueva

Desde spark 2.x puedes usar .withColumn . Consulta los documentos aquí:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

Respuesta más antigua

Desde Spark versión 1.4, puede aplicar el método de conversión con DataType en la columna:

import org.apache.spark.sql.types.IntegerType val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) .drop("year") .withColumnRenamed("yearTmp", "year")

Si está utilizando expresiones sql, también puede hacer:

val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank")

Para obtener más información, consulte los documentos: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame


Código Java para modificar el tipo de datos del DataFrame de String a Integer

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

Simplemente lanzará el (tipo de datos de cadena) existente a Integer.


Como la operación de cast está disponible para las Column chispa (y personalmente no estoy a favor de las udf propuestas por @ Svend en este momento), ¿qué tal si:

df.select( df("year").cast(IntegerType).as("year"), ... )

para emitir al tipo solicitado? Como un efecto secundario claro, los valores no convertibles / "convertibles" en ese sentido, serán null .

En caso de que necesite esto como método auxiliar , use:

object DFHelper{ def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = { df.withColumn( cn, df(cn).cast(tpe) ) } }

que se usa como:

import DFHelper._ val df2 = castColumnTo( df, "year", IntegerType )


De otra manera:

// Generate a simple dataset containing five values and convert int to string type val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")


En caso de que tenga que cambiar el nombre de docenas de columnas dadas por su nombre, el siguiente ejemplo toma el enfoque de @dnlbrky y lo aplica a varias columnas a la vez:

df.selectExpr(df.columns.map(cn => { if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn" else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn" else cn }):_*)

Las columnas no emitidas se mantienen sin cambios. Todas las columnas permanecen en su orden original.


Este método eliminará la columna anterior y creará nuevas columnas con los mismos valores y un nuevo tipo de datos. Mis tipos de datos originales cuando se creó el DataFrame fueron: -

root |-- id: integer (nullable = true) |-- flag1: string (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag3: string (nullable = true)

Después de esto, ejecuté el siguiente código para cambiar el tipo de datos: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3 df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

Después de esto, mi resultado resultó ser: -

root |-- id: integer (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag1: boolean (nullable = true) |-- flag3: boolean (nullable = true)


Genere un conjunto de datos simple que contenga cinco valores y convierta int a tipo de string :

val df = spark.range(5).select( col("id").cast("string") )


Para convertir el año de cadena a int, puede agregar la siguiente opción al lector csv: "inferSchema" -> "true", consulte la documentación de DataBricks


Por lo tanto, esto solo funciona si tiene problemas para guardar en un controlador jdbc como sqlserver, pero es realmente útil para los errores que encontrará con la sintaxis y los tipos.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} import org.apache.spark.sql.jdbc.JdbcType val SQLServerDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) case _ => throw new IllegalArgumentException(s"Don''t know how to save ${dt.json} to JDBC") } } JdbcDialects.registerDialect(SQLServerDialect)


Puede usar selectExpr para hacerlo un poco más limpio:

df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank")


Puedes usar el siguiente código.

df.withColumn("year", df("year").cast(IntegerType))

Lo que convertirá la columna del año en columna IntegerType .


Se puede cambiar el tipo de datos de una columna utilizando cast in spark sql. el nombre de la tabla es table y solo tiene dos columnas column1 y column2 y el tipo de datos column1 se debe cambiar. ex-spark.sql ("seleccione cast (column1 como Double) column1NewName, column2 from table") En lugar de double escriba su tipo de datos.


Usando Spark Sql 2.4.0 puedes hacer eso:

spark.sql("SELECT STRING(NULLIF(column,'''')) as column_string")


[EDITAR: marzo de 2016: gracias por los votos! Aunque realmente, esta no es la mejor respuesta, creo que las soluciones basadas en withColumn , withColumnRenamed y el cast presentadas por msemelman, Martin Senne y otros son más simples y limpias].

Creo que su enfoque está bien, recuerde que un Spark DataFrame es un RDD (inmutable) de filas, por lo que nunca estamos reemplazando realmente una columna, simplemente creando un nuevo DataFrame cada vez con un nuevo esquema.

Suponiendo que tiene un df original con el siguiente esquema:

scala> df.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepDelay: string (nullable = true) |-- Distance: string (nullable = true) |-- CRSDepTime: string (nullable = true)

Y algunos UDF definidos en una o varias columnas:

import org.apache.spark.sql.functions._ val toInt = udf[Int, String]( _.toInt) val toDouble = udf[Double, String]( _.toDouble) val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) val days_since_nearest_holidays = udf( (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 )

Cambiar los tipos de columna o incluso crear un nuevo DataFrame a partir de otro se puede escribir de esta manera:

val featureDf = df .withColumn("departureDelay", toDouble(df("DepDelay"))) .withColumn("departureHour", toHour(df("CRSDepTime"))) .withColumn("dayOfWeek", toInt(df("DayOfWeek"))) .withColumn("dayOfMonth", toInt(df("DayofMonth"))) .withColumn("month", toInt(df("Month"))) .withColumn("distance", toDouble(df("Distance"))) .withColumn("nearestHoliday", days_since_nearest_holidays( df("Year"), df("Month"), df("DayofMonth")) ) .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", "month", "distance", "nearestHoliday")

cuyos rendimientos:

scala> df.printSchema root |-- departureDelay: double (nullable = true) |-- departureHour: integer (nullable = true) |-- dayOfWeek: integer (nullable = true) |-- dayOfMonth: integer (nullable = true) |-- month: integer (nullable = true) |-- distance: double (nullable = true) |-- nearestHoliday: integer (nullable = true)

Esto está bastante cerca de su propia solución. Simplemente, mantener los cambios de tipo y otras transformaciones como udf val separados hace que el código sea más legible y reutilizable.


las respuestas sugieren usar cast, para su información, el método de lanzamiento en spark 1.4.1 está roto.

por ejemplo, un marco de datos con una columna de cadena que tiene el valor "8182175552014127960" cuando se convierte en bigint tiene el valor "8182175552014128100"

df.show +-------------------+ | a| +-------------------+ |8182175552014127960| +-------------------+ df.selectExpr("cast(a as bigint) a").show +-------------------+ | a| +-------------------+ |8182175552014128100| +-------------------+

Tuvimos que enfrentar muchos problemas antes de encontrar este error porque teníamos grandes columnas en producción.


Primero , si quieres emitir un tipo, entonces esto:

import org.apache.spark.sql df.withColumn("year", $"year".cast(sql.types.IntegerType))

Con el mismo nombre de columna, la columna será reemplazada por una nueva. No necesita agregar ni eliminar pasos.

Segundo , sobre Scala vs R.
Este es el código que más parecido a RI puede tener:

val df2 = df.select( df.columns.map { case year @ "year" => df(year).cast(IntegerType).as(year) case make @ "make" => functions.upper(df(make)).as(make) case other => df(other) }: _* )

Aunque la longitud del código es un poco más larga que la de R. Eso no tiene nada que ver con la verbosidad del lenguaje. En R, la mutate es una función especial para el marco de datos R, mientras que en Scala puede ad-hoc fácilmente gracias a su poder expresivo.
En resumen, evita soluciones específicas, porque la base es lo suficientemente buena como para que pueda crear de manera rápida y fácil sus propias funciones de lenguaje de dominio.

nota al margen: df.columns es sorprendentemente un Array[String] lugar de Array[Column] , tal vez quieran que se vea como el marco de datos de los pandas de Python.


val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd //Schema to be applied to the table val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()


df.select($"long_col".cast(IntegerType).as("int_col"))


Another solution is as follows: 1) Keep "inferSchema" as False 2) While running ''Map'' functions on the row, you can read ''asString'' (row.getString...) <Code> //Read CSV and create dataset Dataset<Row> enginesDataSet = sparkSession .read() .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema","false") .load(args[0]); JavaRDD<Box> vertices = enginesDataSet .select("BOX","BOX_CD") .toJavaRDD() .map(new Function<Row, Box>() { @Override public Box call(Row row) throws Exception { return new Box((String)row.getString(0),(String)row.get(1)); } }); </Code>