scala - started - ¿Cómo cambiar los tipos de columna en el DataFrame de Spark SQL?
pyspark (19)
Supongamos que estoy haciendo algo como:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
pero realmente quería el
year
como
Int
(y tal vez transformar algunas otras columnas).
Lo mejor que se me ocurre es
df.withColumn("year2", ''year.cast("Int")).select(''year2 as ''year, ''make, ''model, ''comment, ''blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
lo cual es un poco complicado.
Vengo de R y estoy acostumbrado a poder escribir, p. Ej.
df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)
Es probable que me falte algo, ya que debería haber una mejor manera de hacer esto en spark / scala ...
Editar: versión más nueva
Desde spark 2.x puedes usar
.withColumn
.
Consulta los documentos aquí:
Respuesta más antigua
Desde Spark versión 1.4, puede aplicar el método de conversión con DataType en la columna:
import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")
Si está utilizando expresiones sql, también puede hacer:
val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")
Para obtener más información, consulte los documentos: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame
Código Java para modificar el tipo de datos del DataFrame de String a Integer
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
Simplemente lanzará el (tipo de datos de cadena) existente a Integer.
Como la operación de
cast
está disponible para las
Column
chispa (y personalmente no estoy a favor de las
udf
propuestas por @
Svend
en este momento), ¿qué tal si:
df.select( df("year").cast(IntegerType).as("year"), ... )
para emitir al tipo solicitado?
Como un efecto secundario claro, los valores no convertibles / "convertibles" en ese sentido, serán
null
.
En caso de que necesite esto como método auxiliar , use:
object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}
que se usa como:
import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
De otra manera:
// Generate a simple dataset containing five values and convert int to string type
val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
En caso de que tenga que cambiar el nombre de docenas de columnas dadas por su nombre, el siguiente ejemplo toma el enfoque de @dnlbrky y lo aplica a varias columnas a la vez:
df.selectExpr(df.columns.map(cn => {
if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
else cn
}):_*)
Las columnas no emitidas se mantienen sin cambios. Todas las columnas permanecen en su orden original.
Este método eliminará la columna anterior y creará nuevas columnas con los mismos valores y un nuevo tipo de datos. Mis tipos de datos originales cuando se creó el DataFrame fueron: -
root
|-- id: integer (nullable = true)
|-- flag1: string (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag3: string (nullable = true)
Después de esto, ejecuté el siguiente código para cambiar el tipo de datos: -
df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
Después de esto, mi resultado resultó ser: -
root
|-- id: integer (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag1: boolean (nullable = true)
|-- flag3: boolean (nullable = true)
Genere un conjunto de datos simple que contenga cinco valores y convierta
int
a tipo de
string
:
val df = spark.range(5).select( col("id").cast("string") )
Para convertir el año de cadena a int, puede agregar la siguiente opción al lector csv: "inferSchema" -> "true", consulte la documentación de DataBricks
Por lo tanto, esto solo funciona si tiene problemas para guardar en un controlador jdbc como sqlserver, pero es realmente útil para los errores que encontrará con la sintaxis y los tipos.
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don''t know how to save ${dt.json} to JDBC")
}
}
JdbcDialects.registerDialect(SQLServerDialect)
Puede usar
selectExpr
para hacerlo un poco más limpio:
df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")
Puedes usar el siguiente código.
df.withColumn("year", df("year").cast(IntegerType))
Lo que convertirá la columna del
año en
columna
IntegerType
.
Se puede cambiar el tipo de datos de una columna utilizando cast in spark sql. el nombre de la tabla es table y solo tiene dos columnas column1 y column2 y el tipo de datos column1 se debe cambiar. ex-spark.sql ("seleccione cast (column1 como Double) column1NewName, column2 from table") En lugar de double escriba su tipo de datos.
Usando Spark Sql 2.4.0 puedes hacer eso:
spark.sql("SELECT STRING(NULLIF(column,'''')) as column_string")
[EDITAR: marzo de 2016: gracias por los votos!
Aunque realmente, esta no es la mejor respuesta, creo que las soluciones basadas en
withColumn
,
withColumnRenamed
y el
cast
presentadas por msemelman, Martin Senne y otros son más simples y limpias].
Creo que su enfoque está bien, recuerde que un Spark
DataFrame
es un RDD (inmutable) de filas, por lo que nunca estamos
reemplazando
realmente una columna, simplemente creando un nuevo
DataFrame
cada vez con un nuevo esquema.
Suponiendo que tiene un df original con el siguiente esquema:
scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)
Y algunos UDF definidos en una o varias columnas:
import org.apache.spark.sql.functions._
val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)
Cambiar los tipos de columna o incluso crear un nuevo DataFrame a partir de otro se puede escribir de esta manera:
val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour", toHour(df("CRSDepTime")))
.withColumn("dayOfWeek", toInt(df("DayOfWeek")))
.withColumn("dayOfMonth", toInt(df("DayofMonth")))
.withColumn("month", toInt(df("Month")))
.withColumn("distance", toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")
cuyos rendimientos:
scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)
Esto está bastante cerca de su propia solución.
Simplemente, mantener los cambios de tipo y otras transformaciones como
udf val
separados hace que el código sea más legible y reutilizable.
las respuestas sugieren usar cast, para su información, el método de lanzamiento en spark 1.4.1 está roto.
por ejemplo, un marco de datos con una columna de cadena que tiene el valor "8182175552014127960" cuando se convierte en bigint tiene el valor "8182175552014128100"
df.show
+-------------------+
| a|
+-------------------+
|8182175552014127960|
+-------------------+
df.selectExpr("cast(a as bigint) a").show
+-------------------+
| a|
+-------------------+
|8182175552014128100|
+-------------------+
Tuvimos que enfrentar muchos problemas antes de encontrar este error porque teníamos grandes columnas en producción.
Primero , si quieres emitir un tipo, entonces esto:
import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
Con el mismo nombre de columna, la columna será reemplazada por una nueva. No necesita agregar ni eliminar pasos.
Segundo
, sobre Scala vs R.
Este es el código que más parecido a RI puede tener:
val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other => df(other)
}: _*
)
Aunque la longitud del código es un poco más larga que la de R.
Eso no tiene nada que ver con la verbosidad del lenguaje.
En R, la
mutate
es una función especial para el marco de datos R, mientras que en Scala puede ad-hoc fácilmente gracias a su poder expresivo.
En resumen, evita soluciones específicas, porque la base es lo suficientemente buena como para que pueda crear de manera rápida y fácil sus propias funciones de lenguaje de dominio.
nota al margen:
df.columns
es sorprendentemente un
Array[String]
lugar de
Array[Column]
, tal vez quieran que se vea como el marco de datos de los pandas de Python.
val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
//Schema to be applied to the table
val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)
val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
df.select($"long_col".cast(IntegerType).as("int_col"))
Another solution is as follows:
1) Keep "inferSchema" as False
2) While running ''Map'' functions on the row, you can read ''asString'' (row.getString...)
<Code>
//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema","false")
.load(args[0]);
JavaRDD<Box> vertices = enginesDataSet
.select("BOX","BOX_CD")
.toJavaRDD()
.map(new Function<Row, Box>() {
@Override
public Box call(Row row) throws Exception {
return new Box((String)row.getString(0),(String)row.get(1));
}
});
</Code>