tutorial multinodo instalar for español dummies configurar como cluster hadoop split mapreduce block hdfs

multinodo - hadoop tutorial español pdf



¿Cómo se dividen los registros de proceso de Hadoop a través de los límites del bloque? (6)

Desde el código fuente de hadoop de LineRecordReader.java el constructor: encuentro algunos comentarios:

// If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start;

de esto creo que hadoop leerá una línea adicional para cada división (al final de la división actual, lea la siguiente línea en la siguiente división), y si no se divide primero, la primera línea se descartará. para que no se pierda ningún registro de línea e incompleto

De acuerdo con Hadoop - The Definitive Guide

Los registros lógicos que FileInputFormats definen generalmente no se ajustan perfectamente a los bloques HDFS. Por ejemplo, los registros lógicos de un TextInputFormat son líneas, que cruzarán los límites de HDFS la mayoría de las veces. Esto no influye en el funcionamiento de su programa (las líneas no se pierden ni se rompen, por ejemplo), pero vale la pena conocerlo, ya que significa que los mapas de datos locales (es decir, los mapas que se ejecutan en el mismo host que su datos de entrada) realizará algunas lecturas remotas. La ligera sobrecarga que esto causa normalmente no es significativa.

Supongamos que una línea de registro se divide en dos bloques (b1 y b2). El mapeador que procesa el primer bloque (b1) notará que la última línea no tiene un separador EOL y obtiene el resto de la línea del próximo bloque de datos (b2).

¿Cómo determina el mapper que procesa el segundo bloque (b2) que el primer registro está incompleto y debe procesar a partir del segundo registro en el bloque (b2)?


Interesante pregunta, pasé algún tiempo mirando el código para los detalles y aquí están mis pensamientos. Los splits son manejados por el cliente mediante InputFormat.getSplits , por lo que un vistazo a FileInputFormat proporciona la siguiente información:

  • Para cada archivo de entrada, obtenga la longitud del archivo, el tamaño del bloque y calcule el tamaño dividido como max(minSize, min(maxSize, blockSize)) donde maxSize corresponde a mapred.max.split.size y minSize es mapred.min.split.size .
  • Divida el archivo en diferentes FileSplit s en función del tamaño de división calculado anteriormente. Lo importante aquí es que cada FileSplit se inicializa con un parámetro de start correspondiente al desplazamiento en el archivo de entrada . Todavía no hay manejo de las líneas en ese punto. La parte relevante del código se ve así:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; }

Después de eso, si observa el LineRecordReader que está definido por TextInputFormat , ahí es donde se manejan las líneas:

  • Cuando inicializa su LineRecordReader , intenta crear una instancia de un LineReader que es una abstracción para poder leer líneas sobre FSDataInputStream . Hay 2 casos:
  • Si hay un CompressionCodec definido, entonces este códec es responsable de manejar los límites. Probablemente no sea relevante para tu pregunta.
  • Sin embargo, si no hay códec, las cosas son interesantes: si el start de InputSplit es diferente de 0, retrocede 1 carácter y luego omite la primera línea que encuentre identificada por / n o / r / n (Windows) . El retroceso es importante porque en caso de que los límites de su línea sean los mismos que los límites de división, esto garantiza que no omita la línea válida. Aquí está el código relevante:

    if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start;

Entonces, dado que las divisiones se calculan en el cliente, los mapeadores no necesitan ejecutarse en secuencia, cada mapeador ya sabe si necesita descartar la primera línea o no.

Así que, básicamente, si tiene 2 líneas de cada 100 Mb en el mismo archivo, y para simplificar, digamos que el tamaño de división es de 64 MB. Luego, cuando se calculan las divisiones de entrada, tendremos la siguiente situación:

  • Dividir 1 que contiene la ruta y los hosts de este bloque. Inicializado al inicio 200-200 = 0Mb, longitud 64Mb.
  • Split 2 inicializado al inicio 200-200 + 64 = 64Mb, longitud 64Mb.
  • Split 3 inicializado al inicio 200-200 + 128 = 128Mb, longitud 64Mb.
  • Split 4 inicializado al inicio 200-200 + 192 = 192Mb, longitud 8Mb.
  • Mapper A procesará la división 1, el inicio es 0, por lo que no omita la primera línea y lea una línea completa que supere el límite de 64Mb, por lo que necesita lectura remota.
  • El mapeador B procesará la división 2, el inicio es! = 0 así que omita la primera línea después de 64Mb-1byte, que corresponde al final de la línea 1 a 100Mb que todavía está en la división 2, tenemos 28Mb de la línea en la división 2, entonces remoto lee los 72Mb restantes.
  • Mapper C procesará la división 3, el inicio es! = 0 así que omita la primera línea después de 128Mb-1byte, que corresponde al final de la línea 2 a 200Mb, que es el final del archivo, así que no haga nada.
  • Mapper D es lo mismo que mapper C, excepto que busca una nueva línea después de 192Mb-1byte.

