Registro de Apache Spark dentro de Scala
logging apache-spark (6)
Aquí está mi solución:
Estoy usando SLF4j (con enlace Log4j), en mi clase base de cada trabajo de chispa tengo algo como esto:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass)
Justo antes del lugar donde uso
LOG
en el código funcional distribuido, copio la referencia del registrador a una constante local.
val LOG = this.LOG
¡Funcionó para mí!
Estoy buscando una solución para poder registrar datos adicionales al ejecutar código en Apache Spark Nodes que podría ayudar a investigar más adelante algunos problemas que pueden aparecer durante la ejecución.
Intentar utilizar una solución tradicional como, por ejemplo,
com.typesafe.scalalogging.LazyLogging
falla porque la instancia de registro no se puede serializar en un entorno distribuido como Apache Spark.
Investigué este problema y por ahora la solución que encontré fue usar el rasgo
org.apache.spark.Logging
como este:
class SparkExample with Logging {
val someRDD = ...
someRDD.map {
rddElement => logInfo(s"$rddElement will be processed.")
doSomething(rddElement)
}
}
Sin embargo, parece que el rasgo Logging no es una solución permanente para Apache Spark porque está marcado como
@DeveloperApi
y la documentación de la clase menciona:
Es probable que esto se cambie o elimine en futuras versiones.
Me pregunto: ¿son alguna solución de registro conocida que pueda usar y me permitirán registrar datos cuando los RDD se ejecuten en los nodos de Apache Spark?
@Edición posterior : algunos de los comentarios a continuación sugieren utilizar Log4J. Intenté usar Log4J pero todavía tengo problemas al usar el registrador de una clase Scala (y no un objeto Scala). Aquí está mi código completo:
import org.apache.log4j.Logger
import org.apache.spark._
object Main {
def main(args: Array[String]) {
new LoggingTestWithRDD().doTest()
}
}
class LoggingTestWithRDD extends Serializable {
val log = Logger.getLogger(getClass.getName)
def doTest(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
val spark = new SparkContext(conf)
val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map {
element =>
log.info(s"$element will be processed")
element + 1
}
spark.stop()
}
}
La excepción que estoy viendo es:
Excepción en el hilo "main" org.apache.spark.SparkException: Tarea no serializable -> Causada por: java.io.NotSerializableException: org.apache.log4j.Logger
Esta es una publicación antigua, pero quiero proporcionar mi solución de trabajo que acabo de obtener después de luchar mucho y que aún puede ser útil para otros:
Quiero imprimir contenido rdd dentro de la función rdd.map pero obtengo el
Task Not Serializalable Error
.
Esta es mi solución para este problema usando el objeto estático scala que está extendiendo
java.io.Serializable
import org.apache.log4j.Level
object MyClass extends Serializable{
val log = org.apache.log4j.LogManager.getLogger("name of my spark log")
log.setLevel(Level.INFO)
def main(args:Array[String])
{
rdd.map(t=>
//Using object''s logger here
val log =MyClass.log
log.INFO("count"+rdd.count)
)
}
}
Puede usar la solución de Akhil propuesta en
https://www.mail-archive.com/[email protected]/msg29010.html
.
Lo he usado solo y funciona.
Akhil Das Mon, 25 de mayo de 2015 08:20:40 -0700
Intenta de esta manera:
object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) }
Si necesita ejecutar algún código antes y después de un
map
,
filter
u otra función
RDD
, intente usar
mapPartition
, donde el iterador subyacente se pasa explícitamente.
Ejemplo:
val log = ??? // this gets captured and produced serialization error
rdd.map { x =>
log.info(x)
x+1
}
Se convierte en:
rdd.mapPartition { it =>
val log = ??? // this is freshly initialized in worker nodes
it.map { x =>
log.info(x)
x + 1
}
}
Cada función básica de
RDD
siempre se implementa con un
mapPartition
.
Asegúrese de manejar el particionador explícitamente y de no perderlo: consulte Scaladoc, parámetro de
preservesPartitioning
particiones, esto es fundamental para las actuaciones.
Utilice Log4j 2.x. El registrador central se ha hecho serializable. Problema resuelto.
Discusión de Jira: https://issues.apache.org/jira/browse/LOG4J2-801
"org.apache.logging.log4j" % "log4j-api" % "2.x.x"
"org.apache.logging.log4j" % "log4j-core" % "2.x.x"
"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
val log = Logger.getLogger(getClass.getName),
Puede usar "log" para escribir registros. Además, si necesita cambiar las propiedades del registrador, debe tener log4j.properties en la carpeta / conf. Por defecto tendremos una plantilla en esa ubicación.