apache spark - resumen - Sobrescribir particiones específicas en el método de escritura de trama de datos de chispa
evangelio de san marcos resumen por capitulos (13)
¡Finalmente! Esta es ahora una característica en Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236
Para usarlo, debe establecer la configuración spark.sql.sources.partitionOverwriteMode en dinámica, el conjunto de datos debe particionarse y el modo de escritura sobrescribir . Ejemplo:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
Recomiendo hacer una partición basada en su columna de partición antes de escribir, para que no termine con 400 archivos por carpeta.
Antes de Spark 2.3.0, la mejor solución sería lanzar sentencias SQL para eliminar esas particiones y luego escribirlas con mode append.
Quiero sobrescribir particiones específicas en lugar de todas en chispa. Estoy intentando el siguiente comando:
df.write.orc(''maprfs:///hdfs-base-path'',''overwrite'',partitionBy=''col4'')
donde df es un marco de datos que tiene los datos incrementales que se sobrescribirán.
hdfs-base-path contiene los datos maestros.
Cuando pruebo el comando anterior, elimina todas las particiones e inserta las presentes en df en la ruta hdfs.
Mi requisito es sobrescribir solo aquellas particiones presentes en df en la ruta hdfs especificada. ¿Puede alguien ayudarme en esto?
Agregar el parámetro ''overwrite = True'' en la instrucción insertInto resuelve esto:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.insertInto("partitioned_table", overwrite=True)
Por defecto
overwrite=False
.
Cambiarlo a
True
nos permite sobrescribir particiones específicas contenidas en
df
y en la tabla_particionada.
Esto nos ayuda a evitar sobrescribir todo el contenido de partioned_table con
df
.
Como jatin Wrote puede eliminar las particiones de la colmena y de la ruta y luego agregar datos Como estaba perdiendo demasiado tiempo con él, agregué el siguiente ejemplo para otros usuarios de spark. Usé Scala con chispa 2.2.1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
object Example extends App {
//Prepare spark & Data
val sparkConf = new SparkConf()
sparkConf.setMaster(s"local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val tableName = "my_table"
val partitions1 = List(1, 2)
val partitions2 = List("e1", "e2")
val partitionColumns = List("partition1", "partition2")
val myTablePath = "/tmp/some_example"
val someText = List("text1", "text2")
val ids = (0 until 5).toList
val listData = partitions1.flatMap(p1 => {
partitions2.flatMap(p2 => {
someText.flatMap(
text => {
ids.map(
id => DataExample(p1, p2, text, id)
)
}
)
}
)
})
val asDataFrame = spark.createDataFrame(listData)
//Delete path function
def deletePath(path: String, recursive: Boolean): Unit = {
val p = new Path(path)
val fs = p.getFileSystem(new Configuration())
fs.delete(p, recursive)
}
def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
if (spark.catalog.tableExists(tableName)) {
//clean partitions
val asColumns = partitions.map(c => new Column(c))
val relevantPartitions = df.select(asColumns: _*).distinct().collect()
val partitionToRemove = relevantPartitions.map(row => {
val fields = row.schema.fields
s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
s"${fields.map(field => s"${field.name}=''${row.getAs(field.name)}''").mkString("(", ",", ")")} PURGE"
})
val cleanFolders = relevantPartitions.map(partition => {
val fields = partition.schema.fields
path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
})
println(s"Going to clean ${partitionToRemove.size} partitions")
partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
cleanFolders.foreach(partition => deletePath(partition, true))
}
asDataFrame.write
.options(Map("path" -> myTablePath))
.mode(SaveMode.Append)
.partitionBy(partitionColumns: _*)
.saveAsTable(tableName)
}
//Now test
tableOverwrite(asDataFrame, partitionColumns, tableName)
spark.sqlContext.sql(s"select * from $tableName").show(1000)
tableOverwrite(asDataFrame, partitionColumns, tableName)
import spark.implicits._
val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
if (asLocalSet == listData.toSet) {
println("Overwrite is working !!!")
}
}
En lugar de escribir directamente en la tabla de destino, le sugiero que cree una tabla temporal como la tabla de destino e inserte sus datos allí.
CREATE TABLE tmpTbl LIKE trgtTbl LOCATION ''<tmpLocation'';
Una vez que se crea la tabla, debe escribir sus datos en
tmpLocation
df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)
Luego recuperaría las rutas de partición de la tabla ejecutando:
MSCK REPAIR TABLE tmpTbl;
Obtenga las rutas de partición consultando los metadatos de Hive como:
SHOW PARTITONS tmpTbl;
Elimine estas particiones de
trgtTbl
y mueva los directorios de
tmpTbl
a
trgtTbl
Este es un problema común. La única solución con Spark hasta 2.0 es escribir directamente en el directorio de partición, por ejemplo,
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
Si usa Spark antes de 2.0, deberá evitar que Spark emita archivos de metadatos (porque romperán el descubrimiento automático de particiones) usando:
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Si usa Spark antes de 1.6.2, también necesitará eliminar el archivo
_SUCCESS
en
/root/path/to/data/partition_col=value
o su presencia interrumpirá el descubrimiento automático de la partición.
(Recomiendo usar 1.6.2 o posterior).
Puede obtener algunos detalles más sobre cómo administrar grandes tablas particionadas en mi charla de Spark Summit sobre Bulletproof Jobs .
Intenté el siguiente enfoque para sobrescribir una partición particular en la tabla HIVE.
### load Data and check records
raw_df = spark.table("test.original")
raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925
### Check data in few partitions.
sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
print "Number of records: ", sample.count()
sample.show()
### Back-up the partitions before deletion
raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")
### UDF : To delete particular partition.
def delete_part(table, part):
qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
spark.sql(qry)
### Delete partitions
part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
part_list = part_df.rdd.map(lambda x : x[0]).collect()
table = "test.original"
for p in part_list:
delete_part(table, p)
### Do the required Changes to the columns in partitions
df = spark.table("test.original_bkp")
newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
newdf.select("c_customer_sk", "c_preferred_cust_flag").show()
### Write the Partitions back to Original table
newdf.write.insertInto("test.original")
### Verify data in Original table
orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()
Hope it helps.
Regards,
Neeraj
Para> = Spark 2.3.0:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.insertInto("partitioned_table", overwrite=True)
Podría hacer algo como esto para hacer que el trabajo sea reentrante (idempotente): (probé esto en la chispa 2.2)
import scala.sys.process._
def deletePath(path: String): Unit = {
s"hdfs dfs -rm -r -skipTrash $path".!
}
df.select(partitionColumn).distinct.collect().foreach(p => {
val partition = p.getAs[String](partitionColumn)
deletePath(s"$path/$partitionColumn=$partition")
})
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)
Probé esto en Spark 2.3.1 con Scala.
La mayoría de las respuestas anteriores están escribiendo en una tabla de Hive.
Sin embargo, quería escribir directamente en el
disco
, que tiene una
external hive table
en la parte superior de esta carpeta.
Primero la configuración requerida
DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
Uso aquí:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
Si usa DataFrame, posiblemente quiera usar la tabla Hive sobre los datos. En este caso solo necesita el método de llamada
df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)
Sobrescribirá las particiones que contiene DataFrame.
No es necesario especificar el formato (orc), porque Spark usará el formato de tabla Hive.
Funciona bien en Spark versión 1.6
Te sugiero que hagas la limpieza y luego escribas nuevas particiones con el modo
Append
:
sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")
Esto eliminará solo las particiones nuevas. Después de escribir datos, ejecute este comando si necesita actualizar metastore:
val sparkSession: SparkSession = SparkSession
.builder
.enableHiveSupport()
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
.appName("spark_write_to_dynamic_partition_folders")
Nota:
deletePath
supone que el comando
hfds
está disponible en su sistema.
Usando Spark 1.6 ...
El HiveContext puede simplificar mucho este proceso.
La clave es que primero debe crear la tabla en Hive usando una instrucción
CREATE EXTERNAL TABLE
con partición definida.
Por ejemplo:
# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION ''hdfs:///tmp/tables/test''
A partir de aquí, supongamos que tiene un marco de datos con nuevos registros para una partición específica (o varias particiones).
Puede usar una instrucción SQL HiveContext para realizar una
INSERT OVERWRITE
usando este Dataframe, que sobrescribirá la tabla solo para las particiones contenidas en el Dataframe:
# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable(''update_dataframe'')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")
Nota:
update_dataframe
en este ejemplo tiene un esquema que coincide con el de la tabla de
test
destino.
Un error fácil de cometer con este enfoque es omitir el paso
CREATE EXTERNAL TABLE
en Hive y simplemente hacer la tabla usando los métodos de escritura de la API Dataframe.
Para las tablas basadas en Parquet en particular, la tabla no se definirá adecuadamente para admitir la función
INSERT OVERWRITE... PARTITION
Hive.
Espero que esto ayude.
# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col=''{val}'')".format(val=target_partition)
print drop_query
spark.sql(drop_query)
# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)
# Load the partition
df.write/
.partitionBy("partition_col")/
.saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
Esto funciona para mí en trabajos de AWS Glue ETL (Glue 1.0 - Spark 2.4 - Python 2)