Lo veo así: InputFormat es responsable de dividir los datos en divisiones lógicas teniendo en cuenta la naturaleza de los datos.
Nada impide que lo haga, aunque puede agregar una latencia significativa al trabajo: toda la lógica y la lectura alrededor de los límites de tamaño de división deseados ocurrirán en el rastreador de trabajos.
El formato de entrada más simple para el registro es TextInputFormat. Está funcionando de la siguiente manera (por lo que entendí del código): el formato de entrada crea divisiones por tamaño, independientemente de las líneas, pero LineRecordReader siempre:
a) Omita la primera línea en la división (o parte de ella), si no es la primera división
b) Lea una línea después del límite de la división al final (si los datos están disponibles, entonces no es la última división).


Los mapeadores no tienen que comunicarse. Los bloques de archivos están en HDFS y el mapeador actual (RecordReader) puede leer el bloque que tiene la parte restante de la línea. Esto sucede detrás de escena.


Por lo que he entendido, cuando FileSplit se inicializa para el primer bloque, se llama al constructor predeterminado. Por lo tanto, los valores para inicio y duración son cero inicialmente. Al final del procesamiento del primer bloque, si la última línea está incompleta, entonces el valor de la longitud será mayor que la longitud de la división y también leerá la primera línea del bloque siguiente. Debido a esto, el valor de inicio para el primer bloque será mayor que cero y bajo esta condición, el LineRecordReader omitirá la primera línea del segundo bloque. (Ver source )

En caso de que se complete la última línea del primer bloque, entonces el valor de la longitud será igual a la longitud del primer bloque y el valor del inicio para el segundo bloque será cero. En ese caso, el LineRecordReader no omitirá la primera línea y leerá el segundo bloque desde el principio.

¿Tiene sentido?


El algoritmo Map Reduece no funciona en bloques físicos del archivo. Funciona en splits de entrada lógica. La división de entrada depende del lugar donde se escribió el registro. Un registro puede abarcar dos Mappers.

La forma en que HDFS se ha configurado, descompone archivos muy grandes en bloques grandes (por ejemplo, mide 128 MB) y almacena tres copias de estos bloques en diferentes nodos del clúster.

HDFS no tiene conocimiento del contenido de estos archivos. Se puede haber iniciado un registro en el Bloque-a, pero el final de ese registro puede estar presente en el Bloque-b .

Para resolver este problema, Hadoop usa una representación lógica de los datos almacenados en bloques de archivos, conocidos como divisiones de entrada. Cuando un cliente de trabajo de MapReduce calcula las divisiones de entrada , se da cuenta de dónde comienza el primer registro completo en un bloque y dónde termina el último registro en el bloque .

El punto clave :

En los casos en que el último registro en un bloque es incompleto, la división de entrada incluye información de ubicación para el próximo bloque y el desplazamiento de bytes de los datos necesarios para completar el registro.

Echa un vistazo al diagrama a continuación.

Eche un vistazo a este article y a la pregunta relacionada con SE: Acerca de la división de archivos Hadoop / HDFS

Se pueden leer más detalles de la documentation

El marco Map-Reduce depende del InputFormat del trabajo para:

  1. Valide la especificación de entrada del trabajo.
  2. Separe los archivos de entrada en InputSplits lógicos, cada uno de los cuales se asignará a un Mapper individual.
  3. Cada InputSplit se asigna a un Mapper individual para su procesamiento. Split podría ser una tupla . InputSplit[] getSplits(JobConf job,int numSplits ) es la API para encargarse de estas cosas.

FileInputFormat , que amplía el InputFormat getSplits () de InputFormat implementado. Eche un vistazo a las grepcode internas de este método en grepcode