HCatalog - Formato de entrada y salida

los HCatInputFormat y HCatOutputFormatLas interfaces se utilizan para leer datos de HDFS y, después del procesamiento, escribir los datos resultantes en HDFS utilizando el trabajo MapReduce. Elaboremos las interfaces de formato de entrada y salida.

HCatInputFormat

los HCatInputFormatse usa con trabajos de MapReduce para leer datos de tablas administradas por HCatalog. HCatInputFormat expone una API MapReduce de Hadoop 0.20 para leer datos como si se hubieran publicado en una tabla.

No Señor. Nombre y descripción del método
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

Configure las entradas que se utilizarán para el trabajo. Consulta la tienda de metadatos con la especificación de entrada dada y serializa las particiones coincidentes en la configuración del trabajo para las tareas de MapReduce.

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

Configure las entradas que se utilizarán para el trabajo. Consulta la tienda de metadatos con la especificación de entrada dada y serializa las particiones coincidentes en la configuración del trabajo para las tareas de MapReduce.

3

public HCatInputFormat setFilter(String filter)throws IOException

Establezca un filtro en la tabla de entrada.

4

public HCatInputFormat setProperties(Properties properties) throws IOException

Establecer propiedades para el formato de entrada.

los HCatInputFormat API incluye los siguientes métodos:

  • setInput
  • setOutputSchema
  • getTableSchema

Usar HCatInputFormat para leer datos, primero cree una instancia InputJobInfo con la información necesaria de la tabla que se está leyendo y luego llame setInput con el InputJobInfo.

Puedes usar el setOutputSchema método para incluir un projection schema, para especificar los campos de salida. Si no se especifica un esquema, se devolverán todas las columnas de la tabla. Puede utilizar el método getTableSchema para determinar el esquema de tabla para una tabla de entrada especificada.

HCatOutputFormat

HCatOutputFormat se usa con trabajos de MapReduce para escribir datos en tablas administradas por HCatalog. HCatOutputFormat expone una API MapReduce de Hadoop 0.20 para escribir datos en una tabla. Cuando un trabajo de MapReduce usa HCatOutputFormat para escribir la salida, se usa el OutputFormat predeterminado configurado para la tabla y la nueva partición se publica en la tabla una vez que se completa el trabajo.

No Señor. Nombre y descripción del método
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

Configure la información sobre la salida para escribir para el trabajo. Consulta al servidor de metadatos para encontrar el StorageHandler que se utilizará para la tabla. Lanza un error si la partición ya está publicada.

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

Establezca el esquema para los datos que se escriben en la partición. El esquema de tabla se usa de forma predeterminada para la partición si no se llama.

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

Consiga al escritor de discos para el trabajo. Utiliza el OutputFormat predeterminado de StorageHandler para obtener el escritor de registros.

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

Obtenga el confirmador de salida para este formato de salida. Asegura que la salida se confirme correctamente.

los HCatOutputFormat API incluye los siguientes métodos:

  • setOutput
  • setSchema
  • getTableSchema

La primera llamada en HCatOutputFormat debe ser setOutput; cualquier otra llamada lanzará una excepción diciendo que el formato de salida no está inicializado.

El esquema de los datos que se escriben lo especifica el setSchemamétodo. Debe llamar a este método, proporcionando el esquema de datos que está escribiendo. Si sus datos tienen el mismo esquema que el esquema de la tabla, puede usarHCatOutputFormat.getTableSchema() para obtener el esquema de la tabla y luego pasarlo a setSchema().

Ejemplo

El siguiente programa MapReduce lee datos de una tabla que asume que tiene un número entero en la segunda columna ("columna 1"), y cuenta cuántas instancias de cada valor distinto encuentra. Es decir, hace el equivalente de "select col1, count(*) from $table group by col1;".

Por ejemplo, si los valores de la segunda columna son {1, 1, 1, 3, 3, 5}, el programa producirá la siguiente salida de valores y recuentos:

1, 3
3, 2
5, 1

Echemos ahora un vistazo al código del programa:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Antes de compilar el programa anterior, debe descargar algunos jars y agregarlos al classpathpara esta aplicación. Necesita descargar todos los archivos de Hive y HCatalog (HCatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Utilice los siguientes comandos para copiar esos jar archivos de local a HDFS y agregarlos al classpath.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

Utilice el siguiente comando para compilar y ejecutar el programa dado.

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

Ahora, verifique su directorio de salida (hdfs: user / tmp / hive) para ver la salida (part_0000, part_0001).