python - que - ¿Cómo agrego una nueva columna a un Spark DataFrame(usando PySpark)?
rdd spark (8)
La forma más sencilla de agregar una columna es usar "withColumn". Dado que el marco de datos se crea utilizando sqlContext, debe especificar el esquema o, de forma predeterminada, puede estar disponible en el conjunto de datos. Si se especifica el esquema, la carga de trabajo se vuelve tediosa al cambiar cada vez.
A continuación se muestra un ejemplo que puede considerar:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
Tengo un Spark DataFrame (usando PySpark 1.5.1) y me gustaría agregar una nueva columna.
He intentado lo siguiente sin ningún éxito:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=[''new_col''])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
También recibí un error al usar esto:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Entonces, ¿cómo agrego una nueva columna (basada en el vector Python) a un DataFrame existente con PySpark?
Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:
Caso de uso: tengo un csv que consta de:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
Necesito realizar algunas transformaciones y el csv final debe verse como
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con inserciones masivas de SQL y esas cosas.
entonces:
1) Leí el csv original usando spark.read y lo llamo "df".
2) Hago algo a los datos.
3) Agrego las columnas nulas usando este script:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias(''{0}''.format(column)))
df = df.select(outcols)
De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).
No puede agregar una columna arbitraria a un
DataFrame
en Spark.
Las nuevas columnas solo se pueden crear usando literales (se describen otros tipos de literales en
¿Cómo agregar una columna constante en un Spark DataFrame?
)
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transformando una columna existente:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
incluido usando
join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
o generado con la función / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Las funciones integradas en cuanto al
pyspark.sql.functions
(
pyspark.sql.functions
), que se asignan a la expresión de Catalyst, generalmente se prefieren a las funciones definidas por el usuario de Python.
Si desea agregar contenido de un RDD arbitrario como una columna, puede
- agregar números de fila al marco de datos existente
-
llame a
zipWithIndex
en RDD yzipWithIndex
a marco de datos - une ambos usando index como una clave de combinación
Para Spark 2.0
# assumes schema has ''age'' column
df.select(''*'', (df.age + 10).alias(''agePlusTen''))
Para agregar una columna usando un UDF:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return ''cat1''
elif value == 2: return ''cat2''
...
else: return ''n/a''
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
Podemos agregar columnas adicionales a DataFrame directamente con los siguientes pasos:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
Puede definir un nuevo
udf
al agregar un
column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias(''column_name'')
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn(''new_col'', func_name(df.old_col))