scala apache-spark println accumulator

scala - Spark perder println() en stdout



apache-spark accumulator (2)

Tengo el siguiente código:

val blueCount = sc.accumulator[Long](0) val output = input.map { data => for (value <- data.getValues()) { if (record.getEnum() == DataEnum.BLUE) { blueCount += 1 println("Enum = BLUE : " + value.toString() } } data }.persist(StorageLevel.MEMORY_ONLY_SER) output.saveAsTextFile("myOutput")

Entonces blueCount no es cero, ¡pero no obtuve salida println ()! ¿Me estoy perdiendo algo aquí? ¡Gracias!


Esta es una pregunta conceptual ...

Imagine que tiene un grupo grande, compuesto por muchos trabajadores, digamos n trabajadores y esos trabajadores almacenan una partición de un RDD o DataFrame , imagine que comienza una tarea de map través de esos datos, y dentro de ese map tiene una declaración print , en primer lugar :

  • ¿Dónde se imprimirán esos datos?
  • ¿Qué nodo tiene prioridad y qué partición?
  • Si todos los nodos se ejecutan en paralelo, ¿quién se imprimirá primero?
  • ¿Cómo se creará esta cola de impresión?

Esas son demasiadas preguntas, por lo tanto, los diseñadores / mantenedores de apache-spark decidieron lógicamente abandonar cualquier soporte para print declaraciones dentro de cualquier operación de map-reduce (esto incluye accumulators e incluso variables de broadcast ).

Esto también tiene sentido porque Spark es un lenguaje diseñado para conjuntos de datos muy grandes. Si bien la impresión puede ser útil para probar y depurar, ¡no querrá imprimir cada línea de un DataFrame o RDD porque están diseñados para tener millones o miles de millones de filas! Entonces, ¿por qué lidiar con estas preguntas complicadas cuando ni siquiera quieres imprimir en primer lugar?

Para probar esto, puede ejecutar este código scala, por ejemplo:

// Let''s create a simple RDD val rdd = sc.parallelize(1 to 10000) def printStuff(x:Int):Int = { println(x) x + 1 } // It doesn''t print anything! because of a logic design limitation! rdd.map(printStuff) // But you can print the RDD by doing the following: rdd.take(10).foreach(println)


Pude solucionarlo haciendo una función de utilidad:

object PrintUtiltity { def print(data:String) = { println(data) } }