spark datacamp python apache-spark hbase pyspark apache-spark-sql

datacamp - hadoop spark python



¿Cómo conectar HBase y Spark usando Python? (1)

Encontré este comentario de uno de los creadores de hbase-spark , que parece sugerir que hay una forma de usar PySpark para consultar HBase usando Spark SQL.

Y, de hecho, el patrón que se describe aquí puede aplicarse a la consulta de HBase con Spark SQL utilizando PySpark, como se muestra en el siguiente ejemplo:

from pyspark import SparkContext from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) data_source_format = ''org.apache.hadoop.hbase.spark'' df = sc.parallelize([(''a'', ''1.0''), (''b'', ''2.0'')]).toDF(schema=[''col0'', ''col1'']) # ''''.join(string.split()) in order to write a multi-line JSON string here. catalog = ''''.join("""{ "table":{"namespace":"default", "name":"testtable"}, "rowkey":"key", "columns":{ "col0":{"cf":"rowkey", "col":"key", "type":"string"}, "col1":{"cf":"cf", "col":"col1", "type":"string"} } }""".split()) # Writing df.write/ .options(catalog=catalog)/ # alternatively: .option(''catalog'', catalog) .format(data_source_format)/ .save() # Reading df = sqlc.read/ .options(catalog=catalog)/ .format(data_source_format)/ .load()

He intentado hbase-spark-1.2.0-cdh5.7.0.jar (distribuido por Cloudera) para esto, pero hbase-spark-1.2.0-cdh5.7.0.jar problemas ( org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select cuando escritura, java.util.NoSuchElementException: None.get al leer). Como resultado, la versión actual de CDH no incluye los cambios en hbase-spark que permiten la integración de Spark SQL-HBase.

Lo que funciona para mí es el paquete shc Spark, que se encuentra aquí . El único cambio que tuve que hacer en el script anterior es cambiar:

data_source_format = ''org.apache.spark.sql.execution.datasources.hbase''

Así es como presento el script anterior en mi clúster CDH, siguiendo el ejemplo del shc README de shc :

spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py

La mayor parte del trabajo en shc parece shc fusionado ya en el módulo hbase-spark de HBase, para su lanzamiento en la versión 2.0. Con eso, es posible consultar Spark SQL de HBase usando el patrón mencionado anteriormente (ver: https://hbase.apache.org/book.html#_sparksql_dataframes para más detalles). Mi ejemplo anterior muestra cómo se ve para los usuarios de PySpark.

Finalmente, una advertencia: mis datos de ejemplo anteriores solo tienen cadenas. La conversión de datos de Python no es compatible con shc , así que tuve problemas con los enteros y flotantes que no aparecían en HBase o con valores extraños.

Tengo una tarea embarazosamente paralela para la cual utilizo Spark para distribuir los cálculos. Estos cálculos están en Python, y uso PySpark para leer y preprocesar los datos. Los datos de entrada a mi tarea se almacenan en HBase. Desafortunadamente, aún no he encontrado una forma satisfactoria (es decir, fácil de usar y escalable) de leer / escribir datos de HBase desde / a Spark usando Python.

Lo que he explorado anteriormente:

  • Conectando desde dentro de mis procesos de Python usando happybase . Este paquete permite conectarse a HBase desde Python al usar la Thrift API de HBase. De esta manera, básicamente me salteo Spark para la lectura / escritura de datos y me estoy perdiendo las posibles optimizaciones de HBase-Spark. Las velocidades de lectura parecen razonablemente rápidas, pero las velocidades de escritura son lentas. Esta es actualmente mi mejor solución.

  • Usando el nuevo APIHadoopRDD de newAPIHadoopRDD y saveAsNewAPIHadoopDataset que hacen uso de la interfaz MapReduce de HBase. Ejemplos de esto fueron incluidos una vez en la base del código Spark ( ver aquí ). Sin embargo, ahora se consideran obsoletos a favor de las fijaciones de Spark de HBase ( ver aquí ). También encontré que este método es lento y engorroso (para leer, escribir funcionaba bien), por ejemplo, ya que las cadenas devueltas desde el newAPIHadoopRDD tuvieron que ser analizadas y transformadas de varias maneras para terminar con los objetos de Python que yo quería.

Alternativas de las que soy consciente:

  • Actualmente estoy usando el CDH de Cloudera y la versión 5.7.0 ofrece hbase-spark ( notas de la versión de CDH y una publicación de blog detallada ). Este módulo (anteriormente conocido como SparkOnHBase ) será oficialmente parte de HBase 2.0. Desafortunadamente, esta maravillosa solución parece funcionar solo con Scala / Java.

  • Spark-SQL-on-HBase / Astro de Huawei (no veo la diferencia entre los dos ...). No parece tan robusto y bien respaldado como me gustaría que fuera mi solución.