datacamp - hadoop spark python
¿Cómo conectar HBase y Spark usando Python? (1)
Encontré este comentario de uno de los creadores de hbase-spark
, que parece sugerir que hay una forma de usar PySpark para consultar HBase usando Spark SQL.
Y, de hecho, el patrón que se describe aquí puede aplicarse a la consulta de HBase con Spark SQL utilizando PySpark, como se muestra en el siguiente ejemplo:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlc = SQLContext(sc)
data_source_format = ''org.apache.hadoop.hbase.spark''
df = sc.parallelize([(''a'', ''1.0''), (''b'', ''2.0'')]).toDF(schema=[''col0'', ''col1''])
# ''''.join(string.split()) in order to write a multi-line JSON string here.
catalog = ''''.join("""{
"table":{"namespace":"default", "name":"testtable"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"cf", "col":"col1", "type":"string"}
}
}""".split())
# Writing
df.write/
.options(catalog=catalog)/ # alternatively: .option(''catalog'', catalog)
.format(data_source_format)/
.save()
# Reading
df = sqlc.read/
.options(catalog=catalog)/
.format(data_source_format)/
.load()
He intentado hbase-spark-1.2.0-cdh5.7.0.jar
(distribuido por Cloudera) para esto, pero hbase-spark-1.2.0-cdh5.7.0.jar
problemas ( org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select
cuando escritura, java.util.NoSuchElementException: None.get
al leer). Como resultado, la versión actual de CDH no incluye los cambios en hbase-spark
que permiten la integración de Spark SQL-HBase.
Lo que sí funciona para mí es el paquete shc
Spark, que se encuentra aquí . El único cambio que tuve que hacer en el script anterior es cambiar:
data_source_format = ''org.apache.spark.sql.execution.datasources.hbase''
Así es como presento el script anterior en mi clúster CDH, siguiendo el ejemplo del shc
README de shc
:
spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py
La mayor parte del trabajo en shc
parece shc
fusionado ya en el módulo hbase-spark
de HBase, para su lanzamiento en la versión 2.0. Con eso, es posible consultar Spark SQL de HBase usando el patrón mencionado anteriormente (ver: https://hbase.apache.org/book.html#_sparksql_dataframes para más detalles). Mi ejemplo anterior muestra cómo se ve para los usuarios de PySpark.
Finalmente, una advertencia: mis datos de ejemplo anteriores solo tienen cadenas. La conversión de datos de Python no es compatible con shc
, así que tuve problemas con los enteros y flotantes que no aparecían en HBase o con valores extraños.
Tengo una tarea embarazosamente paralela para la cual utilizo Spark para distribuir los cálculos. Estos cálculos están en Python, y uso PySpark para leer y preprocesar los datos. Los datos de entrada a mi tarea se almacenan en HBase. Desafortunadamente, aún no he encontrado una forma satisfactoria (es decir, fácil de usar y escalable) de leer / escribir datos de HBase desde / a Spark usando Python.
Lo que he explorado anteriormente:
Conectando desde dentro de mis procesos de Python usando
happybase
. Este paquete permite conectarse a HBase desde Python al usar la Thrift API de HBase. De esta manera, básicamente me salteo Spark para la lectura / escritura de datos y me estoy perdiendo las posibles optimizaciones de HBase-Spark. Las velocidades de lectura parecen razonablemente rápidas, pero las velocidades de escritura son lentas. Esta es actualmente mi mejor solución.Usando el nuevo APIHadoopRDD de
newAPIHadoopRDD
ysaveAsNewAPIHadoopDataset
que hacen uso de la interfaz MapReduce de HBase. Ejemplos de esto fueron incluidos una vez en la base del código Spark ( ver aquí ). Sin embargo, ahora se consideran obsoletos a favor de las fijaciones de Spark de HBase ( ver aquí ). También encontré que este método es lento y engorroso (para leer, escribir funcionaba bien), por ejemplo, ya que las cadenas devueltas desde elnewAPIHadoopRDD
tuvieron que ser analizadas y transformadas de varias maneras para terminar con los objetos de Python que yo quería.
Alternativas de las que soy consciente:
Actualmente estoy usando el CDH de Cloudera y la versión 5.7.0 ofrece
hbase-spark
( notas de la versión de CDH y una publicación de blog detallada ). Este módulo (anteriormente conocido comoSparkOnHBase
) será oficialmente parte de HBase 2.0. Desafortunadamente, esta maravillosa solución parece funcionar solo con Scala / Java.Spark-SQL-on-HBase / Astro de Huawei (no veo la diferencia entre los dos ...). No parece tan robusto y bien respaldado como me gustaría que fuera mi solución.