structtype spark examples crear python apache-spark dataframe pyspark spark-dataframe apache-spark-sql

examples - python spark dataframe



¿Cómo agregar una columna constante en un Spark DataFrame? (2)

Quiero agregar una columna en un DataFrame con algún valor arbitrario (que es lo mismo para cada fila). Me sale un error cuando uso withColumn siguiente manera:

dt.withColumn(''new_column'', 10).head(5)

--------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-50-a6d0257ca2be> in <module>() 1 dt = (messages 2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt"))) ----> 3 dt.withColumn(''new_column'', 10).head(5) /Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 1166 [Row(age=2, name=u''Alice'', age2=4), Row(age=5, name=u''Bob'', age2=7)] 1167 """ -> 1168 return self.select(''*'', col.alias(colName)) 1169 1170 @ignore_unicode_prefix AttributeError: ''int'' object has no attribute ''alias''

Parece que puedo engañar a la función para que funcione como quiero al sumar y restar una de las otras columnas (para que sumen a cero) y luego sumar el número que quiero (10 en este caso):

dt.withColumn(''new_column'', dt.messagetype - dt.messagetype + 10).head(5)

[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10), Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10), Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10), Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10), Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

Esto es supremamente hacky, ¿verdad? ¿Asumo que hay una manera más legítima de hacer esto?


En spark 2.2 hay dos formas de agregar valor constante en una columna en DataFrame:

1) Usando lit

2) Usando typedLit .

La diferencia entre los dos es que typedLit también puede manejar tipos de typedLit parametrizados, por ejemplo, List, Seq y Map

Marco de datos de muestra:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1") +---+----+ | id|col1| +---+----+ | 0| a| | 1| b| +---+----+

1) Usando lit : agregando valor de cadena constante en una nueva columna llamada newcol:

import org.apache.spark.sql.functions.lit val newdf = df.withColumn("newcol",lit("myval"))

Resultado:

+---+----+------+ | id|col1|newcol| +---+----+------+ | 0| a| myval| | 1| b| myval| +---+----+------+

2) Usando typedLit :

import org.apache.spark.sql.functions.typedLit df.withColumn("newcol", typedLit(("sample", 10, .044)))

Resultado:

+---+----+-----------------+ | id|col1| newcol| +---+----+-----------------+ | 0| a|[sample,10,0.044]| | 1| b|[sample,10,0.044]| | 2| c|[sample,10,0.044]| +---+----+-----------------+


Spark 2.2+

Spark 2.2 presenta typedLit para admitir Seq , Map y Tuples ( SPARK-19254 ) y se deben SPARK-19254 siguientes llamadas (Scala):

import org.apache.spark.sql.functions.typedLit df.withColumn("some_array", typedLit(Seq(1, 2, 3))) df.withColumn("some_struct", typedLit(("foo", 1, .0.3))) df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ ( lit ), 1.4+ ( array , struct ), 2.0+ ( map ):

El segundo argumento para DataFrame.withColumn debe ser una Column por lo que debe usar un literal:

from pyspark.sql.functions import lit df.withColumn(''new_column'', lit(10))

Si necesita columnas complejas, puede construirlas usando bloques como array :

from pyspark.sql.functions import array, create_map, struct df.withColumn("some_array", array(lit(1), lit(2), lit(3))) df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3))) df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Se pueden usar exactamente los mismos métodos en Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct} df.withColumn("new_column", lit(10)) df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

Para proporcionar nombres para structs use cualquiera de los alias en cada campo:

df.withColumn( "some_struct", struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z")) )

o cast sobre todo el objeto

df.withColumn( "some_struct", struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>") )

También es posible, aunque más lento, usar un UDF.

Nota :

Se pueden usar las mismas construcciones para pasar argumentos constantes a UDF o funciones SQL.