HCatalog - Reader Writer

HCatalog contiene una API de transferencia de datos para entrada y salida en paralelo sin usar MapReduce. Esta API utiliza una abstracción de almacenamiento básica de tablas y filas para leer datos del clúster de Hadoop y escribir datos en él.

La API de transferencia de datos contiene principalmente tres clases; esos son -

  • HCatReader - Lee datos de un clúster de Hadoop.

  • HCatWriter - Escribe datos en un clúster de Hadoop.

  • DataTransferFactory - Genera instancias de lector y escritor.

Esta API es adecuada para la configuración del nodo maestro-esclavo. Discutamos más sobreHCatReader y HCatWriter.

HCatReader

HCatReader es una clase abstracta interna de HCatalog y abstrae las complejidades del sistema subyacente desde donde se recuperarán los registros.

No Señor. Nombre y descripción del método
1

Public abstract ReaderContext prepareRead() throws HCatException

Esto debe llamarse en el nodo maestro para obtener ReaderContext que luego debe serializarse y enviarse a los nodos esclavos.

2

Public abstract Iterator <HCatRecorder> read() throws HCaException

Esto se debe llamar en los nodos esclavos para leer HCatRecords.

3

Public Configuration getConf()

Devolverá el objeto de la clase de configuración.

La clase HCatReader se utiliza para leer los datos de HDFS. La lectura es un proceso de dos pasos en el que el primer paso ocurre en el nodo maestro de un sistema externo. El segundo paso se lleva a cabo en paralelo en múltiples nodos esclavos.

Las lecturas se realizan en un ReadEntity. Antes de comenzar a leer, debe definir una ReadEntity desde la que leer. Esto se puede hacer a través deReadEntity.Builder. Puede especificar un nombre de base de datos, un nombre de tabla, una partición y una cadena de filtro. Por ejemplo

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.

El fragmento de código anterior define un objeto ReadEntity ("entidad"), que comprende una tabla llamada mytbl en una base de datos llamada mydb, que se puede utilizar para leer todas las filas de esta tabla. Tenga en cuenta que esta tabla debe existir en HCatalog antes del inicio de esta operación.

Después de definir un ReadEntity, obtiene una instancia de HCatReader usando ReadEntity y la configuración del clúster:

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

El siguiente paso es obtener un ReaderContext del lector de la siguiente manera:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

Esta abstracción es interna de HCatalog. Esto es para facilitar la escritura en HCatalog desde sistemas externos. No intente crear una instancia de esto directamente. En su lugar, utilice DataTransferFactory.

No Señor. Nombre y descripción del método
1

Public abstract WriterContext prepareRead() throws HCatException

El sistema externo debe invocar este método exactamente una vez desde un nodo maestro. Devuelve unWriterContext. Esto debe serializarse y enviarse a los nodos esclavos para construirHCatWriter allí.

2

Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException

Este método debe usarse en los nodos esclavos para realizar escrituras. RecordItr es un objeto iterador que contiene la colección de registros que se escribirán en HCatalog.

3

Public abstract void abort(WriterContext cntxt) throws HCatException

Este método debe llamarse en el nodo principal. El propósito principal de este método es realizar limpiezas en caso de fallas.

4

public abstract void commit(WriterContext cntxt) throws HCatException

Este método debe llamarse en el nodo principal. El propósito de este método es confirmar los metadatos.

Similar a la lectura, la escritura también es un proceso de dos pasos en el que el primer paso ocurre en el nodo maestro. Posteriormente, el segundo paso ocurre en paralelo en los nodos esclavos.

Las escrituras se realizan en un WriteEntity que se puede construir de una manera similar a las lecturas:

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

El código anterior crea un objeto WriteEntity entityque se puede usar para escribir en una tabla llamadamytbl en la base de datos mydb.

Después de crear WriteEntity, el siguiente paso es obtener un WriterContext -

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

Todos los pasos anteriores ocurren en el nodo principal. El nodo maestro luego serializa el objeto WriterContext y lo pone a disposición de todos los esclavos.

En los nodos esclavos, debe obtener un HCatWriter usando WriterContext de la siguiente manera:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

Entonces la writertoma un iterador como argumento para el writemétodo -

writer.write(hCatRecordItr);

los writer luego llama getNext() en este iterador en un bucle y escribe todos los registros adjuntos al iterador.

los TestReaderWriter.javaEl archivo se utiliza para probar las clases HCatreader y HCatWriter. El siguiente programa demuestra cómo usar HCatReader y la API de HCatWriter para leer datos de un archivo fuente y luego escribirlos en un archivo de destino.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

El programa anterior lee los datos del HDFS en forma de registros y escribe los datos del registro en mytable