apache-spark pyspark spark-dataframe pyspark-sql

apache spark - TypeError: la columna no es iterable-¿Cómo iterar sobre ArrayType()?



apache-spark pyspark (2)

Considere el siguiente DataFrame:

+------+-----------------------+ |type |names | +------+-----------------------+ |person|[john, sam, jane] | |pet |[whiskers, rover, fido]| +------+-----------------------+

Que se puede crear con el siguiente código:

import pyspark.sql.functions as f data = [ (''person'', [''john'', ''sam'', ''jane'']), (''pet'', [''whiskers'', ''rover'', ''fido'']) ] df = sqlCtx.createDataFrame(data, ["type", "names"]) df.show(truncate=False)

¿Hay alguna manera de modificar directamente los "names" columna ArrayType() aplicando una función a cada elemento, sin usar un udf ?

Por ejemplo, supongamos que quisiera aplicar la función foo a la columna "names" . ( str.upper el ejemplo donde foo es str.upper solo con fines ilustrativos, pero mi pregunta se refiere a cualquier función válida que pueda aplicarse a los elementos de un iterable).

foo = lambda x: x.upper() # defining it as str.upper as an example df.withColumn(''X'', [foo(x) for x in f.col("names")]).show()

TypeError: la columna no es iterable

Podría hacer esto usando un udf :

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType())) df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False) #+------+-----------------------+ #|type |names | #+------+-----------------------+ #|person|[JOHN, SAM, JANE] | #|pet |[WHISKERS, ROVER, FIDO]| #+------+-----------------------+

En este ejemplo específico, podría evitar el udf al explotar la columna, llamar a pyspark.sql.functions.upper() y luego groupBy y collect_list :

df.select(''type'', f.explode(''names'').alias(''name''))/ .withColumn(''name'', f.upper(f.col(''name'')))/ .groupBy(''type'')/ .agg(f.collect_list(''name'').alias(''names''))/ .show(truncate=False) #+------+-----------------------+ #|type |names | #+------+-----------------------+ #|person|[JOHN, SAM, JANE] | #|pet |[WHISKERS, ROVER, FIDO]| #+------+-----------------------+

Pero esto es mucho código para hacer algo simple. ¿Existe una forma más directa de iterar sobre los elementos de un ArrayType() utilizando funciones de marco de datos de chispa?


En Spark <2.4 puede usar una función definida por el usuario:

from pyspark.sql.functions import udf from pyspark.sql.types import ArrayType, DataType, StringType def transform(f, t=StringType()): if not isinstance(t, DataType): raise TypeError("Invalid type {}".format(type(t))) @udf(ArrayType(t)) def _(xs): if xs is not None: return [f(x) for x in xs] return _ foo_udf = transform(str.upper) df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False)

+------+-----------------------+ |type |names | +------+-----------------------+ |person|[JOHN, SAM, JANE] | |pet |[WHISKERS, ROVER, FIDO]| +------+-----------------------+

Teniendo en cuenta el alto costo de la expresión explode + collect_list , este enfoque se prefiere casi exclusivamente, a pesar de su costo intrínseco.

En Spark 2.4 o posterior puedes usar transform * con upper (ver SPARK-23909 ):

from pyspark.sql.functions import expr df.withColumn( ''names'', expr(''transform(names, x -> upper(x))'') ).show(truncate=False)

+------+-----------------------+ |type |names | +------+-----------------------+ |person|[JOHN, SAM, JANE] | |pet |[WHISKERS, ROVER, FIDO]| +------+-----------------------+

También es posible usar pandas_udf

from pyspark.sql.functions import pandas_udf, PandasUDFType def transform_pandas(f, t=StringType()): if not isinstance(t, DataType): raise TypeError("Invalid type {}".format(type(t))) @pandas_udf(ArrayType(t), PandasUDFType.SCALAR) def _(xs): return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs) return _ foo_udf_pandas = transform_pandas(str.upper) df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False)

+------+-----------------------+ |type |names | +------+-----------------------+ |person|[JOHN, SAM, JANE] | |pet |[WHISKERS, ROVER, FIDO]| +------+-----------------------+

aunque solo las últimas combinaciones Arrow / PySpark admiten el manejo de columnas ArrayType ( SPARK-24259 , SPARK-21187 ). No obstante, esta opción debería ser más eficiente que la UDF estándar (especialmente con una sobrecarga de servidor menor) al tiempo que admite funciones arbitrarias de Python.

* También se admiten otras funciones de orden superior , que incluyen, entre otras, filter y aggregate . Ver por ejemplo

  • Consultar Spark SQL DataFrame con tipos complejos
  • ¿Cómo cortar y sumar elementos de la columna de matriz?
  • Filtrar contenido de columna de matriz
  • Spark Scala promedio en filas por manejo nulo .
  • ¿Cómo usar transformar la función de orden superior? .

Sí, puede hacerlo convirtiéndolo a RDD y luego nuevamente a DF.

>>> df.show(truncate=False) +------+-----------------------+ |type |names | +------+-----------------------+ |person|[john, sam, jane] | |pet |[whiskers, rover, fido]| +------+-----------------------+ >>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False) +------+-----------------------+ |type |names | +------+-----------------------+ |person|[JOHN, SAM, JANE] | |pet |[WHISKERS, ROVER, FIDO]| +------+-----------------------+