python hadoop logging mapreduce mrjob

python - mrjob: configuración de inicio de sesión en EMR



hadoop logging (1)

De todas las opciones, la única que realmente funciona es usar stderr con una escritura directa ( sys.stderr.write ) o usar un registrador con un StreamHandler a stderr.

Los registros se pueden recuperar después de que el trabajo finalice (con éxito o con un error) desde:

[s3_log_uri] / [jobflow-id] / task-attempts / [job-id] / [attempt-id] / stderr

Asegúrese de mantener los registros en su configuración runners.emr.cleanup .

Estoy tratando de usar mrjob para ejecutar hadoop en EMR, y no puedo encontrar la forma de configurar el registro (registros generados por el usuario en los pasos de mapa / reducción) para poder acceder a ellos una vez que finaliza el clúster.

He intentado configurar el registro utilizando el módulo de logging , print y sys.stderr.write() pero sin suerte hasta el momento. La única opción que funciona para mí es escribir los registros en un archivo, luego SSH la máquina y leerlo, pero es engorroso. Me gustaría que mis registros vayan a stderr / stdout / syslog y se recopilen automáticamente en S3, de modo que pueda verlos una vez que finalice el clúster.

Aquí está el ejemplo de word_freq con el registro:

"""The classic MapReduce job: count the frequency of words. """ from mrjob.job import MRJob import re import logging import logging.handlers import sys WORD_RE = re.compile(r"[/w'']+") class MRWordFreqCount(MRJob): def mapper_init(self): self.logger = logging.getLogger() self.logger.setLevel(logging.INFO) self.logger.addHandler(logging.FileHandler("/tmp/mr.log")) self.logger.addHandler(logging.StreamHandler()) self.logger.addHandler(logging.StreamHandler(sys.stdout)) self.logger.addHandler(logging.handlers.SysLogHandler()) def mapper(self, _, line): self.logger.info("Test logging: %s", line) sys.stderr.write("Test stderr: %s/n" % line) print "Test print: %s" % line for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner(self, word, counts): yield (word, sum(counts)) def reducer(self, word, counts): yield (word, sum(counts)) if __name__ == ''__main__'': MRWordFreqCount.run()