scala - Escribir en múltiples salidas con la tecla Spark-un trabajo Spark
hadoop output (10)
Buenas noticias para el usuario de Python en el caso de que tenga varias columnas y desee guardar todas las otras columnas que no estén particionadas en formato csv, que fallarán si utiliza el método de "texto" como sugerencia de Nick Chammas.
people_df.write.partitionBy("number").text("people")
el mensaje de error es "AnalysisException: u''Texto fuente de datos admite solo una columna, y tiene 2 columnas .;"
En el paquete spark 2.0.0 (my test environment is hdp''s spark 2.0.0) "com.databricks.spark.csv" ahora está integrado, y nos permite guardar el archivo de texto dividido por una sola columna, ver el ejemplo de golpe:
people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
(1,"2016-12-25", "alice"),
(1,"2016-12-25", "tom"),
(1, "2016-12-25","bob"),
(2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])
df.coalesce(1).write.partitionBy("number").mode("overwrite").format(''com.databricks.spark.csv'').options(header=''false'').save("people")
[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS
[root@namenode people]# cat number/=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number/=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie
En mi entorno spark chispa 1.6.1, el código no arrojó ningún error, sin embargo, solo se genera un archivo. no está dividido por dos carpetas.
Espero que esto pueda ayudar.
¿Cómo se puede escribir en múltiples salidas dependientes de la clave usando Spark en un solo trabajo?
Relacionado: escribe en salidas múltiples con la tecla Scalding Hadoop, un trabajo de MapReduce
P.ej
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
aseguraría el cat prefix/1
es
a
b
y cat prefix/2
sería
c
Responder
Para obtener una respuesta exacta con códec completo de importación, chulo y compresión, consulte https://stackoverflow.com/a/46118044/1586965
Esto incluye el códec solicitado, las importaciones necesarias y el proxeneta según lo solicitado.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
def writeAsMultiple(prefix: String, codec: String,
keyName: String = "key")
(implicit sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
rdd.toDF(keyName, "_2").write.partitionBy(keyName)
.format("text").option("codec", codec).save(prefix)
}
}
val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Una diferencia sutil al OP es que prefija <keyName>=
a los nombres del directorio. P.ej
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Daría:
prefix/key=1/part-00000
prefix/key=2/part-00000
donde prefix/my_number=1/part-00000
contendría las líneas a
y b
, y prefix/my_number=2/part-00000
contendría la línea c
.
Y
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
Daría:
prefix/foo=1/part-00000
prefix/foo=2/part-00000
Debe quedar claro cómo editar para parquet
.
Finalmente, a continuación, se muestra un ejemplo de Dataset
, que quizás sea mejor que usar Tuples.
implicit class PimpedDataset[T](dataset: Dataset[T]) {
def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
dataset.write.partitionBy(field)
.format("text").option("codec", codec).save(prefix)
}
}
Lo haría así, que es escalable
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Split" + args(1))
val sc = new SparkContext(conf)
sc.textFile("input/path")
.map(a => (k, v)) // Your own implementation
.partitionBy(new HashPartitioner(num))
.saveAsHadoopFile("output/path", classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
spark.stop()
}
}
Acabo de ver una respuesta similar anteriormente, pero en realidad no necesitamos particiones personalizadas. MultipleTextOutputFormat creará un archivo para cada clave. Está bien que el registro múltiple con las mismas claves caiga en la misma partición.
nuevo HashPartitioner (num), donde el número es el número de partición que desea. En caso de que tenga una gran cantidad de claves diferentes, puede establecer el número en grande. En este caso, cada partición no abrirá demasiados manejadores de archivos hdfs.
Necesitaba lo mismo en Java. Publicando mi traducción de la respuesta Scala de Zhang Zhan a los usuarios de Spark Java API:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
return key.toString();
}
}
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Split Job")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
sc.parallelize(Arrays.asList(strings))
// The first character of the string is the key
.mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
.saveAsHadoopFile("output/", String.class, String.class,
RDDMultipleTextOutputFormat.class);
sc.stop();
}
}
Si potencialmente tiene muchos valores para una clave determinada, creo que la solución escalable es escribir un archivo por clave por partición. Desafortunadamente no hay soporte integrado para esto en Spark, pero podemos mejorar algo.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.mapPartitionsWithIndex { (p, it) =>
val outputs = new MultiWriter(p.toString)
for ((k, v) <- it) {
outputs.write(k.toString, v)
}
outputs.close
Nil.iterator
}
.foreach((x: Nothing) => ()) // To trigger the job.
// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
def write(key: String, value: Any) = {
if (!writers.contains(key)) {
val f = new java.io.File("output/" + key + "/" + suffix)
f.getParentFile.mkdirs
writers(key) = new java.io.PrintWriter(f)
}
writers(key).println(value)
}
def close = writers.values.foreach(_.close)
}
(Reemplace PrintWriter
con la operación de sistema de archivos distribuida que elija).
Esto hace una sola pasada sobre el RDD y no realiza ninguna reproducción aleatoria. Le da un directorio por clave, con una cantidad de archivos dentro de cada uno.
Si usa Spark 1.4+, esto se ha vuelto mucho, mucho más fácil gracias a la API de DataFrame . (Los DataFrames se introdujeron en Spark 1.3, pero partitionBy()
, que necesitamos, se introdujo en 1.4 .)
Si está empezando con un RDD, primero deberá convertirlo a un DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
En Python, este mismo código es:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Una vez que tiene un DataFrame, escribir en múltiples salidas basadas en una clave en particular es simple. Lo que es más, y esta es la belleza de la API de DataFrame, el código es prácticamente el mismo en Python, Scala, Java y R:
people_df.write.partitionBy("number").text("people")
Y puede usar fácilmente otros formatos de salida si lo desea:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
En cada uno de estos ejemplos, Spark creará un subdirectorio para cada una de las claves en las que hemos particionado el DataFrame:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh
Tengo una necesidad similar y encontré la manera. Pero tiene una desventaja (que no es un problema para mi caso): necesita volver a particionar los datos con una partición por archivo de salida.
Para realizar la partición de esta manera, generalmente se requiere saber de antemano cuántos archivos se generarán en el trabajo y encontrar una función que asigne cada clave a cada partición.
Primero, creemos nuestra clase basada en MultipleTextOutputFormat:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
key.toString
}
override protected def generateActualKey(key: T, value: V) = {
null
}
}
Con esta clase Spark obtendrá una clave de una partición (la primera / última, supongo) y nombrará el archivo con esta clave, por lo que no es bueno mezclar varias claves en la misma partición.
Para su ejemplo, necesitará un particionador personalizado. Esto hará el trabajo:
import org.apache.spark.Partitioner
class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
Ahora vamos a juntar todo:
val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))
// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)
val prefix = "hdfs://.../prefix"
val partitionedRDD = rdd.partitionBy(partitioner)
partitionedRDD.saveAsHadoopFile(prefix,
classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])
Esto generará 3 archivos bajo el prefijo (llamados 1, 2 y 7), procesando todo en una sola pasada.
Como puede ver, necesita cierto conocimiento sobre sus claves para poder utilizar esta solución.
Para mí fue más fácil porque necesitaba un archivo de salida para cada hash de clave y la cantidad de archivos estaba bajo mi control, así que podía usar el HashPartitioner original para hacer el truco.
Tuve un caso de uso similar donde dividí el archivo de entrada en Hadoop HDFS en varios archivos basados en una clave (1 archivo por clave). Aquí está mi código scala para spark
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);
@serializable object processGroup {
def apply(groupName:String, records:Iterable[String]): Unit = {
val outFileStream = fs.create(new Path("/output_dir/"+groupName))
for( line <- records ) {
outFileStream.writeUTF(line+"/n")
}
outFileStream.close()
}
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))
He agrupado los registros según la clave. Los valores para cada clave se escriben en un archivo separado.
Tuve un caso de uso similar. Lo resolví en Java escribiendo dos clases personalizadas implementando MultipleTextOutputFormat
y RecordWriter
.
Mi entrada fue JavaPairRDD<String, List<String>>
y quería almacenarlo en un archivo nombrado por su clave, con todas las líneas contenidas en su valor.
Aquí está el código para mi implementación MultipleTextOutputFormat
class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString(); //The return will be used as file name
}
/** The following 4 functions are only for visibility purposes
(they are used in the class MyRecordWriter) **/
protected String generateLeafFileName(String name) {
return super.generateLeafFileName(name);
}
protected V generateActualValue(K key, V value) {
return super.generateActualValue(key, value);
}
protected String getInputFileBasedOutputFileName(JobConf job, String name) {
return super.getInputFileBasedOutputFileName(job, name);
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
return super.getBaseRecordWriter(fs, job, name, arg3);
}
/** Use my custom RecordWriter **/
@Override
RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
final String myName = this.generateLeafFileName(name);
return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
}
}
Aquí está el código para mi implementación de RecordWriter
.
class MyRecordWriter<K, V> implements RecordWriter<K, V> {
private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
private final FileSystem fs;
private final JobConf job;
private final Progressable arg3;
private String myName;
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
this.fs = fs;
this.job = job;
this.arg3 = arg3;
this.myName = myName;
}
@Override
void write(K key, V value) throws IOException {
String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
RecordWriter rw = this.recordWriters.get(finalPath);
if(rw == null) {
rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
}
@Override
void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while(keys.hasNext()) {
RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
}
La mayoría del código es exactamente el mismo que en FileOutputFormat
. La única diferencia son esas pocas líneas
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
Estas líneas me permitieron escribir cada línea de mi List<String>
entrada List<String>
en el archivo. El primer argumento de la función de write
se establece en null
para evitar escribir la clave en cada línea.
Para terminar, solo necesito hacer esta llamada para escribir mis archivos
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
saveAsText () y saveAsHadoop (...) se implementan en base a los datos de RDD, específicamente por el método: PairRDD.saveAsHadoopDataset que toma los datos del PairRdd donde se ejecuta. Veo dos opciones posibles: si sus datos son de un tamaño relativamente pequeño, podría ahorrar algo de tiempo de implementación al agrupar el RDD, crear un nuevo RDD de cada colección y usar ese RDD para escribir los datos. Algo como esto:
val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}
Tenga en cuenta que no funcionará para grandes conjuntos de datos b / c la materialización del iterador en v.toSeq
podría no ajustarse en la memoria.
La otra opción que veo, y en realidad la que recomendaría en este caso es la siguiente: hazlo tú mismo, llamando directamente a la API de hadoop / hdfs.
Aquí hay una discusión que comencé al investigar esta pregunta: ¿Cómo crear RDD desde otro RDD?