san resumen proposito por mateo marcos lucas juan explicado evangelio capitulos apache-spark apache-spark-sql spark-dataframe

apache spark - resumen - Sobrescribir particiones específicas en el método de escritura de trama de datos de chispa



evangelio de san marcos resumen por capitulos (13)

¡Finalmente! Esta es ahora una característica en Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236

Para usarlo, debe establecer la configuración spark.sql.sources.partitionOverwriteMode en dinámica, el conjunto de datos debe particionarse y el modo de escritura sobrescribir . Ejemplo:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.mode("overwrite").insertInto("partitioned_table")

Recomiendo hacer una partición basada en su columna de partición antes de escribir, para que no termine con 400 archivos por carpeta.

Antes de Spark 2.3.0, la mejor solución sería lanzar sentencias SQL para eliminar esas particiones y luego escribirlas con mode append.

Quiero sobrescribir particiones específicas en lugar de todas en chispa. Estoy intentando el siguiente comando:

df.write.orc(''maprfs:///hdfs-base-path'',''overwrite'',partitionBy=''col4'')

donde df es un marco de datos que tiene los datos incrementales que se sobrescribirán.

hdfs-base-path contiene los datos maestros.

Cuando pruebo el comando anterior, elimina todas las particiones e inserta las presentes en df en la ruta hdfs.

Mi requisito es sobrescribir solo aquellas particiones presentes en df en la ruta hdfs especificada. ¿Puede alguien ayudarme en esto?


Agregar el parámetro ''overwrite = True'' en la instrucción insertInto resuelve esto:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.insertInto("partitioned_table", overwrite=True)

Por defecto overwrite=False . Cambiarlo a True nos permite sobrescribir particiones específicas contenidas en df y en la tabla_particionada. Esto nos ayuda a evitar sobrescribir todo el contenido de partioned_table con df .


Como jatin Wrote puede eliminar las particiones de la colmena y de la ruta y luego agregar datos Como estaba perdiendo demasiado tiempo con él, agregué el siguiente ejemplo para otros usuarios de spark. Usé Scala con chispa 2.2.1

import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int) object Example extends App { //Prepare spark & Data val sparkConf = new SparkConf() sparkConf.setMaster(s"local[2]") val spark = SparkSession.builder().config(sparkConf).getOrCreate() val tableName = "my_table" val partitions1 = List(1, 2) val partitions2 = List("e1", "e2") val partitionColumns = List("partition1", "partition2") val myTablePath = "/tmp/some_example" val someText = List("text1", "text2") val ids = (0 until 5).toList val listData = partitions1.flatMap(p1 => { partitions2.flatMap(p2 => { someText.flatMap( text => { ids.map( id => DataExample(p1, p2, text, id) ) } ) } ) }) val asDataFrame = spark.createDataFrame(listData) //Delete path function def deletePath(path: String, recursive: Boolean): Unit = { val p = new Path(path) val fs = p.getFileSystem(new Configuration()) fs.delete(p, recursive) } def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = { if (spark.catalog.tableExists(tableName)) { //clean partitions val asColumns = partitions.map(c => new Column(c)) val relevantPartitions = df.select(asColumns: _*).distinct().collect() val partitionToRemove = relevantPartitions.map(row => { val fields = row.schema.fields s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " + s"${fields.map(field => s"${field.name}=''${row.getAs(field.name)}''").mkString("(", ",", ")")} PURGE" }) val cleanFolders = relevantPartitions.map(partition => { val fields = partition.schema.fields path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/") }) println(s"Going to clean ${partitionToRemove.size} partitions") partitionToRemove.foreach(partition => spark.sqlContext.sql(partition)) cleanFolders.foreach(partition => deletePath(partition, true)) } asDataFrame.write .options(Map("path" -> myTablePath)) .mode(SaveMode.Append) .partitionBy(partitionColumns: _*) .saveAsTable(tableName) } //Now test tableOverwrite(asDataFrame, partitionColumns, tableName) spark.sqlContext.sql(s"select * from $tableName").show(1000) tableOverwrite(asDataFrame, partitionColumns, tableName) import spark.implicits._ val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet if (asLocalSet == listData.toSet) { println("Overwrite is working !!!") }

}


En lugar de escribir directamente en la tabla de destino, le sugiero que cree una tabla temporal como la tabla de destino e inserte sus datos allí.

CREATE TABLE tmpTbl LIKE trgtTbl LOCATION ''<tmpLocation'';

Una vez que se crea la tabla, debe escribir sus datos en tmpLocation

df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)

Luego recuperaría las rutas de partición de la tabla ejecutando:

MSCK REPAIR TABLE tmpTbl;

Obtenga las rutas de partición consultando los metadatos de Hive como:

SHOW PARTITONS tmpTbl;

Elimine estas particiones de trgtTbl y mueva los directorios de tmpTbl a trgtTbl


Este es un problema común. La única solución con Spark hasta 2.0 es escribir directamente en el directorio de partición, por ejemplo,

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

