python - Cambiar el nombre del campo anidado en el marco de datos de chispa
apache-spark dataframe (1)
Pitón
No es posible modificar un solo campo anidado.
Tienes que recrear una estructura completa.
En este caso particular, la solución más simple es usar
cast
.
Primero un montón de importaciones:
from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
ArrayType, LongType, StringType, StructField, StructType)
y datos de ejemplo:
Record = namedtuple("Record", ["a", "b", "c"])
df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
Confirmemos que el esquema es el mismo que en su caso:
df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
Puede definir un nuevo esquema, por ejemplo, como una cadena:
str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select(col("array_field").cast(str_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
o un
DataType
:
struct_schema = ArrayType(StructType([
StructField("a_renamed", StringType()),
StructField("b", LongType()),
StructField("c", LongType())
]))
df.select(col("array_field").cast(struct_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
Scala
Las mismas técnicas se pueden usar en Scala:
case class Record(a: String, b: Long, c: Long)
val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")
val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select($"array_field".cast(strSchema))
o
import org.apache.spark.sql.types._
val structSchema = ArrayType(StructType(Seq(
StructField("a_renamed", StringType),
StructField("b", LongType),
StructField("c", LongType)
)))
df.select($"array_field".cast(structSchema))
Posibles mejoras :
Si utiliza una manipulación de datos expresiva o una biblioteca de procesamiento JSON, podría ser más fácil volcar los tipos de datos en
dict
o cadena JSON y tomarlos desde allí, por ejemplo (Python /
toolz
):
from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter
# Update name to "a_updated" if name is "a"
rename_field = update_in(
keys=["name"], func=lambda x: "a_updated" if x == "a" else x)
updated_schema = pipe(
# Get schema of the field as a dict
df.schema["array_field"].jsonValue(),
# Update fields with rename
update_in(
keys=["type", "elementType", "fields"],
func=lambda x: pipe(x, map(rename_field), list)),
# Load schema from dict
StructField.fromJson,
# Get data type
attrgetter("dataType"))
df.select(col("array_field").cast(updated_schema)).printSchema()
Tener un marco de datos
df
en Spark:
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
¿Cómo cambiar el nombre del campo
array_field.a
a
array_field.a_renamed
?
[Actualizar]:
.withColumnRenamed()
no funciona con campos anidados, así que probé este método hacky e inseguro:
# First alter the schema:
schema = df.schema
schema[''array_field''].dataType.elementType[''a''].name = ''a_renamed''
ind = schema[''array_field''].dataType.elementType.names.index(''a'')
schema[''array_field''].dataType.elementType.names[ind] = ''a_renamed''
# Then set dataframe''s schema with altered schema
df._schema = schema
Sé que establecer un atributo privado no es una buena práctica, pero no conozco otra forma de configurar el esquema para df
Creo que estoy en el camino correcto, pero
df.printSchema()
todavía muestra el nombre anterior de
array_field.a
, aunque
df.schema == schema
es
True