scala apache-spark apache-spark-sql

scala - Cómo usar COGROUP para grandes conjuntos de datos



apache-spark apache-spark-sql (2)

Tengo dos rdd''s : val tab_a: RDD[(String, String)] y val tab_b: RDD[(String, String)] Estoy usando cogroup para esos conjuntos de datos como:

val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } }

Estoy usando valores tab_c tab_c para la función de mapa y funciona bien para pequeños conjuntos de datos, pero en el caso de grandes conjuntos de datos arroja la Out Of Memory exception .

He intentado convertir el valor final a RDD pero no tuve suerte el mismo error

val newcos = spark.sparkContext.parallelize(tab_c)

1.¿Cómo usar Cogroup para grandes conjuntos de datos?

2. ¿Podemos persistir el valor agrupado?

Código

val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect() var srcmis: Array[String] = new Array[String](0) var destmis: Array[String] = new Array[String](0) var extrainsrc: Array[String] = new Array[String](0) var extraindest: Array[String] = new Array[String](0) var srcs: String = Seq("")(0) var destt: String = Seq("")(0) val updated = cos.map { x => { val key = x._1 val value = x._2 srcs = value._1.mkString(",") destt = value._2.mkString(",") if (srcs.equalsIgnoreCase(destt) == false && destt != "") { srcmis :+= srcs destmis :+= destt } if (srcs == "") { extraindest :+= destt.mkString("") } if (destt == "") { extrainsrc :+= srcs.mkString("") } } }

Código actualizado:

val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2) // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)} {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

ERROR:

ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Gracias


Cuando usa collect() básicamente le está diciendo a spark que mueva todos los datos resultantes de regreso al nodo maestro, lo que puede producir fácilmente un cuello de botella. Ya no estás usando Spark en ese punto, solo una matriz simple en una sola máquina.

Para desencadenar el cálculo, simplemente use algo que requiera los datos en cada nodo, es por eso que los ejecutores viven sobre un sistema de archivos distribuido. Por ejemplo saveAsTextFile() .

Aquí hay algunos ejemplos básicos.

Recuerde, el objetivo completo aquí (es decir, si tiene datos grandes) es mover el código a sus datos y calcularlos allí, no traer todos los datos al cálculo.


TL; DR No collect .

Para ejecutar este código de manera segura, sin suposiciones adicionales (en promedio, los requisitos para los nodos de trabajo podrían ser significativamente más pequeños), cada nodo (controlador y cada ejecutor) requeriría una memoria que excedería significativamente los requisitos de memoria total para todos los datos.

Si lo ejecutara fuera de Spark, necesitaría solo un nodo. Por lo tanto, Spark no proporciona beneficios aquí.

Sin embargo, si omite collect.toArray y hace algunas suposiciones sobre la distribución de datos, puede ejecutarlo bien.