scala - Cómo usar COGROUP para grandes conjuntos de datos
apache-spark apache-spark-sql (2)
Tengo dos
rdd''s
:
val tab_a: RDD[(String, String)]
y
val tab_b: RDD[(String, String)]
Estoy usando
cogroup
para esos conjuntos de datos como:
val tab_c = tab_a.cogroup(tab_b).collect.toArray
val updated = tab_c.map { x =>
{
//somecode
}
}
Estoy usando valores
tab_c
tab_c para la función de mapa y funciona bien para pequeños conjuntos de datos, pero en el caso de grandes conjuntos de datos arroja la
Out Of Memory exception
.
He intentado convertir el valor final a RDD pero no tuve suerte el mismo error
val newcos = spark.sparkContext.parallelize(tab_c)
1.¿Cómo usar Cogroup para grandes conjuntos de datos?
2. ¿Podemos persistir el valor agrupado?
Código
val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)
val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)
val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()
var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)
var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)
var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)
val updated = cos.map { x =>
{
val key = x._1
val value = x._2
srcs = value._1.mkString(",")
destt = value._2.mkString(",")
if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
srcmis :+= srcs
destmis :+= destt
}
if (srcs == "") {
extraindest :+= destt.mkString("")
}
if (destt == "") {
extrainsrc :+= srcs.mkString("")
}
}
}
Código actualizado:
val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
// tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
{2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..
ERROR:
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Gracias
Cuando usa
collect()
básicamente le está diciendo a spark que mueva todos los datos resultantes de regreso al nodo maestro, lo que puede producir fácilmente un cuello de botella.
Ya no estás usando Spark en ese punto, solo una matriz simple en una sola máquina.
Para desencadenar el cálculo, simplemente use algo que requiera los datos en cada nodo, es por eso que los ejecutores viven sobre un sistema de archivos distribuido.
Por ejemplo
saveAsTextFile()
.
Aquí hay algunos ejemplos básicos.
Recuerde, el objetivo completo aquí (es decir, si tiene datos grandes) es mover el código a sus datos y calcularlos allí, no traer todos los datos al cálculo.
TL; DR
No
collect
.
Para ejecutar este código de manera segura, sin suposiciones adicionales (en promedio, los requisitos para los nodos de trabajo podrían ser significativamente más pequeños), cada nodo (controlador y cada ejecutor) requeriría una memoria que excedería significativamente los requisitos de memoria total para todos los datos.
Si lo ejecutara fuera de Spark, necesitaría solo un nodo. Por lo tanto, Spark no proporciona beneficios aquí.
Sin embargo, si omite
collect.toArray
y hace algunas suposiciones sobre la distribución de datos, puede ejecutarlo bien.