tutorial spark org framework examples example and python logging apache-spark

org - spark framework python



¿Cómo me registro desde mi script de Python Spark? (5)

Debe obtener el registrador para la propia chispa, de forma predeterminada, getLogger () devolverá el registrador para su propio módulo. Intenta algo como:

logger = logging.getLogger(''py4j'') logger.info("My test info statement")

También podría ser ''pyspark'' en lugar de ''py4j''.

En caso de que la función que utiliza en su programa de chispa (y que hace algún registro) se defina en el mismo módulo que la función principal, dará un error de serialización.

Esto se explica here y here un ejemplo de la misma persona.

También probé esto en la chispa 1.3.1

EDITAR:

Para cambiar el registro de STDERR a STDOUT, deberá eliminar el StreamHandler actual y agregar uno nuevo.

Encuentre el controlador de flujo existente (esta línea se puede eliminar cuando haya terminado)

print(logger.handlers) # will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]

Probablemente solo habrá uno, pero si no, tendrá que actualizar la posición.

logger.removeHandler(logger.handlers[0])

Agregar nuevo controlador para sys.stdout

import sys # Put at top if not already there sh = logging.StreamHandler(sys.stdout) sh.setLevel(logging.DEBUG) logger.addHandler(sh)

Tengo un programa Python Spark que ejecuto con spark-submit . Quiero poner declaraciones de registro en él.

logging.info("This is an informative message.") logging.debug("This is a debug message.")

Quiero usar el mismo registrador que usa Spark para que los mensajes de registro salgan en el mismo formato y el nivel esté controlado por los mismos archivos de configuración. ¿Cómo hago esto?

He intentado poner las declaraciones de logging en el código y comenzar con un logging.getLogger() . En ambos casos veo los mensajes de registro de Spark pero no los míos. He estado buscando en la documentación de registro de Python , pero no he podido resolverlo a partir de ahí.

No estoy seguro de si esto es algo específico de los scripts enviados a Spark o simplemente no entiendo cómo funciona el registro.


En mi caso, estoy feliz de que mis mensajes de registro se agreguen al stderr de los trabajadores, junto con los mensajes de registro de chispas habituales.

Si eso se adapta a sus necesidades, entonces el truco es redirigir el registrador Python particular a stderr .

Por ejemplo, lo siguiente, inspirado en esta respuesta , funciona bien para mí:

def getlogger(name, level=logging.INFO): import logging import sys logger = logging.getLogger(name) logger.setLevel(level) if logger.handlers: # or else, as I found out, we keep adding handlers and duplicate messages pass else: ch = logging.StreamHandler(sys.stderr) ch.setLevel(level) formatter = logging.Formatter(''%(asctime)s - %(name)s - %(levelname)s - %(message)s'') ch.setFormatter(formatter) logger.addHandler(ch) return logger

Uso:

def tst_log(): logger = getlogger(''my-worker'') logger.debug(''a'') logger.info(''b'') logger.warning(''c'') logger.error(''d'') logger.critical(''e'') ...

Salida (más algunas líneas circundantes para el contexto):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB) 2017-05-03 03:25:32,849 - my-worker - INFO - b 2017-05-03 03:25:32,849 - my-worker - WARNING - c 2017-05-03 03:25:32,849 - my-worker - ERROR - d 2017-05-03 03:25:32,849 - my-worker - CRITICAL - e 17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0 17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver


La clave de la interacción pyspark y java log4j es el jvm. Esto a continuación es el código de Python, a la configuración le falta la url, pero se trata del registro.

from pyspark.conf import SparkConf from pyspark.sql import SparkSession my_jars = os.environ.get("SPARK_HOME") myconf = SparkConf() myconf.setMaster("local").setAppName("DB2_Test") myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars) spark = SparkSession/ .builder/ .appName("DB2_Test")/ .config(conf = myconf) / .getOrCreate() Logger= spark._jvm.org.apache.log4j.Logger mylogger = Logger.getLogger(__name__) mylogger.error("some error trace") mylogger.info("some info trace")


Necesitábamos registrar desde los ejecutores , no desde el nodo del controlador. Así que hicimos lo siguiente:

  1. Creamos un /etc/rsyslog.d/spark.conf en todos los nodos (usando un método Bootstrap con Amazon Elastic Map Reduce so that the Core nodes forwarded syslog mensajes so that the Core nodes forwarded syslog local1` al nodo maestro.

  2. En el nodo Maestro, habilitamos los escuchas de syslog UDP y TCP, y lo configuramos para que todos local mensajes local se /var/log/local1.log en /var/log/local1.log .

  3. Creamos un módulo de logging Python Syslog logger en nuestra función de mapa.

  4. Ahora podemos iniciar sesión con logging.info() . ...

Una de las cosas que descubrimos es que la misma partición se procesa simultáneamente en varios ejecutores. Al parecer, Spark hace esto todo el tiempo, cuando tiene recursos adicionales. Esto maneja el caso cuando un ejecutor se retrasa misteriosamente o falla.

El registro en las funciones del map nos ha enseñado mucho sobre cómo funciona Spark.


Puede obtener el registrador del objeto SparkContext:

log4jLogger = sc._jvm.org.apache.log4j LOGGER = log4jLogger.LogManager.getLogger(__name__) LOGGER.info("pyspark script logger initialized")