apache spark - examples - Cómo sobrescribir el directorio de salida en spark
apache spark wikipedia (8)
ACTUALIZACIÓN: Sugerir el uso de
Dataframes
, más algo como
... .write.mode(SaveMode.Overwrite) ...
Para versiones anteriores intente
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)
En 1.1.0 puede establecer configuraciones conf usando el script spark-submit con la bandera --conf.
ADVERTENCIA (versiones anteriores): según @piggybox, hay un error en Spark en el que solo sobrescribirá los archivos que necesita para escribir sus archivos
part-
, cualquier otro archivo quedará sin eliminar.
Tengo una aplicación de transmisión por chispa que produce un conjunto de datos por cada minuto. Necesito guardar / sobrescribir los resultados de los datos procesados.
Cuando intenté sobrescribir el conjunto de datos org.apache.hadoop.mapred.FileAlreadyExistsException detiene la ejecución.
Establecí el conjunto de propiedades Spark
set("spark.files.overwrite","true")
, pero no hay suerte.
¿Cómo sobrescribir o anteponer los archivos de la chispa?
De la documentación de
pyspark.sql.DataFrame.save
(actualmente en 1.3.1), puede especificar
mode=''overwrite''
al guardar un DataFrame:
myDataFrame.save(path=''myPath'', source=''parquet'', mode=''overwrite'')
He verificado que esto incluso eliminará los archivos de partición sobrantes. Entonces, si usted dijo originalmente 10 particiones / archivos, pero luego sobrescribió la carpeta con un DataFrame que solo tenía 6 particiones, la carpeta resultante tendrá las 6 particiones / archivos.
Consulte la documentación de Spark SQL para obtener más información sobre las opciones de modo.
Esta versión sobrecargada de la función guardar funciona para mí:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Sobrescribir"))
El ejemplo anterior sobrescribiría una carpeta existente. El modo de salvar también puede tomar estos parámetros ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Agregar : el modo Agregar significa que al guardar un DataFrame en un origen de datos, si los datos / la tabla ya existen, se espera que el contenido del DataFrame se agregue a los datos existentes.
ErrorIfExists : el modo ErrorIfExists significa que al guardar un DataFrame en un origen de datos, si los datos ya existen, se espera una excepción.
Ignorar : el modo Ignorar significa que al guardar un DataFrame en un origen de datos, si los datos ya existen, se espera que la operación de guardar no guarde el contenido del DataFrame y no cambie los datos existentes.
La documentación del parámetro
spark.files.overwrite
dice esto: "Si se deben sobrescribir los archivos agregados a través de
SparkContext.addFile()
cuando el archivo de destino existe y su contenido no coincide con el de la fuente".
Por lo tanto, no tiene ningún efecto en el método saveAsTextFiles.
Puede hacer esto antes de guardar el archivo:
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Aas explicó aquí: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
Si está dispuesto a usar su propio formato de salida personalizado, también podría obtener el comportamiento deseado con RDD.
Eche un vistazo a las siguientes clases: FileOutputFormat , FileOutputCommitter
En el formato de salida de archivo, tiene un método llamado checkOutputSpecs, que verifica si el directorio de salida existe. En FileOutputCommitter tiene el commitJob, que generalmente transfiere datos del directorio temporal a su lugar final.
No pude verificarlo todavía (lo haría, tan pronto como tenga unos minutos libres) pero teóricamente: si extiendo FileOutputFormat y anulo checkOutputSpecs a un método que no arroja excepciones en el directorio ya existe, y ajusto el El método commitJob de mi confirmador de salida personalizado para realizar cualquier lógica que quiera (por ejemplo, anular algunos de los archivos, agregar otros) de lo que también puedo lograr el comportamiento deseado con los RDD.
El formato de salida se pasa a: saveAsNewAPIHadoopFile (que es también el método llamado saveAsTextFile para guardar los archivos). Y el confirmador de salida está configurado a nivel de aplicación.
dado que
df.save(path, source, mode)
está en desuso, (
http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame
)
use
df.write.format(source).mode("overwrite").save(path)
donde df.write es DataFrameWriter
''fuente'' puede ser ("com.databricks.spark.avro" | "parquet" | "json")
df.write.mode(''overwrite'').parquet("/output/folder/path")
funciona si desea sobrescribir un archivo de parquet usando python.
Esto está en chispa 1.6.2.
La API puede ser diferente en versiones posteriores
val jobName = "WordCount";
//overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false")
val conf = new
SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
val sc = new SparkContext(conf)