apache spark - ¿Cómo particionar y escribir DataFrame en Spark sin eliminar particiones sin datos nuevos?
apache-spark spark-dataframe (2)
Estoy tratando de guardar un DataFrame
en HDFS en formato Parquet usando DataFrameWriter
, dividido en tres valores de columna, como este:
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Como se mencionó en esta pregunta , partitionBy
eliminará toda la jerarquía existente de particiones en la path
y las reemplazará con las particiones en dataFrame
. Como los nuevos datos incrementales para un día en particular vendrán periódicamente, lo que quiero es reemplazar solo las particiones en la jerarquía para la cual dataFrame
tiene datos, dejando las otras sin tocar.
Para hacer esto, parece que necesito guardar cada partición individualmente usando su ruta completa, algo como esto:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Sin embargo, tengo problemas para entender la mejor manera de organizar los datos en DataFrame
sola partición para poder escribirlos utilizando su ruta completa. Una idea fue algo como:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Pero foreachPartition
opera en un Iterator[Row]
que no es ideal para escribir en formato Parquet.
También consideré usar una fecha de select...distinct eventdate, hour, processtime
para obtener la lista de particiones, y luego filtrar el marco de datos original por cada una de esas particiones y guardar los resultados en su ruta particionada completa. Pero la consulta distinta más un filtro para cada partición no parece muy eficiente ya que sería una gran cantidad de operaciones de filtro / escritura.
Espero que haya una forma más limpia de preservar las particiones existentes para las que dataFrame
no tiene datos.
Gracias por leer.
Versión Spark: 2.1
La opción de modo Append
tiene un retén!
df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
He probado y vi que esto mantendrá los archivos de partición existentes. Sin embargo, el problema esta vez es el siguiente: si ejecuta el mismo código dos veces (con los mismos datos), creará nuevos archivos de parquet en lugar de reemplazar los existentes por los mismos datos (Spark 1.6). Entonces, en lugar de utilizar Append
, aún podemos resolver este problema con Overwrite
. En lugar de sobrescribir en el nivel de tabla, debemos sobrescribir en el nivel de partición.
df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
Vea el siguiente enlace para más información:
Sobrescriba particiones específicas en el método de escritura de marcos de datos de chispa
(He actualizado mi respuesta después del comentario de suriyanto. Thnx.)
Sé que esto es muy viejo. Como no puedo ver ninguna solución publicada, seguiré adelante y publicaré una. Este enfoque asume que tiene una tabla hive sobre el directorio en el que desea escribir. Una forma de lidiar con este problema es crear una vista temporal desde dataFrame
que se debe agregar a la tabla y luego usar la insert overwrite table ...
tipo colmena normal insert overwrite table ...
comando:
dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition (''eventdate'', ''hour'', ''processtime'')select * from temp_view")
Conserva las particiones antiguas mientras escribe (sobrescribe) solo en particiones nuevas.