reducebykey - Apache Spark: Splitting Pair RDD en varios RDD por clave para guardar valores
spark shell (3)
Estoy usando Spark 1.0.1 para procesar una gran cantidad de datos. Cada fila contiene un número de ID, algunos con ID duplicados. Quiero guardar todas las filas con el mismo número de ID en la misma ubicación, pero tengo problemas para hacerlo de manera eficiente. Creo un RDD [(String, String)] de (número de ID, fila de datos) pares:
val mapRdd = rdd.map{ x=> (x.split("//t+")(1), x)}
Una forma que funciona, pero no funciona, es recolectar los números de ID, filtrar el RDD para cada ID, y guardar el RDD de los valores con la misma ID que un archivo de texto.
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
También probé un groupByKey o reduceByKey para que cada tupla en el RDD contenga un número de ID único como clave y una cadena de filas de datos combinadas separadas por nuevas líneas para ese número de ID. Quiero iterar a través del RDD solo una vez usando foreach para guardar los datos, pero no puedo dar los valores como un RDD
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
Esencialmente, quiero dividir un RDD en varios RDD por un número de ID y guardar los valores para ese número de ID en su propia ubicación.
Creo que este problema es similar a Escribir en múltiples salidas por clave Spark - un trabajo de Spark
Por favor refiérase la respuesta allí.
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Split" + args(1))
val sc = new SparkContext(conf)
sc.textFile("input/path")
.map(a => (k, v)) // Your own implementation
.partitionBy(new HashPartitioner(num))
.saveAsHadoopFile("output/path", classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
spark.stop()
}
}
Acabo de ver una respuesta similar anteriormente, pero en realidad no necesitamos particiones personalizadas. MultipleTextOutputFormat creará un archivo para cada clave. Está bien que el registro múltiple con las mismas claves caiga en la misma partición.
nuevo HashPartitioner (num), donde el número es el número de partición que desea. En caso de que tenga una gran cantidad de claves diferentes, puede establecer el número en grande. En este caso, cada partición no abrirá demasiados manejadores de archivos hdfs.
Esto guardará los datos por ID de usuario
val mapRdd = rdd.map{ x=> (x.split("//t+")(1),
x)}.groupByKey(numPartitions).saveAsObjectFile("file")
Si necesita recuperar los datos de nuevo en función de la identificación del usuario, puede hacer algo como
val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory
val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one
Tenga en cuenta que no hay ninguna razón particular para guardar en el archivo en este caso. Lo hice desde que el OP lo solicitó, y el hecho de guardarlo en un archivo le permite cargar el RDD en cualquier momento después de que se haya realizado la agrupación inicial.
Una última cosa, la lookup
es más rápida que un enfoque de filtro para acceder a los identificadores, pero si está dispuesto a suspender una solicitud de extracción de chispa, puede verificar esta respuesta para un acercamiento más rápido.
puede llamar directamente a saveAsTextFile en RDD agrupado, aquí guardará los datos basados en particiones, es decir, si tiene 4 distinctID, y especificó el número de particiones del RDPD como 4, entonces spark almacena cada información de partición en un archivo (por lo tanto mediante el cual puede tener solo una ID de archivador), puede incluso ver los datos como iterables de cada Id en el sistema de archivos.