arrays - Spark 2.0.x vuelca un archivo csv desde un marco de datos que contiene una matriz de tipo cadena
apache-spark (5)
Aquí hay un método para convertir todas las ArrayType
(de cualquier tipo subyacente) de un DataFrame
en columnas StringType
:
def stringifyArrays(dataFrame: DataFrame): DataFrame = {
val colsToStringify = dataFrame.schema.filter(p => p.dataType.typeName == "array").map(p => p.name)
colsToStringify.foldLeft(dataFrame)((df, c) => {
df.withColumn(c, concat(lit("["), concat_ws(", ", col(c).cast("array<string>")), lit("]")))
})
}
Tampoco usa un UDF.
Tengo un df
marco de datos que contiene una columna de tipo array
df.show()
parece a
|ID|ArrayOfString|Age|Gender|
+--+-------------+---+------+
|1 | [A,B,D] |22 | F |
|2 | [A,Y] |42 | M |
|3 | [X] |60 | F |
+--+-------------+---+------+
Intento volcar ese df
en un archivo csv como sigue:
val dumpCSV = df.write.csv(path="/home/me/saveDF")
No está funcionando debido a la columna ArrayOfString
. Me sale el error:
El origen de datos CSV no admite el tipo de datos de cadena de matriz
El código funciona si ArrayOfString
la columna ArrayOfString
. ¡Pero necesito mantener ArrayOfString
!
¿Cuál sería la mejor manera de volcar el marco de datos csv, incluida la columna ArrayOfString (ArrayOfString debe volcarse como una columna en el archivo CSV)?
CSV no es el formato de exportación ideal, pero si solo desea inspeccionar visualmente sus datos, esto funcionará [Scala]. Solución rápida y sucia.
case class example ( id: String, ArrayOfString: String, Age: String, Gender: String)
df.rdd.map{line => example(line(0).toString, line(1).toString, line(2).toString , line(3).toString) }.toDF.write.csv("/tmp/example.csv")
Implementación Pyspark:
En este ejemplo, cambie el campo column_as_array
a column_as_string
antes de guardar.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def array_to_string(my_list):
return ''['' + '',''.join([str(elem) for elem in my_list]) + '']''
array_to_string_udf = udf(array_to_string,StringType())
df = df.withColumn(''column_as_str'',array_to_string_udf(d["column_as_array"]))
Luego puede eliminar la columna anterior (tipo de matriz) antes de guardar.
df.drop("column_as_array").write.csv(...)
La razón por la que está recibiendo este error es que el formato de archivo csv no admite tipos de matriz, tendrá que expresarlo como una cadena para poder guardar.
Intente lo siguiente:
import org.apache.spark.sql.functions._
val stringify = udf((vs: Seq[String]) => vs match {
case null => null
case _ => s"""[${vs.mkString(",")}]"""
})
df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)
o
import org.apache.spark.sql.Column
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))
df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)
No es necesario un UDF si ya sabe qué campos contienen matrices. Simplemente puedes usar la función de lanzamiento de Spark:
val dumpCSV = df.withColumn("ArrayOfString", lit("ArrayOfString).cast("string"))
.write
.csv(path="/home/me/saveDF"
)
Espero que ayude.