scan - Cómo leer desde hbase usando chispa
hbase tutorial español (4)
El código a continuación se leerá desde hbase, luego lo convertirá a estructura json y convertirá a schemaRDD, pero el problema es que estoy using List
para almacenar la cadena json y luego pasar a javaRDD, para datos de aproximadamente 100 GB, el maestro será cargado con datos en la memoria. ¿Cuál es la forma correcta de cargar los datos de hbase y luego realizar la manipulación, luego convertir a JavaRDD.
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "/""+s2_str+"/":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "/""+s1_str+"/""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{/"RowKey/":/""+rowKey+"/","+json+"}";
jsonList.add(json);
}
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
Dado que la pregunta no es nueva, hay algunas otras alternativas por ahora:
- hbase-spark , un módulo que está disponible directamente en el repositorio HBase
- Spark-on-HBase por Hortonworks
No sé mucho sobre el primer proyecto, pero parece que no es compatible con Spark 2.x. Sin embargo, tiene una gran compatibilidad en el nivel de RDD para Spark 1.6.x.
Spark-on-HBase, por otro lado, tiene sucursales para Spark 2.0 y el próximo Spark 2.1. Este proyecto es muy prometedor ya que se centra en las API de Dataset / DataFrame. Debajo del capó, implementa la API estándar Spark Datasource y aprovecha el motor Spark Catalyst para la optimización de consultas. Los desarrolladores afirman here que es capaz de eliminar particiones, poda de columnas, pushdown de predicados y lograr ubicación de datos.
Un ejemplo simple, que utiliza el com.hortonworks:shc:1.0.0-2.0-s_2.11
de este repo y Spark 2.0.2, se presenta a continuación:
case class Record(col0: Int, col1: Int, col2: Boolean)
val spark = SparkSession
.builder()
.appName("Spark HBase Example")
.master("local[4]")
.getOrCreate()
def catalog =
s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"int"},
|"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
|}
|}""".stripMargin
val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))
// write
spark
.createDataFrame(artificialData)
.write
.option(HBaseTableCatalog.tableCatalog, catalog)
.option(HBaseTableCatalog.newTable, "5")
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// read
val df = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
df.count()
Prefiero leer de hbase y hacer la manipulación json todo en chispa.
Spark proporciona la función JavaSparkContext.newAPIHadoopRDD para leer datos del almacenamiento de hadoop, incluido HBase. Tendrá que proporcionar la configuración de HBase, el nombre de la tabla y el escaneo en el parámetro de configuración y el formato de entrada de la tabla y su valor clave
Puede usar la clase de formato de entrada de la tabla y su parámetro de trabajo para proporcionar el nombre de la tabla y la configuración de escaneo
ejemplo:
conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data =
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
entonces puedes hacer la manipulación json en chispa. Como la chispa puede hacer un recálculo cuando la memoria está llena, solo cargará los datos necesarios para la parte de recálculo (cmiiw) para que no tenga que preocuparse por el tamaño de los datos.
Un ejemplo básico para leer los datos de HBase usando Spark (Scala), también puedes ver esto en Java:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val tableName = "table1"
System.setProperty("user.name", "hdfs")
System.setProperty("HADOOP_USER_NAME", "hdfs")
conf.set("hbase.master", "localhost:60000")
conf.setInt("timeout", 120000)
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)
}
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())
sc.stop()
}
}
ACTUALIZADO -2016
A partir de Spark 1.0.x +, ahora también puedes usar el Conector Spark-HBase:
Dependencia de Maven para incluir:
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>
Y encuentre un código de muestra a continuación para el mismo:
import org.apache.spark._
import it.nerdammer.spark.hbase._
object HBaseRead extends App {
val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
val sc = new SparkContext(sparkConf)
// For Example If you have an HBase Table as ''Document'' with ColumnFamily ''SMPL'' and qualifier as ''DocID, Title'' then:
val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
.select("DocID", "Title").inColumnFamily("SMPL")
println("Number of Records found : " + docRdd .count())
}
ACTUALIZADO - 2017
A partir de Spark 1.6.x +, ahora también puede usar SHC Connector (usuarios de Hortonworks o HDP):
Dependencia de Maven para incluir:
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
</dependency>
La principal ventaja de utilizar este conector es que tiene flexibilidad en la definición del esquema y no necesita parametros codificados como en nerdammer / spark-hbase-connector. Además, recuerde que es compatible con Spark 2.x, por lo que este conector es bastante flexible y brinda soporte de extremo a extremo en problemas y relaciones públicas.
Busque la ruta de depósito siguiente para el último archivo léame y las muestras:
Conector Hortonworks Spark HBase
También puede convertir este RDD a DataFrames y ejecutar SQL sobre él o puede asignar estos Dataset o DataFrames a las clases Java Pojo o Case definidas por el usuario. Funciona genial.
Por favor comente abajo si necesita algo más.
solo para agregar un comentario sobre cómo agregar escaneo:
TableInputFormat tiene los siguientes atributos:
- SCAN_ROW_START
- SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");