apache spark - TypeError: la columna no es iterable-¿Cómo iterar sobre ArrayType()?
apache-spark pyspark (2)
Considere el siguiente DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
Que se puede crear con el siguiente código:
import pyspark.sql.functions as f
data = [
(''person'', [''john'', ''sam'', ''jane'']),
(''pet'', [''whiskers'', ''rover'', ''fido''])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
¿Hay alguna manera de modificar directamente los
"names"
columna
ArrayType()
aplicando una función a cada elemento, sin usar un
udf
?
Por ejemplo, supongamos que quisiera aplicar la función
foo
a la columna
"names"
.
(
str.upper
el ejemplo donde
foo
es
str.upper
solo con fines ilustrativos, pero mi pregunta se refiere a cualquier función válida que pueda aplicarse a los elementos de un iterable).
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn(''X'', [foo(x) for x in f.col("names")]).show()
TypeError: la columna no es iterable
Podría hacer esto usando un
udf
:
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
En este ejemplo específico,
podría
evitar el
udf
al explotar la columna, llamar a
pyspark.sql.functions.upper()
y luego
groupBy
y
collect_list
:
df.select(''type'', f.explode(''names'').alias(''name''))/
.withColumn(''name'', f.upper(f.col(''name'')))/
.groupBy(''type'')/
.agg(f.collect_list(''name'').alias(''names''))/
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
Pero esto es mucho código para hacer algo simple.
¿Existe una forma más directa de iterar sobre los elementos de un
ArrayType()
utilizando funciones de marco de datos de chispa?
En Spark <2.4 puede usar una función definida por el usuario:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@udf(ArrayType(t))
def _(xs):
if xs is not None:
return [f(x) for x in xs]
return _
foo_udf = transform(str.upper)
df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Teniendo en cuenta el alto costo de la expresión
explode
+
collect_list
, este enfoque se prefiere casi exclusivamente, a pesar de su costo intrínseco.
En
Spark 2.4
o posterior puedes usar
transform
* con
upper
(ver
SPARK-23909
):
from pyspark.sql.functions import expr
df.withColumn(
''names'', expr(''transform(names, x -> upper(x))'')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
También es posible usar
pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
def _(xs):
return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn(''names'', foo_udf(f.col(''names''))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
aunque solo las últimas combinaciones Arrow / PySpark admiten el manejo de columnas
ArrayType
(
SPARK-24259
,
SPARK-21187
).
No obstante, esta opción debería ser más eficiente que la UDF estándar (especialmente con una sobrecarga de servidor menor) al tiempo que admite funciones arbitrarias de Python.
*
También se admiten otras funciones de orden superior
, que incluyen, entre otras,
filter
y
aggregate
.
Ver por ejemplo
- Consultar Spark SQL DataFrame con tipos complejos
- ¿Cómo cortar y sumar elementos de la columna de matriz?
- Filtrar contenido de columna de matriz
- Spark Scala promedio en filas por manejo nulo .
- ¿Cómo usar transformar la función de orden superior? .
Sí, puede hacerlo convirtiéndolo a RDD y luego nuevamente a DF.
>>> df.show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
>>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+