scala logging apache-spark

Registro de Apache Spark dentro de Scala



logging apache-spark (6)

Aquí está mi solución:

Estoy usando SLF4j (con enlace Log4j), en mi clase base de cada trabajo de chispa tengo algo como esto:

import org.slf4j.LoggerFactory val LOG = LoggerFactory.getLogger(getClass)

Justo antes del lugar donde uso LOG en el código funcional distribuido, copio la referencia del registrador a una constante local.

val LOG = this.LOG

¡Funcionó para mí!

Estoy buscando una solución para poder registrar datos adicionales al ejecutar código en Apache Spark Nodes que podría ayudar a investigar más adelante algunos problemas que pueden aparecer durante la ejecución. Intentar utilizar una solución tradicional como, por ejemplo, com.typesafe.scalalogging.LazyLogging falla porque la instancia de registro no se puede serializar en un entorno distribuido como Apache Spark.

Investigué este problema y por ahora la solución que encontré fue usar el rasgo org.apache.spark.Logging como este:

class SparkExample with Logging { val someRDD = ... someRDD.map { rddElement => logInfo(s"$rddElement will be processed.") doSomething(rddElement) } }

Sin embargo, parece que el rasgo Logging no es una solución permanente para Apache Spark porque está marcado como @DeveloperApi y la documentación de la clase menciona:

Es probable que esto se cambie o elimine en futuras versiones.

Me pregunto: ¿son alguna solución de registro conocida que pueda usar y me permitirán registrar datos cuando los RDD se ejecuten en los nodos de Apache Spark?

@Edición posterior : algunos de los comentarios a continuación sugieren utilizar Log4J. Intenté usar Log4J pero todavía tengo problemas al usar el registrador de una clase Scala (y no un objeto Scala). Aquí está mi código completo:

import org.apache.log4j.Logger import org.apache.spark._ object Main { def main(args: Array[String]) { new LoggingTestWithRDD().doTest() } } class LoggingTestWithRDD extends Serializable { val log = Logger.getLogger(getClass.getName) def doTest(): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest") val spark = new SparkContext(conf) val someRdd = spark.parallelize(List(1, 2, 3)) someRdd.map { element => log.info(s"$element will be processed") element + 1 } spark.stop() }

}

La excepción que estoy viendo es:

Excepción en el hilo "main" org.apache.spark.SparkException: Tarea no serializable -> Causada por: java.io.NotSerializableException: org.apache.log4j.Logger


Esta es una publicación antigua, pero quiero proporcionar mi solución de trabajo que acabo de obtener después de luchar mucho y que aún puede ser útil para otros:

Quiero imprimir contenido rdd dentro de la función rdd.map pero obtengo el Task Not Serializalable Error . Esta es mi solución para este problema usando el objeto estático scala que está extendiendo java.io.Serializable

import org.apache.log4j.Level object MyClass extends Serializable{ val log = org.apache.log4j.LogManager.getLogger("name of my spark log") log.setLevel(Level.INFO) def main(args:Array[String]) { rdd.map(t=> //Using object''s logger here val log =MyClass.log log.INFO("count"+rdd.count) ) } }


Puede usar la solución de Akhil propuesta en
https://www.mail-archive.com/[email protected]/msg29010.html . Lo he usado solo y funciona.

Akhil Das Mon, 25 de mayo de 2015 08:20:40 -0700
Intenta de esta manera:

object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) }


Si necesita ejecutar algún código antes y después de un map , filter u otra función RDD , intente usar mapPartition , donde el iterador subyacente se pasa explícitamente.

Ejemplo:

val log = ??? // this gets captured and produced serialization error rdd.map { x => log.info(x) x+1 }

Se convierte en:

rdd.mapPartition { it => val log = ??? // this is freshly initialized in worker nodes it.map { x => log.info(x) x + 1 } }

Cada función básica de RDD siempre se implementa con un mapPartition .

Asegúrese de manejar el particionador explícitamente y de no perderlo: consulte Scaladoc, parámetro de preservesPartitioning particiones, esto es fundamental para las actuaciones.


Utilice Log4j 2.x. El registrador central se ha hecho serializable. Problema resuelto.

Discusión de Jira: https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x" "org.apache.logging.log4j" % "log4j-core" % "2.x.x" "org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"


val log = Logger.getLogger(getClass.getName),

Puede usar "log" para escribir registros. Además, si necesita cambiar las propiedades del registrador, debe tener log4j.properties en la carpeta / conf. Por defecto tendremos una plantilla en esa ubicación.