scala apache-spark rdd

Análisis de registros multilínea en Scala



apache-spark rdd (1)

Aquí está mi RDD [String]

M1 module1 PIP a Z A PIP b Z B PIP c Y n4 M2 module2 PIP a I n4 PIP b O D PIP c O n5

y así. Básicamente, necesito un RDD de clave (que contiene la segunda palabra en la línea 1) y los valores de las líneas PIP posteriores que se pueden iterar.

He intentado lo siguiente

val usgPairRDD = usgRDD.map(x => (x.split("//n")(0), x))

pero esto me da el siguiente resultado

(,) (M1 module1,M1 module1) (PIP a Z A,PIP a Z A) (PIP b Z B,PIP b Z B) (PIP c Y n4,PIP c Y n4) (,) (M2 module2,M2 module2) (PIP a I n4,PIP a I n4) (PIP b O D,PIP b O D) (PIP c O n5,PIP c O n5)

En cambio, me gustaría que la salida sea

module1, (PIP a Z A, PIP b Z B, PIP b Z B) module2, (PIP a I n4,PIP b O D, PIP c O n5)

¿Qué estoy haciendo mal? Soy bastante nuevo en las API de Spark. Gracias

Hola @ zero323

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

rinde ...

%%%%%%%%% M1 module1%%%%%%%%% PIP a Z A%%%%%%%%% PIP b Z B%%%%%%%%% PIP c Y n4%%%%%%%%% %%%%%%%%% M2 module2%%%%%%%%% PIP a I n4%%%%%%%%% PIP b O D%%%%%%%%% PIP c O n5%%%%%%%%%

y así

Hola, @ zero323 y @Daniel Darabos. Mi entrada es un conjunto muy grande de muchos archivos (que abarcan TB). Aquí está la muestra ...

BIN n4 BIN n5 BIN D BIN E PIT A I A PIT B I B PIT C I C PIT D O D PIT E O E DEF M1 module1 PIP a Z A PIP b Z B PIP c Y n4 DEF M2 module2 PIP a I n4 PIP b O D PIP c O n5

Necesito todos los BINS, PIT y DEF (incluidas las líneas PIP a continuación) en 3 RDDS diferentes. Aquí es cómo estoy haciendo esto actualmente (de la discusión, siento que usgRDD a continuación está mal calculado)

val binRDD = levelfileRDD.filter(line => line.contains("BIN")) val pitRDD = levelfileRDD.filter(line => line.contains("PIT")) val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

Necesito 3 tipos (en este momento) de RDD porque necesito realizar la validación más adelante. Por ejemplo, "n4" en "DEF M2 module2" solo puede existir si n4 es un elemento BIN. De los RDD, espero derivar relaciones usando GraphX ​​API (obviamente no he llegado a este punto). Sería ideal si cada usgPairRDD (calculado a partir de usgRDD o de otro modo) imprima lo siguiente

module1, (a Z A, b Z B, c Y n4) %%%%%%% module2, (a I n4, b O D, c O n5) %%%%%%%

Espero tener sentido. Disculpas a los Dioses Chispa, si no lo soy.


Por defecto, Spark crea un único elemento por línea. Significa que, en su caso, cada registro se extiende sobre múltiples elementos que, como lo indica Daniel Darabos en los comentarios, pueden ser procesados ​​por diferentes trabajadores.

Como parece que sus datos son relativamente regulares y están separados por una línea vacía, debería poder usar newAPIHadoopFile con delimitador personalizado:

import org.apache.spark.rdd.RDD import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.io.{LongWritable, Text} val path: String = ??? val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration conf.set("textinputformat.record.delimiter", "/n/n") val usgRDD = sc.newAPIHadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) .map{ case (_, v) => v.toString } val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("/n") match { case Array(x, xs @ _*) => (x, xs) })