topandas structfield multiple drop columns approxquantile python apache-spark log4j pyspark

python - structfield - Registro PySpark del ejecutor



pyspark topandas (2)

¿Cuál es la forma correcta de acceder al registrador log4j de Spark usando pyspark en un ejecutor?

Es fácil hacerlo en el controlador, pero parece que no puedo entender cómo acceder a las funcionalidades de registro en el ejecutor para poder iniciar sesión localmente y dejar que YARN recopile los registros locales.

¿Hay alguna forma de acceder al registrador local?

El procedimiento de registro estándar no es suficiente porque no puedo acceder al contexto de chispa del ejecutor.


No puede usar el logger log4j local en ejecutores. Trabajadores de Python generados por ejecutores jvms no tiene una conexión de "devolución de llamada" a Java, solo reciben comandos. Pero hay una forma de iniciar sesión desde ejecutores utilizando el registro de python estándar y capturarlos mediante YARN.

En usted, HDFS coloca el archivo del módulo de Python que configura el registro una vez por trabajador python y las funciones de registro de proxies ( logger.py nombre logger.py ):

import os import logging import sys class YarnLogger: @staticmethod def setup_logger(): if not ''LOG_DIRS'' in os.environ: sys.stderr.write(''Missing LOG_DIRS environment variable, pyspark logging disabled'') return file = os.environ[''LOG_DIRS''].split('','')[0] + ''/pyspark.log'' logging.basicConfig(filename=file, level=logging.INFO, format=''%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s'') def __getattr__(self, key): return getattr(logging, key) YarnLogger.setup_logger()

Luego importe este módulo dentro de su aplicación:

spark.sparkContext.addPyFile(''hdfs:///path/to/logger.py'') import logger logger = logger.YarnLogger()

Y puede usar dentro de sus funciones de pyspark como la biblioteca de registro normal:

def map_sth(s): logger.info("Mapping " + str(s)) return s spark.range(10).rdd.map(map_sth).count()

El pyspark.log estará visible en el administrador de recursos y se recopilará al finalizar la aplicación, para que pueda acceder a estos registros más tarde con yarn logs -applicationId ....


Tenga en cuenta que la respuesta de Mariusz devuelve un proxy al módulo de registro. Esto funciona (upvoted) cuando sus demandas de registro son muy básicas. Una vez que esté interesado en hacer cosas como configurar múltiples instancias de registrador o usar múltiples controladores, faltará. Por ejemplo, si tiene un conjunto de código más grande que solo desea ejecutar al depurar, una de las soluciones sería verificar el método isEnabledFor una instancia del isEnabledFor , así:

logger = logging.getLogger(__name__) if logger.isEnabledFor(logging.DEBUG): # do some heavy calculations and call `logger.debug` (or any other logging method, really)

Esto fallaría cuando se invoca el método en el módulo de registro, como en la respuesta de Mariusz, porque el módulo de registro no tiene dicho atributo.

Una forma de resolver esto sería crear un módulo spark_logging.py en el que configure el registro y devuelva una nueva instancia de Logger . El siguiente código muestra un ejemplo de esto, que configura el registro usando dictConfig . También agrega un filtro para que el número de repeticiones de todos los nodos del trabajador se reduzca considerablemente cuando se utiliza el registrador de raíz (el ejemplo del filtro es de Christopher Dunn ( ref )).

# spark_logging.py import logging import logging.config import os import tempfile from logging import * # gives access to logging.DEBUG etc by aliasing this module for the standard logging module class Unique(logging.Filter): """Messages are allowed through just once. The ''message'' includes substitutions, but is not formatted by the handler. If it were, then practically all messages would be unique! """ def __init__(self, name=""): logging.Filter.__init__(self, name) self.reset() def reset(self): """Act as if nothing has happened.""" self.__logged = {} def filter(self, rec): """logging.Filter.filter performs an extra filter on the name.""" return logging.Filter.filter(self, rec) and self.__is_first_time(rec) def __is_first_time(self, rec): """Emit a message only once.""" msg = rec.msg %(rec.args) if msg in self.__logged: self.__logged[msg] += 1 return False else: self.__logged[msg] = 1 return True def getLogger(name, logfile="pyspark.log"): """Replaces getLogger from logging to ensure each worker configures logging locally.""" try: logfile = os.path.join(os.environ[''LOG_DIRS''].split('','')[0], logfile) except (KeyError, IndexError): tmpdir = tempfile.gettempdir() logfile = os.path.join(tmpdir, logfile) rootlogger = logging.getLogger("") rootlogger.addFilter(Unique()) rootlogger.warning( "LOG_DIRS not in environment variables or is empty. Will log to {}." .format(logfile)) # Alternatively, load log settings from YAML or use JSON. log_settings = { ''version'': 1, ''disable_existing_loggers'': False, ''handlers'': { ''file'': { ''class'': ''logging.FileHandler'', ''level'': ''DEBUG'', ''formatter'': ''detailed'', ''filename'': logfile }, ''default'': { ''level'': ''INFO'', ''class'': ''logging.StreamHandler'', }, }, ''formatters'': { ''detailed'': { ''format'': ("%(asctime)s.%(msecs)03d %(levelname)s %(module)s - " "%(funcName)s: %(message)s"), }, }, ''loggers'': { ''driver'': { ''level'': ''INFO'', ''handlers'': [''file'', ] }, ''executor'': { ''level'': ''DEBUG'', ''handlers'': [''file'', ] }, } } logging.config.dictConfig(log_settings) return logging.getLogger(name)

A continuación, puede importar este módulo y alias para logging :

from pyspark.sql import SparkSession spark = SparkSession / .builder / .appName("Test logging") / .getOrCreate() try: spark.sparkContext.addPyFile(''s3://YOUR_BUCKET/spark_logging.py'') except: # Probably running this locally. Make sure to have spark_logging in the PYTHONPATH pass finally: import spark_logging as logging def map_sth(s): log3 = logging.getLogger("executor") log3.info("Logging from executor") if log3.isEnabledFor(logging.DEBUG): log3.debug("This statement is only logged when DEBUG is configured.") return s def main(): log2 = logging.getLogger("driver") log2.info("Logging from within module function on driver") spark.range(100).rdd.map(map_sth).count() if __name__ == "__main__": log1 = logging.getLogger("driver") log1.info("logging from module level") main()

Al igual que con la respuesta de Mariusz , se podrá acceder a los registros utilizando el administrador de recursos (o volcarlos en su carpeta temporal cuando LOG_DIRS no se encuentre en las variables de su entorno). El manejo de errores hecho en la parte superior de este script se agrega para que pueda ejecutar este script localmente.

Este enfoque permite más libertad: puede hacer que los ejecutores inicien sesión en un archivo y que todo tipo de agregación cuente en el disco en otro archivo.

Tenga en cuenta que hay algo más de trabajo por hacer en este caso, en comparación con el uso de una clase como proxy para el módulo de registro integrado, ya que cada vez que solicite un registrador en las instancias del ejecutor, tendrá que configurarse. Sin embargo, ese no será tu principal problema cuando se analicen los big data. ;-)