scala - Spark perder println() en stdout
apache-spark accumulator (2)
Tengo el siguiente código:
val blueCount = sc.accumulator[Long](0)
val output = input.map { data =>
for (value <- data.getValues()) {
if (record.getEnum() == DataEnum.BLUE) {
blueCount += 1
println("Enum = BLUE : " + value.toString()
}
}
data
}.persist(StorageLevel.MEMORY_ONLY_SER)
output.saveAsTextFile("myOutput")
Entonces blueCount no es cero, ¡pero no obtuve salida println ()! ¿Me estoy perdiendo algo aquí? ¡Gracias!
Esta es una pregunta conceptual ...
Imagine que tiene un grupo grande, compuesto por muchos trabajadores, digamos
n
trabajadores y esos trabajadores almacenan una partición de un
RDD
o
DataFrame
, imagine que comienza una tarea de
map
través de esos datos, y dentro de ese
map
tiene una declaración
print
, en primer lugar :
- ¿Dónde se imprimirán esos datos?
- ¿Qué nodo tiene prioridad y qué partición?
- Si todos los nodos se ejecutan en paralelo, ¿quién se imprimirá primero?
- ¿Cómo se creará esta cola de impresión?
Esas son demasiadas preguntas, por lo tanto, los diseñadores / mantenedores de
apache-spark
decidieron lógicamente abandonar cualquier soporte para
print
declaraciones dentro de cualquier operación de
map-reduce
(esto incluye
accumulators
e incluso variables de
broadcast
).
Esto también tiene sentido porque Spark es un lenguaje diseñado para conjuntos de datos muy grandes. Si bien la impresión puede ser útil para probar y depurar, ¡no querrá imprimir cada línea de un DataFrame o RDD porque están diseñados para tener millones o miles de millones de filas! Entonces, ¿por qué lidiar con estas preguntas complicadas cuando ni siquiera quieres imprimir en primer lugar?
Para probar esto, puede ejecutar este código scala, por ejemplo:
// Let''s create a simple RDD
val rdd = sc.parallelize(1 to 10000)
def printStuff(x:Int):Int = {
println(x)
x + 1
}
// It doesn''t print anything! because of a logic design limitation!
rdd.map(printStuff)
// But you can print the RDD by doing the following:
rdd.take(10).foreach(println)
Pude solucionarlo haciendo una función de utilidad:
object PrintUtiltity {
def print(data:String) = {
println(data)
}
}