arrays csv apache-spark

arrays - Spark 2.0.x vuelca un archivo csv desde un marco de datos que contiene una matriz de tipo cadena



apache-spark (5)

Aquí hay un método para convertir todas las ArrayType (de cualquier tipo subyacente) de un DataFrame en columnas StringType :

def stringifyArrays(dataFrame: DataFrame): DataFrame = { val colsToStringify = dataFrame.schema.filter(p => p.dataType.typeName == "array").map(p => p.name) colsToStringify.foldLeft(dataFrame)((df, c) => { df.withColumn(c, concat(lit("["), concat_ws(", ", col(c).cast("array<string>")), lit("]"))) }) }

Tampoco usa un UDF.

Tengo un df marco de datos que contiene una columna de tipo array

df.show() parece a

|ID|ArrayOfString|Age|Gender| +--+-------------+---+------+ |1 | [A,B,D] |22 | F | |2 | [A,Y] |42 | M | |3 | [X] |60 | F | +--+-------------+---+------+

Intento volcar ese df en un archivo csv como sigue:

val dumpCSV = df.write.csv(path="/home/me/saveDF")

No está funcionando debido a la columna ArrayOfString . Me sale el error:

El origen de datos CSV no admite el tipo de datos de cadena de matriz

El código funciona si ArrayOfString la columna ArrayOfString . ¡Pero necesito mantener ArrayOfString !

¿Cuál sería la mejor manera de volcar el marco de datos csv, incluida la columna ArrayOfString (ArrayOfString debe volcarse como una columna en el archivo CSV)?


CSV no es el formato de exportación ideal, pero si solo desea inspeccionar visualmente sus datos, esto funcionará [Scala]. Solución rápida y sucia.

case class example ( id: String, ArrayOfString: String, Age: String, Gender: String) df.rdd.map{line => example(line(0).toString, line(1).toString, line(2).toString , line(3).toString) }.toDF.write.csv("/tmp/example.csv")


Implementación Pyspark:

En este ejemplo, cambie el campo column_as_array a column_as_string antes de guardar.

from pyspark.sql.functions import udf from pyspark.sql.types import StringType def array_to_string(my_list): return ''['' + '',''.join([str(elem) for elem in my_list]) + '']'' array_to_string_udf = udf(array_to_string,StringType()) df = df.withColumn(''column_as_str'',array_to_string_udf(d["column_as_array"]))

Luego puede eliminar la columna anterior (tipo de matriz) antes de guardar.

df.drop("column_as_array").write.csv(...)


La razón por la que está recibiendo este error es que el formato de archivo csv no admite tipos de matriz, tendrá que expresarlo como una cadena para poder guardar.

Intente lo siguiente:

import org.apache.spark.sql.functions._ val stringify = udf((vs: Seq[String]) => vs match { case null => null case _ => s"""[${vs.mkString(",")}]""" }) df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)

o

import org.apache.spark.sql.Column def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]")) df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)


No es necesario un UDF si ya sabe qué campos contienen matrices. Simplemente puedes usar la función de lanzamiento de Spark:

val dumpCSV = df.withColumn("ArrayOfString", lit("ArrayOfString).cast("string")) .write .csv(path="/home/me/saveDF" )

Espero que ayude.