read - Query a HIVE table in pyspark
spark scala sql (5)
In my case, copying hive-site.xml to $SPARK_HOME/conf and mysql-connector-java-*.jar to $SPARK_HOME/jars solved the problem.
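After copying those files, a quick way to confirm from the pyspark shell that Spark is talking to the Hive metastore (rather than a local Derby one) is to list the Hive databases. This is only a minimal sketch; it assumes the shell's predefined sc and a standard CDH layout.
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)  # sc is predefined in the pyspark shell
hive_context.sql("show databases").show()  # should list the databases known to Hive, e.g. 'default'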
I am using CDH 5.5.
I have a table created in HIVE's default database and I can query it from the HIVE command line.
Output:
hive> use default;
OK
Time taken: 0.582 seconds
hive> show tables;
OK
bank
Time taken: 0.341 seconds, Fetched: 1 row(s)
hive> select count(*) from bank;
OK
542
Time taken: 64.961 seconds, Fetched: 1 row(s)
However, I cannot query the table from pyspark, as it does not recognize the table.
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.sql("use default")
DataFrame[result: string]
sqlContext.sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
+---------+-----------+
sqlContext.sql("FROM bank SELECT count(*)")
16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*)
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco
raise AnalysisException(s.split(': ', 1)[1])
pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5
New error:
>>> from pyspark.sql import HiveContext
>>> hive_context = HiveContext(sc)
>>> bank = hive_context.table("default.bank")
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table
return DataFrame(self._ssql_ctx.table(tableName), self)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table.
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException
at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123)
at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406)
at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203)
at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422)
at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739)
at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Thanks.
I am not sure whether this has been resolved yet. I was testing the pyspark kernel with Livy integration, and this is how I tried out the Hive configuration:
from pyspark.sql import Row
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
test_list = [('A', 25), ('B', 20), ('C', 25), ('D', 18)]
rdd = sc.parallelize(test_list)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
# Register it as a temp table
sqlContext.registerDataFrameAsTable(schemaPeople, "test_table")
sqlContext.sql("show tables").show()
Output:
--------
+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| |test_table| true|
+--------+----------+-----------+
Now one can query it in several different ways:
1. Jupyter kernel (sparkmagic syntax):
%%sql
SELECT * FROM test_table limit 4
2. Using the default HiveContext:
sqlContext.sql("Select * from test_table").show()
We cannot pass the Hive table name directly to the Hive context's sql method, since it does not understand Hive table names there. One way to read a Hive table in the pyspark shell is:
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")
bank.show()
To run SQL on the Hive table: first, we need to register the data frame that we get from reading the Hive table; then we can run the SQL query.
bank.registerTempTable("bank_temp")
hive_context.sql("select * from bank_temp").show()
SparkSQL ships with its own metastore (Derby), so it can work even if Hive is not installed on the system. This is the default mode.
In the question above, you created the table in Hive. You get the table not found error
because SparkSQL is using its default metastore, which does not have the metadata of your Hive table.
If you want SparkSQL to use the Hive metastore instead and access Hive tables, you have to add hive-site.xml
to the Spark conf folder.
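Once hive-site.xml is in the Spark conf folder, the queries from the question should resolve against the Hive metastore. A minimal sketch, assuming the pyspark shell's predefined sc and the bank table shown above:
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)  # picks up hive-site.xml from the Spark conf directory
hive_context.sql("show tables").show()  # 'bank' should now be listed
hive_context.sql("select count(*) from bank").show()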
You can use sqlCtx.sql. The hive-site.xml file has to be copied to the Spark conf path.
my_dataframe = sqlCtx.sql("select * from categories")
my_dataframe.show()