iloc - python pandas dataframe index
ActualizaciĆ³n de una columna de marco de datos en chispa (4)
Mirando la nueva API de marco de datos de chispa, no está claro si es posible modificar las columnas del marco de datos.
¿Cómo haría para cambiar un valor en la fila
x
columna
y
de un marco de datos?
En
pandas
esto sería
df.ix[x,y] = new_value
Editar: al consolidar lo que se dijo a continuación, no puede modificar el marco de datos existente ya que es inmutable, pero puede devolver un nuevo marco de datos con las modificaciones deseadas.
Si solo desea reemplazar un valor en una columna en función de una condición, como
np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col(''update_col'') == replace_val, new_value)
.otherwise(F.col(''update_col'')))
df = df.withColumn(''new_column_name'', update_func)
Si desea realizar alguna operación en una columna y crear una nueva columna que se agrega al marco de datos:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn(''new_column_name'', my_udf(''update_col''))
Si desea que la nueva columna tenga el mismo nombre que la columna anterior, puede agregar el paso adicional:
df = df.drop(''update_col'').withColumnRenamed(''new_column_name'', ''update_col'')
Comúnmente al actualizar una columna, queremos asignar un valor antiguo a un nuevo valor. Aquí hay una manera de hacerlo en pyspark sin UDF:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
Si bien no puede modificar una columna como tal, puede operar en una columna y devolver un nuevo DataFrame que refleje ese cambio.
Para eso, primero debe crear una función
UserDefinedFunction
el
UserDefinedFunction
implemente la operación para aplicar y luego aplicar selectivamente esa función solo a la columna de destino.
En Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = ''target_column''
udf = UserDefinedFunction(lambda x: ''new_value'', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
ahora tiene el mismo esquema que
old_df
(suponiendo que
old_df.target_column
fuera también del tipo
StringType
) pero todos los valores en la columna
target_column
serán
target_column
.
Tal como dice
, puede crear un nuevo DataFrame a partir del resultado de un mapa aplicado al antiguo DataFrame.
Un ejemplo para un DataFrame
df
dado con dos filas:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Tenga en cuenta que si los tipos de las columnas cambian, debe darle un esquema correcto en lugar de
df.schema
.
Consulte la api de
org.apache.spark.sql.Row
para ver los métodos disponibles:
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Actualización] O usando UDF en Scala:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
y si el nombre de la columna debe permanecer igual, puede cambiarle el nombre:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
DataFrames
se basan en RDD.
Los RDD son estructuras inmutables y no permiten actualizar elementos en el sitio.
Para cambiar los valores, deberá crear un nuevo DataFrame transformando el original, ya sea utilizando las operaciones DSL o RDD tipo SQL como el
map
.
Una plataforma de diapositivas muy recomendada: Presentación de DataFrames en Spark para ciencia de datos a gran escala .