una tabla example create crear consultas como java hadoop mapreduce hbase

java - tabla - hbase example



¿Cuál es la forma más rápida de cargar en masa datos en HBase programáticamente? (2)

Tengo un archivo de texto sin formato con posiblemente millones de líneas que necesita un análisis personalizado y quiero cargarlo en una tabla HBase lo más rápido posible (utilizando el cliente Java Hadoop o HBase).

Mi solución actual se basa en un trabajo de MapReduce sin la parte Reducir. Uso FileInputFormat para leer el archivo de texto de modo que cada línea pase al método de map de mi clase Mapper . En este punto, la línea se analiza para formar un objeto Put que se escribe en el context . Luego, TableOutputFormat toma el objeto Put y lo inserta en la tabla.

Esta solución produce una tasa de inserción promedio de 1,000 filas por segundo, que es inferior a la que esperaba. Mi configuración de HBase está en modo pseudo distribuido en un solo servidor.

Una cosa interesante es que durante la inserción de 1,000,000 de filas, se generan 25 Mappers (tareas) pero se ejecutan en serie (uno tras otro); ¿esto es normal?

Aquí está el código para mi solución actual:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { protected void map(LongWritable key, Text value, Context context) throws IOException { Map<String, String> parsedLine = parseLine(value.toString()); Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); for (String currentKey : parsedLine.keySet()) { row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); } try { context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public int run(String[] args) throws Exception { if (args.length != 2) { return -1; } conf.set("hbase.mapred.outputtable", args[1]); // I got these conf parameters from a presentation about Bulk Load conf.set("hbase.hstore.blockingStoreFiles", "25"); conf.set("hbase.hregion.memstore.block.multiplier", "8"); conf.set("hbase.regionserver.handler.count", "30"); conf.set("hbase.regions.percheckin", "30"); conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); Job job = new Job(conf); job.setJarByClass(BulkLoadMapReduce.class); job.setJobName(NAME); TextInputFormat.setInputPaths(job, new Path(args[0])); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(CustomMap.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); job.setNumReduceTasks(0); job.setOutputFormatClass(TableOutputFormat.class); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { Long startTime = Calendar.getInstance().getTimeInMillis(); System.out.println("Start time : " + startTime); int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); Long endTime = Calendar.getInstance().getTimeInMillis(); System.out.println("End time : " + endTime); System.out.println("Duration milliseconds: " + (endTime-startTime)); System.exit(errCode); }


Una cosa interesante es que durante la inserción de 1,000,000 de filas, se generan 25 Mappers (tareas) pero se ejecutan en serie (uno tras otro); ¿esto es normal?

mapreduce.tasktracker.map.tasks.maximum parámetro mapreduce.tasktracker.map.tasks.maximum que se establece de manera predeterminada en 2 determina el número máximo de tareas que pueden ejecutarse en paralelo en un nodo. A menos que se modifique, debería ver 2 tareas de mapas que se ejecutan simultáneamente en cada nodo.


He pasado por un proceso que probablemente sea muy similar al tuyo al intentar encontrar una forma eficiente de cargar datos desde un MR a HBase. Lo que encontré para trabajar es usar HFileOutputFormat como OutputFormatClass del MR.

A continuación, se encuentra la base de mi código que tengo para generar el job y la función de map del asignador que escribe los datos. Esto fue rápido. Ya no lo usamos, así que no tengo números disponibles, pero fue de alrededor de 2.5 millones de registros en menos de un minuto.

Aquí está la función (reducida) que escribí para generar el trabajo para mi proceso MapReduce para poner datos en HBase

private Job createCubeJob(...) { //Build and Configure Job Job job = new Job(conf); job.setJobName(jobName); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper job.setJarByClass(CubeBuilderDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); TextInputFormat.setInputPaths(job, hiveOutputDir); HFileOutputFormat.setOutputPath(job, cubeOutputPath); Configuration hConf = HBaseConfiguration.create(conf); hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); HTable hTable = new HTable(hConf, tableName); HFileOutputFormat.configureIncrementalLoad(job, hTable); return job; }

Esta es mi función de mapa de la clase HiveToHBaseMapper (ligeramente editada).

public void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException { try{ Configuration config = context.getConfiguration(); String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); String column = strs[COLUMN_INDEX]; String Value = strs[VALUE_INDEX]; String sKey = generateKey(strs, config); byte[] bKey = Bytes.toBytes(sKey); Put put = new Put(bKey); put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) ? Bytes.toBytes(Double.MIN_VALUE) : Bytes.toBytes(value)); ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); context.write(ibKey, put); context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); } catch(Exception e){ context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1); } }

Estoy bastante seguro de que esta no será una solución para copiar y pegar. Obviamente, los datos con los que estaba trabajando aquí no necesitaron ningún procesamiento personalizado (esto se hizo en un trabajo de MR antes de este). Lo principal que quiero proporcionar de esto es el HFileOutputFormat . El resto es solo un ejemplo de cómo lo usé. :)
Espero que te lleve por un camino sólido hacia una buena solución. :