tutorial spark reducebykey ejemplo apache-spark filter rdd

reducebykey - Apache Spark: Splitting Pair RDD en varios RDD por clave para guardar valores



spark shell (3)

Estoy usando Spark 1.0.1 para procesar una gran cantidad de datos. Cada fila contiene un número de ID, algunos con ID duplicados. Quiero guardar todas las filas con el mismo número de ID en la misma ubicación, pero tengo problemas para hacerlo de manera eficiente. Creo un RDD [(String, String)] de (número de ID, fila de datos) pares:

val mapRdd = rdd.map{ x=> (x.split("//t+")(1), x)}

Una forma que funciona, pero no funciona, es recolectar los números de ID, filtrar el RDD para cada ID, y guardar el RDD de los valores con la misma ID que un archivo de texto.

val ids = rdd.keys.distinct.collect ids.foreach({ id => val dataRows = mapRdd.filter(_._1 == id).values dataRows.saveAsTextFile(id) })

También probé un groupByKey o reduceByKey para que cada tupla en el RDD contenga un número de ID único como clave y una cadena de filas de datos combinadas separadas por nuevas líneas para ese número de ID. Quiero iterar a través del RDD solo una vez usando foreach para guardar los datos, pero no puedo dar los valores como un RDD

groupedRdd.foreach({ tup => val data = sc.parallelize(List(tup._2)) //nested RDD does not work data.saveAsTextFile(tup._1) })

Esencialmente, quiero dividir un RDD en varios RDD por un número de ID y guardar los valores para ese número de ID en su propia ubicación.


Creo que este problema es similar a Escribir en múltiples salidas por clave Spark - un trabajo de Spark

Por favor refiérase la respuesta allí.

import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } }

Acabo de ver una respuesta similar anteriormente, pero en realidad no necesitamos particiones personalizadas. MultipleTextOutputFormat creará un archivo para cada clave. Está bien que el registro múltiple con las mismas claves caiga en la misma partición.

nuevo HashPartitioner (num), donde el número es el número de partición que desea. En caso de que tenga una gran cantidad de claves diferentes, puede establecer el número en grande. En este caso, cada partición no abrirá demasiados manejadores de archivos hdfs.


Esto guardará los datos por ID de usuario

val mapRdd = rdd.map{ x=> (x.split("//t+")(1), x)}.groupByKey(numPartitions).saveAsObjectFile("file")

Si necesita recuperar los datos de nuevo en función de la identificación del usuario, puede hacer algo como

val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one

Tenga en cuenta que no hay ninguna razón particular para guardar en el archivo en este caso. Lo hice desde que el OP lo solicitó, y el hecho de guardarlo en un archivo le permite cargar el RDD en cualquier momento después de que se haya realizado la agrupación inicial.

Una última cosa, la lookup es más rápida que un enfoque de filtro para acceder a los identificadores, pero si está dispuesto a suspender una solicitud de extracción de chispa, puede verificar esta respuesta para un acercamiento más rápido.


puede llamar directamente a saveAsTextFile en RDD agrupado, aquí guardará los datos basados ​​en particiones, es decir, si tiene 4 distinctID, y especificó el número de particiones del RDPD como 4, entonces spark almacena cada información de partición en un archivo (por lo tanto mediante el cual puede tener solo una ID de archivador), puede incluso ver los datos como iterables de cada Id en el sistema de archivos.