Análisis de registros multilínea en Scala
apache-spark rdd (1)
Aquí está mi RDD [String]
M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4
M2 module2
PIP a I n4
PIP b O D
PIP c O n5
y así. Básicamente, necesito un RDD de clave (que contiene la segunda palabra en la línea 1) y los valores de las líneas PIP posteriores que se pueden iterar.
He intentado lo siguiente
val usgPairRDD = usgRDD.map(x => (x.split("//n")(0), x))
pero esto me da el siguiente resultado
(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)
En cambio, me gustaría que la salida sea
module1, (PIP a Z A, PIP b Z B, PIP b Z B)
module2, (PIP a I n4,PIP b O D, PIP c O n5)
¿Qué estoy haciendo mal? Soy bastante nuevo en las API de Spark. Gracias
Hola @ zero323
usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))
rinde ...
%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%
y así
Hola, @ zero323 y @Daniel Darabos. Mi entrada es un conjunto muy grande de muchos archivos (que abarcan TB). Aquí está la muestra ...
BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4
DEF M2 module2
PIP a I n4
PIP b O D
PIP c O n5
Necesito todos los BINS, PIT y DEF (incluidas las líneas PIP a continuación) en 3 RDDS diferentes. Aquí es cómo estoy haciendo esto actualmente (de la discusión, siento que usgRDD a continuación está mal calculado)
val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))
Necesito 3 tipos (en este momento) de RDD porque necesito realizar la validación más adelante. Por ejemplo, "n4" en "DEF M2 module2" solo puede existir si n4 es un elemento BIN. De los RDD, espero derivar relaciones usando GraphX API (obviamente no he llegado a este punto). Sería ideal si cada usgPairRDD (calculado a partir de usgRDD o de otro modo) imprima lo siguiente
module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%
Espero tener sentido. Disculpas a los Dioses Chispa, si no lo soy.
Por defecto, Spark crea un único elemento por línea. Significa que, en su caso, cada registro se extiende sobre múltiples elementos que, como lo indica Daniel Darabos en los comentarios, pueden ser procesados por diferentes trabajadores.
Como parece que sus datos son relativamente regulares y están separados por una línea vacía, debería poder usar
newAPIHadoopFile
con delimitador personalizado:
import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
val path: String = ???
val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration
conf.set("textinputformat.record.delimiter", "/n/n")
val usgRDD = sc.newAPIHadoopFile(
path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
.map{ case (_, v) => v.toString }
val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("/n") match {
case Array(x, xs @ _*) => (x, xs)
})