tutorial spark examples example apache-spark

apache spark - examples - Cómo sobrescribir el directorio de salida en spark



apache spark wikipedia (8)

ACTUALIZACIÓN: Sugerir el uso de Dataframes , más algo como ... .write.mode(SaveMode.Overwrite) ...

Para versiones anteriores intente

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sc = SparkContext(yourSparkConf)

En 1.1.0 puede establecer configuraciones conf usando el script spark-submit con la bandera --conf.

ADVERTENCIA (versiones anteriores): según @piggybox, hay un error en Spark en el que solo sobrescribirá los archivos que necesita para escribir sus archivos part- , cualquier otro archivo quedará sin eliminar.

Tengo una aplicación de transmisión por chispa que produce un conjunto de datos por cada minuto. Necesito guardar / sobrescribir los resultados de los datos procesados.

Cuando intenté sobrescribir el conjunto de datos org.apache.hadoop.mapred.FileAlreadyExistsException detiene la ejecución.

Establecí el conjunto de propiedades Spark set("spark.files.overwrite","true") , pero no hay suerte.

¿Cómo sobrescribir o anteponer los archivos de la chispa?


De la documentación de pyspark.sql.DataFrame.save (actualmente en 1.3.1), puede especificar mode=''overwrite'' al guardar un DataFrame:

myDataFrame.save(path=''myPath'', source=''parquet'', mode=''overwrite'')

He verificado que esto incluso eliminará los archivos de partición sobrantes. Entonces, si usted dijo originalmente 10 particiones / archivos, pero luego sobrescribió la carpeta con un DataFrame que solo tenía 6 particiones, la carpeta resultante tendrá las 6 particiones / archivos.

Consulte la documentación de Spark SQL para obtener más información sobre las opciones de modo.


Esta versión sobrecargada de la función guardar funciona para mí:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Sobrescribir"))

El ejemplo anterior sobrescribiría una carpeta existente. El modo de salvar también puede tomar estos parámetros ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Agregar : el modo Agregar significa que al guardar un DataFrame en un origen de datos, si los datos / la tabla ya existen, se espera que el contenido del DataFrame se agregue a los datos existentes.

ErrorIfExists : el modo ErrorIfExists significa que al guardar un DataFrame en un origen de datos, si los datos ya existen, se espera una excepción.

Ignorar : el modo Ignorar significa que al guardar un DataFrame en un origen de datos, si los datos ya existen, se espera que la operación de guardar no guarde el contenido del DataFrame y no cambie los datos existentes.


La documentación del parámetro spark.files.overwrite dice esto: "Si se deben sobrescribir los archivos agregados a través de SparkContext.addFile() cuando el archivo de destino existe y su contenido no coincide con el de la fuente". Por lo tanto, no tiene ningún efecto en el método saveAsTextFiles.

Puede hacer esto antes de guardar el archivo:

val hadoopConf = new org.apache.hadoop.conf.Configuration() val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf) try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas explicó aquí: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html


Si está dispuesto a usar su propio formato de salida personalizado, también podría obtener el comportamiento deseado con RDD.

Eche un vistazo a las siguientes clases: FileOutputFormat , FileOutputCommitter

En el formato de salida de archivo, tiene un método llamado checkOutputSpecs, que verifica si el directorio de salida existe. En FileOutputCommitter tiene el commitJob, que generalmente transfiere datos del directorio temporal a su lugar final.

No pude verificarlo todavía (lo haría, tan pronto como tenga unos minutos libres) pero teóricamente: si extiendo FileOutputFormat y anulo checkOutputSpecs a un método que no arroja excepciones en el directorio ya existe, y ajusto el El método commitJob de mi confirmador de salida personalizado para realizar cualquier lógica que quiera (por ejemplo, anular algunos de los archivos, agregar otros) de lo que también puedo lograr el comportamiento deseado con los RDD.

El formato de salida se pasa a: saveAsNewAPIHadoopFile (que es también el método llamado saveAsTextFile para guardar los archivos). Y el confirmador de salida está configurado a nivel de aplicación.



df.write.mode(''overwrite'').parquet("/output/folder/path") funciona si desea sobrescribir un archivo de parquet usando python. Esto está en chispa 1.6.2. La API puede ser diferente en versiones posteriores


val jobName = "WordCount"; //overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false") val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false"); val sc = new SparkContext(conf)