Scala spark, listbuffer está vacío
apache-spark (2)
En este fragmento de código en el comentario 1, la longitud de los elementos del buffer de lista se muestra correctamente, pero en el segundo código de comentario nunca se ejecuta. ¿Por qué ocurre?
val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
var wktReader: WKTReader = new WKTReader();
val dataSet = sc.textFile("dataSet.txt")
val items = new ListBuffer[String]()
dataSet.foreach { e =>
items += e
println("len = " + items.length) //1. here length is ok
}
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
items.foreach { x => print(x)} //2. this code doesn''t be executed
Los registros están aquí:
16/11/20 01:16:52 INFO Utils: Successfully started service ''SparkUI'' on port 4040.
16/11/20 01:16:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
16/11/20 01:16:53 INFO Executor: Starting executor ID driver on host localhost
16/11/20 01:16:53 INFO Utils: Successfully started service ''org.apache.spark.network.netty.NettyBlockTransferService'' on port 58608.
16/11/20 01:16:53 INFO NettyBlockTransferService: Server created on 192.168.56.1:58608
16/11/20 01:16:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
16/11/20 01:16:53 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:58608 with 347.1 MB RAM, BlockManagerId(driver, 192.168.56.1, 58608)
16/11/20 01:16:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
Starting app
16/11/20 01:16:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 139.6 KB, free 347.0 MB)
16/11/20 01:16:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 346.9 MB)
16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:58608 (size: 15.9 KB, free: 347.1 MB)
16/11/20 01:16:58 INFO SparkContext: Created broadcast 0 from textFile at main.scala:25
16/11/20 01:16:58 INFO FileInputFormat: Total input paths to process : 1
16/11/20 01:16:58 INFO SparkContext: Starting job: foreach at main.scala:28
16/11/20 01:16:58 INFO DAGScheduler: Got job 0 (foreach at main.scala:28) with 1 output partitions
16/11/20 01:16:58 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at main.scala:28)
16/11/20 01:16:58 INFO DAGScheduler: Parents of final stage: List()
16/11/20 01:16:58 INFO DAGScheduler: Missing parents: List()
16/11/20 01:16:58 INFO DAGScheduler: Submitting ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25), which has no missing parents
16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 346.9 MB)
16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2034.0 B, free 346.9 MB)
16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.56.1:58608 (size: 2034.0 B, free: 347.1 MB)
16/11/20 01:16:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
16/11/20 01:16:59 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25)
16/11/20 01:16:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/11/20 01:16:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5427 bytes)
16/11/20 01:16:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/11/20 01:16:59 INFO HadoopRDD: Input split: file:/D:/dataSet.txt:0+291
16/11/20 01:16:59 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/11/20 01:16:59 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/11/20 01:16:59 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/11/20 01:16:59 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/11/20 01:16:59 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
len = 1
len = 2
len = 3
len = 4
len = 5
len = 6
len = 7
16/11/20 01:16:59 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
16/11/20 01:16:59 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 417 ms on localhost (1/1)
16/11/20 01:16:59 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/11/20 01:16:59 INFO DAGScheduler: ResultStage 0 (foreach at main.scala:28) finished in 0,456 s
16/11/20 01:16:59 INFO DAGScheduler: Job 0 finished: foreach at main.scala:28, took 0,795126 s
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
16/11/20 01:16:59 INFO SparkContext: Invoking stop() from shutdown hook
16/11/20 01:16:59 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/11/20 01:16:59 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/11/20 01:16:59 INFO MemoryStore: MemoryStore cleared
16/11/20 01:16:59 INFO BlockManager: BlockManager stopped
16/11/20 01:16:59 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/20 01:16:59 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/11/20 01:16:59 INFO SparkContext: Successfully stopped SparkContext
16/11/20 01:16:59 INFO ShutdownHookManager: Shutdown hook called
16/11/20 01:16:59 INFO ShutdownHookManager: Deleting directory
Apache Spark no proporciona memoria compartida, por lo tanto aquí:
dataSet.foreach { e =>
items += e
println("len = " + items.length) //1. here length is ok
}
usted modifica una
copia local
de
items
en un ejecutor respectivo.
La lista de
items
originales definida en el controlador no se modifica.
Como resultado esto:
items.foreach { x => print(x) }
se ejecuta, pero no hay nada que imprimir.
Por favor, compruebe los cierres de comprensión
Si bien se recomendaría aquí, podría reemplazar los artículos con un accumulator
val acc = sc.collectionAccumulator[String]("Items")
dataSet.foreach(e => acc.add(e))
Spark se ejecuta en ejecutores y devuelve los resultados.
El código anterior no funciona según lo previsto.
Si necesita agregar los elementos de
foreach
, debe recopilar los datos en el controlador y agregarlos al conjunto
current_set
.
Pero recopilar los datos es una mala idea cuando tiene datos grandes.
val items = new ListBuffer[String]()
val rdd = spark.sparkContext.parallelize(1 to 10, 4)
rdd.collect().foreach(data => items += data.toString())
println(items)
Salida:
ListBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)