hadoop hdfs sequencefile

Hadoop HDFS: archivos de secuencia de lectura que se escriben



sequencefile (4)

Estoy usando Hadoop 1.0.3.

Escribo los registros en un archivo de secuencia de Hadoop en HDFS, llamo a syncFS () después de cada grupo de registros, pero nunca cierro el archivo (excepto cuando estoy realizando un rodaje diario).

Lo que quiero garantizar es que el archivo esté disponible para los lectores mientras el archivo se está escribiendo.

Puedo leer los bytes del archivo de secuencia a través de FSDataInputStream, pero si trato de usar SequenceFile.Reader.next (clave, val), devuelve falso en la primera llamada.

Sé que los datos están en el archivo porque puedo leerlos con FSDataInputStream o con el comando cat y estoy 100% seguro de que se llama a syncFS ().

Revisé los registros namenode y datanode, sin error ni advertencia.

¿Por qué SequenceFile.Reader no puede leer mi archivo actualmente escrito?


No puede asegurarse de que una lectura se escriba completamente en el disco en el lado del nodo de datos. Puede ver esto en la documentación de DFSClient#DFSOutputStream.sync() que indica:

All data is written out to datanodes. It is not guaranteed that data has been flushed to persistent store on the datanode. Block allocations are persisted on namenode.

Por lo tanto, básicamente actualiza el mapa de bloques del namenode con la información actual y envía los datos al nodo de datos. Como no puede vaciar los datos en el disco en el nodo de datos, pero lee directamente desde el nodo de datos, alcanza un marco de tiempo donde los datos están almacenados en un búfer y no son accesibles. Por lo tanto, el lector de archivos de secuencia pensará que el flujo de datos está terminado (o vacío) y no puede leer los bytes adicionales que devuelven falso al proceso de deserialización.

Un nodo de datos escribe los datos en el disco (está escrito de antemano, pero no se puede leer desde el exterior) si el bloque se recibe por completo. Entonces, puede leer el archivo una vez que se ha alcanzado su tamaño de bloque o su archivo se ha cerrado de antemano y, por lo tanto, ha finalizado un bloque. Lo cual tiene mucho sentido en un entorno distribuido, ya que su escritor puede morir y no terminar un bloque correctamente, esto es una cuestión de consistencia.

Entonces, la solución sería reducir el tamaño del bloque para que el bloque se termine más seguido. Pero eso no es tan eficiente y espero que quede claro que su requisito no es adecuado para HDFS.


El motivo por el que SequenceFile.Reader no puede leer un archivo que se está escribiendo es que usa la longitud del archivo para realizar su magia.

La longitud del archivo permanece en 0 mientras se está escribiendo el primer bloque, y se actualiza solo cuando el bloque está lleno (de forma predeterminada, 64 MB). Luego, el tamaño del archivo queda bloqueado en 64MB hasta que el segundo bloque esté completamente escrito y así sucesivamente ...

Eso significa que no puede leer el último bloque incompleto en un archivo de secuencia usando SequenceFile.Reader, incluso si los datos brutos son legibles usando FSInputStream directamente.

Cerrar el archivo también corrige la longitud del archivo, pero en mi caso necesito leer los archivos antes de que se cierren.



Así que llegué al mismo problema y después de un poco de investigación y tiempo, pensé que la siguiente solución alternativa funciona.

Entonces, el problema se debe a la implementación interna de la creación del archivo de secuencia y al hecho de que está utilizando la longitud del archivo que se actualiza por bloque de 64 MB.

Así que creé la siguiente clase para crear el lector y envolví el hadoop FS con el mío, mientras que sobreescribí el método get get para devolver la longitud del archivo:

public class SequenceFileUtil { public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException { WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf)); return new SequenceFile.Reader(fileSystem, path, conf); } private class WrappedFileSystem extends FileSystem { private final FileSystem nestedFs; public WrappedFileSystem(FileSystem fs){ this.nestedFs = fs; } @Override public URI getUri() { return nestedFs.getUri(); } @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { return nestedFs.open(f,bufferSize); } @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress); } @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { return nestedFs.append(f, bufferSize, progress); } @Override public boolean rename(Path src, Path dst) throws IOException { return nestedFs.rename(src, dst); } @Override public boolean delete(Path path) throws IOException { return nestedFs.delete(path); } @Override public boolean delete(Path f, boolean recursive) throws IOException { return nestedFs.delete(f, recursive); } @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { return nestedFs.listStatus(f); } @Override public void setWorkingDirectory(Path new_dir) { nestedFs.setWorkingDirectory(new_dir); } @Override public Path getWorkingDirectory() { return nestedFs.getWorkingDirectory(); } @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { return nestedFs.mkdirs(f, permission); } @Override public FileStatus getFileStatus(Path f) throws IOException { return nestedFs.getFileStatus(f); } @Override public long getLength(Path f) throws IOException { DFSClient.DFSInputStream open = new DFSClient(nestedFs.getConf()).open(f.toUri().getPath()); long fileLength = open.getFileLength(); long length = nestedFs.getLength(f); if (length < fileLength){ //We might have uncompleted blocks return fileLength; } return length; } } }