index iloc example data columns column python apache-spark pyspark apache-spark-sql spark-dataframe

iloc - python pandas dataframe index



ActualizaciĆ³n de una columna de marco de datos en chispa (4)

Mirando la nueva API de marco de datos de chispa, no está claro si es posible modificar las columnas del marco de datos.

¿Cómo haría para cambiar un valor en la fila x columna y de un marco de datos?

En pandas esto sería df.ix[x,y] = new_value

Editar: al consolidar lo que se dijo a continuación, no puede modificar el marco de datos existente ya que es inmutable, pero puede devolver un nuevo marco de datos con las modificaciones deseadas.

Si solo desea reemplazar un valor en una columna en función de una condición, como np.where :

from pyspark.sql import functions as F update_func = (F.when(F.col(''update_col'') == replace_val, new_value) .otherwise(F.col(''update_col''))) df = df.withColumn(''new_column_name'', update_func)

Si desea realizar alguna operación en una columna y crear una nueva columna que se agrega al marco de datos:

import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, T.StringType()) df = df.withColumn(''new_column_name'', my_udf(''update_col''))

Si desea que la nueva columna tenga el mismo nombre que la columna anterior, puede agregar el paso adicional:

df = df.drop(''update_col'').withColumnRenamed(''new_column_name'', ''update_col'')


Comúnmente al actualizar una columna, queremos asignar un valor antiguo a un nuevo valor. Aquí hay una manera de hacerlo en pyspark sin UDF:

# update df[update_col], mapping old_value --> new_value from pyspark.sql import functions as F df = df.withColumn(update_col, F.when(df[update_col]==old_value,new_value). otherwise(df[update_col])).


Si bien no puede modificar una columna como tal, puede operar en una columna y devolver un nuevo DataFrame que refleje ese cambio. Para eso, primero debe crear una función UserDefinedFunction el UserDefinedFunction implemente la operación para aplicar y luego aplicar selectivamente esa función solo a la columna de destino. En Python:

from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType name = ''target_column'' udf = UserDefinedFunction(lambda x: ''new_value'', StringType()) new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df ahora tiene el mismo esquema que old_df (suponiendo que old_df.target_column fuera también del tipo StringType ) pero todos los valores en la columna target_column serán target_column .


Tal como dice , puede crear un nuevo DataFrame a partir del resultado de un mapa aplicado al antiguo DataFrame. Un ejemplo para un DataFrame df dado con dos filas:

val newDf = sqlContext.createDataFrame(df.map(row => Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Tenga en cuenta que si los tipos de las columnas cambian, debe darle un esquema correcto en lugar de df.schema . Consulte la api de org.apache.spark.sql.Row para ver los métodos disponibles: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Actualización] O usando UDF en Scala:

import org.apache.spark.sql.functions._ val toLong = udf[Long, String] (_.toLong) val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

y si el nombre de la columna debe permanecer igual, puede cambiarle el nombre:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")


DataFrames se basan en RDD. Los RDD son estructuras inmutables y no permiten actualizar elementos en el sitio. Para cambiar los valores, deberá crear un nuevo DataFrame transformando el original, ya sea utilizando las operaciones DSL o RDD tipo SQL como el map .

Una plataforma de diapositivas muy recomendada: Presentación de DataFrames en Spark para ciencia de datos a gran escala .