Si usa Spark antes de 2.0, deberá evitar que Spark emita archivos de metadatos (porque romperán el descubrimiento automático de particiones) usando:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Si usa Spark antes de 1.6.2, también necesitará eliminar el archivo _SUCCESS en /root/path/to/data/partition_col=value o su presencia interrumpirá el descubrimiento automático de la partición. (Recomiendo usar 1.6.2 o posterior).

Puede obtener algunos detalles más sobre cómo administrar grandes tablas particionadas en mi charla de Spark Summit sobre Bulletproof Jobs .


Intenté el siguiente enfoque para sobrescribir una partición particular en la tabla HIVE.

### load Data and check records raw_df = spark.table("test.original") raw_df.count() lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925 ### Check data in few partitions. sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag") print "Number of records: ", sample.count() sample.show() ### Back-up the partitions before deletion raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite") ### UDF : To delete particular partition. def delete_part(table, part): qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")" spark.sql(qry) ### Delete partitions part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct() part_list = part_df.rdd.map(lambda x : x[0]).collect() table = "test.original" for p in part_list: delete_part(table, p) ### Do the required Changes to the columns in partitions df = spark.table("test.original_bkp") newdf = df.withColumn("c_preferred_cust_flag", lit("Y")) newdf.select("c_customer_sk", "c_preferred_cust_flag").show() ### Write the Partitions back to Original table newdf.write.insertInto("test.original") ### Verify data in Original table orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show() Hope it helps. Regards, Neeraj


Para> = Spark 2.3.0:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.insertInto("partitioned_table", overwrite=True)


Podría hacer algo como esto para hacer que el trabajo sea reentrante (idempotente): (probé esto en la chispa 2.2)

import scala.sys.process._ def deletePath(path: String): Unit = { s"hdfs dfs -rm -r -skipTrash $path".! } df.select(partitionColumn).distinct.collect().foreach(p => { val partition = p.getAs[String](partitionColumn) deletePath(s"$path/$partitionColumn=$partition") }) df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)


Probé esto en Spark 2.3.1 con Scala. La mayoría de las respuestas anteriores están escribiendo en una tabla de Hive. Sin embargo, quería escribir directamente en el disco , que tiene una external hive table en la parte superior de esta carpeta.

Primero la configuración requerida

DataFrame .write .format("<required file format>") .partitionBy("<partitioned column name>") .mode(SaveMode.Overwrite) // This is required. .save(s"<path_to_root_folder>")

Uso aquí:

hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)


Si usa DataFrame, posiblemente quiera usar la tabla Hive sobre los datos. En este caso solo necesita el método de llamada

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)

Sobrescribirá las particiones que contiene DataFrame.

No es necesario especificar el formato (orc), porque Spark usará el formato de tabla Hive.

Funciona bien en Spark versión 1.6


Te sugiero que hagas la limpieza y luego escribas nuevas particiones con el modo Append :

sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")

Esto eliminará solo las particiones nuevas. Después de escribir datos, ejecute este comando si necesita actualizar metastore:

val sparkSession: SparkSession = SparkSession .builder .enableHiveSupport() .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder .appName("spark_write_to_dynamic_partition_folders")

Nota: deletePath supone que el comando hfds está disponible en su sistema.


Usando Spark 1.6 ...

El HiveContext puede simplificar mucho este proceso. La clave es que primero debe crear la tabla en Hive usando una instrucción CREATE EXTERNAL TABLE con partición definida. Por ejemplo:

# Hive SQL CREATE EXTERNAL TABLE test (name STRING) PARTITIONED BY (age INT) STORED AS PARQUET LOCATION ''hdfs:///tmp/tables/test''

A partir de aquí, supongamos que tiene un marco de datos con nuevos registros para una partición específica (o varias particiones). Puede usar una instrucción SQL HiveContext para realizar una INSERT OVERWRITE usando este Dataframe, que sobrescribirá la tabla solo para las particiones contenidas en el Dataframe:

# PySpark hiveContext = HiveContext(sc) update_dataframe.registerTempTable(''update_dataframe'') hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age) SELECT name, age FROM update_dataframe""")

Nota: update_dataframe en este ejemplo tiene un esquema que coincide con el de la tabla de test destino.

Un error fácil de cometer con este enfoque es omitir el paso CREATE EXTERNAL TABLE en Hive y simplemente hacer la tabla usando los métodos de escritura de la API Dataframe. Para las tablas basadas en Parquet en particular, la tabla no se definirá adecuadamente para admitir la función INSERT OVERWRITE... PARTITION Hive.

Espero que esto ayude.


# drop the partition drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col=''{val}'')".format(val=target_partition) print drop_query spark.sql(drop_query) # delete directory dbutils.fs.rm(<partition_directoy>,recurse=True) # Load the partition df.write/ .partitionBy("partition_col")/ .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)

Esto funciona para mí en trabajos de AWS Glue ETL (Glue 1.0 - Spark 2.4 - Python 2)