tutorial spark que instalar example español datacamp python apache-spark dataframe pyspark spark-dataframe apache-spark-sql

python - que - ¿Cómo agrego una nueva columna a un Spark DataFrame(usando PySpark)?



rdd spark (8)

La forma más sencilla de agregar una columna es usar "withColumn". Dado que el marco de datos se crea utilizando sqlContext, debe especificar el esquema o, de forma predeterminada, puede estar disponible en el conjunto de datos. Si se especifica el esquema, la carga de trabajo se vuelve tediosa al cambiar cada vez.

A continuación se muestra un ejemplo que puede considerar:

from pyspark.sql import SQLContext from pyspark.sql.types import * sqlContext = SQLContext(sc) # SparkContext will be sc by default # Read the dataset of your choice (Already loaded with schema) Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter") # For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following: Data = Data.withColumn("col31", "Code goes here") # Check the change Data.printSchema()

Tengo un Spark DataFrame (usando PySpark 1.5.1) y me gustaría agregar una nueva columna.

He intentado lo siguiente sin ningún éxito:

type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=[''new_col'']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"])

También recibí un error al usar esto:

my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))

Entonces, ¿cómo agrego una nueva columna (basada en el vector Python) a un DataFrame existente con PySpark?


Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:

Caso de uso: tengo un csv que consta de:

First|Third|Fifth data|data|data data|data|data ...billion more lines

Necesito realizar algunas transformaciones y el csv final debe verse como

First|Second|Third|Fourth|Fifth data|null|data|null|data data|null|data|null|data ...billion more lines

Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con inserciones masivas de SQL y esas cosas.

entonces:

1) Leí el csv original usando spark.read y lo llamo "df".

2) Hago algo a los datos.

3) Agrego las columnas nulas usando este script:

outcols = [] for column in MY_COLUMN_LIST: if column in df.columns: outcols.append(column) else: outcols.append(lit(None).cast(StringType()).alias(''{0}''.format(column))) df = df.select(outcols)

De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).


No puede agregar una columna arbitraria a un DataFrame en Spark. Las nuevas columnas solo se pueden crear usando literales (se describen otros tipos de literales en ¿Cómo agregar una columna constante en un Spark DataFrame? )

from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) df_with_x4 = df.withColumn("x4", lit(0)) df_with_x4.show() ## +---+---+-----+---+ ## | x1| x2| x3| x4| ## +---+---+-----+---+ ## | 1| a| 23.0| 0| ## | 3| B|-23.0| 0| ## +---+---+-----+---+

transformando una columna existente:

from pyspark.sql.functions import exp df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) df_with_x5.show() ## +---+---+-----+---+--------------------+ ## | x1| x2| x3| x4| x5| ## +---+---+-----+---+--------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| ## | 3| B|-23.0| 0|1.026187963170189...| ## +---+---+-----+---+--------------------+

incluido usando join :

from pyspark.sql.functions import exp lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) df_with_x6 = (df_with_x5 .join(lookup, col("x1") == col("k"), "leftouter") .drop("k") .withColumnRenamed("v", "x6")) ## +---+---+-----+---+--------------------+----+ ## | x1| x2| x3| x4| x5| x6| ## +---+---+-----+---+--------------------+----+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo| ## | 3| B|-23.0| 0|1.026187963170189...|null| ## +---+---+-----+---+--------------------+----+

o generado con la función / udf:

from pyspark.sql.functions import rand df_with_x7 = df_with_x6.withColumn("x7", rand()) df_with_x7.show() ## +---+---+-----+---+--------------------+----+-------------------+ ## | x1| x2| x3| x4| x5| x6| x7| ## +---+---+-----+---+--------------------+----+-------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| ## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| ## +---+---+-----+---+--------------------+----+-------------------+

Las funciones integradas en cuanto al pyspark.sql.functions ( pyspark.sql.functions ), que se asignan a la expresión de Catalyst, generalmente se prefieren a las funciones definidas por el usuario de Python.

Si desea agregar contenido de un RDD arbitrario como una columna, puede

  • agregar números de fila al marco de datos existente
  • llame a zipWithIndex en RDD y zipWithIndex a marco de datos
  • une ambos usando index como una clave de combinación

Para Spark 2.0

# assumes schema has ''age'' column df.select(''*'', (df.age + 10).alias(''agePlusTen''))


Para agregar una columna usando un UDF:

df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) from pyspark.sql.functions import udf from pyspark.sql.types import * def valueToCategory(value): if value == 1: return ''cat1'' elif value == 2: return ''cat2'' ... else: return ''n/a'' # NOTE: it seems that calls to udf() must be after SparkContext() is called udfValueToCategory = udf(valueToCategory, StringType()) df_with_cat = df.withColumn("category", udfValueToCategory("x1")) df_with_cat.show() ## +---+---+-----+---------+ ## | x1| x2| x3| category| ## +---+---+-----+---------+ ## | 1| a| 23.0| cat1| ## | 3| B|-23.0| n/a| ## +---+---+-----+---------+


Podemos agregar columnas adicionales a DataFrame directamente con los siguientes pasos:

from pyspark.sql.functions import when df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"]) df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive")) df.show()


Puede definir un nuevo udf al agregar un column_name :

u_f = F.udf(lambda :yourstring,StringType()) a.select(u_f().alias(''column_name'')


from pyspark.sql.functions import udf from pyspark.sql.types import * func_name = udf( lambda val: val, # do sth to val StringType() ) df.withColumn(''new_col'', func_name(df.old_col))