python - mrjob: configuración de inicio de sesión en EMR
hadoop logging (1)
De todas las opciones, la única que realmente funciona es usar stderr con una escritura directa ( sys.stderr.write
) o usar un registrador con un StreamHandler a stderr.
Los registros se pueden recuperar después de que el trabajo finalice (con éxito o con un error) desde:
[s3_log_uri] / [jobflow-id] / task-attempts / [job-id] / [attempt-id] / stderr
Asegúrese de mantener los registros en su configuración runners.emr.cleanup
.
Estoy tratando de usar mrjob para ejecutar hadoop en EMR, y no puedo encontrar la forma de configurar el registro (registros generados por el usuario en los pasos de mapa / reducción) para poder acceder a ellos una vez que finaliza el clúster.
He intentado configurar el registro utilizando el módulo de logging
, print
y sys.stderr.write()
pero sin suerte hasta el momento. La única opción que funciona para mí es escribir los registros en un archivo, luego SSH la máquina y leerlo, pero es engorroso. Me gustaría que mis registros vayan a stderr / stdout / syslog y se recopilen automáticamente en S3, de modo que pueda verlos una vez que finalice el clúster.
Aquí está el ejemplo de word_freq con el registro:
"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys
WORD_RE = re.compile(r"[/w'']+")
class MRWordFreqCount(MRJob):
def mapper_init(self):
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
self.logger.addHandler(logging.StreamHandler())
self.logger.addHandler(logging.StreamHandler(sys.stdout))
self.logger.addHandler(logging.handlers.SysLogHandler())
def mapper(self, _, line):
self.logger.info("Test logging: %s", line)
sys.stderr.write("Test stderr: %s/n" % line)
print "Test print: %s" % line
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
def combiner(self, word, counts):
yield (word, sum(counts))
def reducer(self, word, counts):
yield (word, sum(counts))
if __name__ == ''__main__'':
MRWordFreqCount.run()