scala - velocidad - cómo hacer saveAsTextFile NO divide la salida en múltiples archivos?
tipos de arranque atletismo (9)
Aquí está mi respuesta para generar un solo archivo. Acabo de agregar coalesce(1)
val year = sc.textFile("apat63_99.txt")
.map(_.split(",")(1))
.flatMap(_.split(","))
.map((_,1))
.reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
Código:
year.coalesce(1).saveAsTextFile("year")
Cuando uso Scala en Spark, cada vez que vuelco los resultados usando saveAsTextFile
, parece dividir el resultado en varias partes. Solo le paso un parámetro (ruta).
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
- ¿El número de salidas corresponde a la cantidad de reductores que usa?
- ¿Esto significa que la salida está comprimida?
- Sé que puedo combinar la salida usando bash, pero ¿hay una opción para almacenar la salida en un solo archivo de texto, sin dividir? Miré los documentos de la API, pero no dice mucho sobre esto.
Como han mencionado otros, puede recopilar o fusionar su conjunto de datos para forzar a Spark a producir un único archivo. Pero esto también limita el número de tareas de Spark que pueden funcionar en su conjunto de datos en paralelo. Prefiero dejar que cree cien archivos en el directorio HDFS de salida, luego use hadoop fs -getmerge /hdfs/dir /local/file.txt
para extraer los resultados en un único archivo en el sistema de archivos local. Esto tiene más sentido cuando su resultado es un informe relativamente pequeño, por supuesto.
En Spark 1.6.1, el formato es el siguiente. Crea un solo archivo de salida. Es una buena práctica usarlo si la salida es lo suficientemente pequeña como para manejarlo. Básicamente lo que hace es devolver un nuevo RDD que se reduce a particiones numPartitions. Si estás haciendo una unión drástica, por ejemplo, para numPartitions = 1, esto puede hacer que su computación tenga lugar en menos nodos de los que desea (por ejemplo, un nodo en el caso de numPartitions = 1)
pair_result.coalesce(1).saveAsTextFile("/app/data/")
La razón por la que se guarda como archivos múltiples es porque el cálculo se distribuye. Si la salida es lo suficientemente pequeña como para pensar que puede caber en una máquina, entonces puede finalizar su programa con
val arr = year.collect()
Y luego guarde la matriz resultante como un archivo. Otra forma sería usar un particionador personalizado, partitionBy
, y hacerlo de modo que todo vaya a una partición, aunque eso no es aconsejable porque no obtendrá paralelización.
Si necesita que el archivo se guarde con saveAsTextFile
, puede usar coalesce(1,true).saveAsTextFile()
. Esto básicamente significa que los cálculos se unen a una partición. También puede usar repartition(1)
que es solo un contenedor para coalesce
con el argumento de mezcla establecido en verdadero. Mirando a través de la fuente de RDD.scala es como me di cuenta de la mayoría de estas cosas, deberías echarle un vistazo.
Para aquellos que trabajan con un conjunto de datos más grande :
rdd.collect()
no debe usarse en este caso, ya que recopilará todos los datos como unaArray
en el controlador, que es la forma más fácil de obtener memoria insuficiente.rdd.coalesce(1).saveAsTextFile()
tampoco debe utilizarse ya que se perderá el paralelismo de las fases iniciales en un solo nodo, donde se almacenarán los datos.rdd.coalesce(1, shuffle = true).saveAsTextFile()
es la mejor opción simple ya que mantendrá el procesamiento de las tareas de subida paralelas y luego solo realizará la reproducción aleatoria en un nodo (rdd.repartition(1).saveAsTextFile()
es un sinónimo exacto).rdd.saveAsSingleTextFile()
como serdd.saveAsSingleTextFile()
continuación, permite almacenar el rdd en un solo archivo con un nombre específico, manteniendo las propiedades de paralelismo derdd.coalesce(1, shuffle = true).saveAsTextFile()
.
Lo que puede ser inconveniente con rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
es que en realidad produce un archivo cuya ruta es path/to/file.txt/part-00000
y no path/to/file.txt
.
La siguiente solución rdd.saveAsSingleTextFile("path/to/file.txt")
realmente producirá un archivo cuya ruta es path/to/file.txt
:
package com.whatever.package
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
object SparkHelper {
// This is an implicit class so that saveAsSingleTextFile can be attached to
// SparkContext and be called like this: sc.saveAsSingleTextFile
implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
def saveAsSingleTextFile(path: String): Unit =
saveAsSingleTextFileInternal(path, None)
def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
saveAsSingleTextFileInternal(path, Some(codec))
private def saveAsSingleTextFileInternal(
path: String, codec: Option[Class[_ <: CompressionCodec]]
): Unit = {
// The interface with hdfs:
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
// Classic saveAsTextFile in a temporary folder:
hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it''s not there already
codec match {
case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
case None => rdd.saveAsTextFile(s"$path.tmp")
}
// Merge the folder of resulting part-xxxxx into one file:
hdfs.delete(new Path(path), true) // to make sure it''s not there already
FileUtil.copyMerge(
hdfs, new Path(s"$path.tmp"),
hdfs, new Path(path),
true, rdd.sparkContext.hadoopConfiguration, null
)
hdfs.delete(new Path(s"$path.tmp"), true)
}
}
}
que se puede usar de esta manera:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
Este fragmento primero almacena el rdd con rdd.saveAsTextFile("path/to/file.txt")
en una path/to/file.txt.tmp
carpeta temporal path/to/file.txt.tmp
como si no quisiéramos almacenar datos en un archivo (lo que mantiene el procesamiento de tareas upstream paralelo).
Y solo entonces, utilizando la API del sistema de archivos hadoop , procedemos con la merge ( FileUtil.copyMerge()
) de los diferentes archivos de salida para crear nuestra path/to/file.txt
final de salida de archivo único path/to/file.txt
.
Podrás hacerlo en la próxima versión de Spark, en la versión actual 1.0.0 no es posible a menos que lo hagas manualmente de alguna manera, por ejemplo, como mencionaste, con una llamada de script bash.
Podría llamar a coalesce(1)
y luego saveAsTextFile()
, pero podría ser una mala idea si tiene muchos datos. Se generan archivos separados por división como en Hadoop para permitir que los mapeadores y los reductores por separado escriban en diferentes archivos. Tener un solo archivo de salida es solo una buena idea si tiene muy pocos datos, en cuyo caso también podría recopilar (), como dijo @aaronman.
También quiero mencionar que la documentación establece claramente que los usuarios deben tener cuidado cuando las llamadas se fusionan con un número realmente pequeño de particiones. esto puede provocar que las particiones en sentido ascendente hereden este número de particiones.
No recomendaría usar coalesce (1) a menos que realmente se requiera.