
How to query a HIVE table in pyspark




In my case, copying hive-site.xml to $SPARK_HOME/conf and copying mysql-connector-java-*.jar to $SPARK_HOME/jars solved the problem.
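A quick way to check that the copied configuration is actually picked up is to list the Hive databases from the shell. This is just a minimal sketch, assuming the same pyspark shell as in the question (where `sc` is already defined):

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)               # sc is the SparkContext created by the pyspark shell
sqlContext.sql("show databases").show()    # should now list the Hive databases, e.g. "default"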

I am using CDH 5.5.

I have a table created in the default HIVE database and I can query it from the HIVE command line.

Output

hive> use default;
OK
Time taken: 0.582 seconds
hive> show tables;
OK
bank
Time taken: 0.341 seconds, Fetched: 1 row(s)
hive> select count(*) from bank;
OK
542
Time taken: 64.961 seconds, Fetched: 1 row(s)

However, I cannot query the table from pyspark, because it does not recognize the table.

from pyspark.context import SparkContext
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.sql("use default")
DataFrame[result: string]
sqlContext.sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
+---------+-----------+
sqlContext.sql("FROM bank SELECT count(*)")
16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*)
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco
    raise AnalysisException(s.split(': ', 1)[1])
pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5

New error

>>> from pyspark.sql import HiveContext
>>> hive_context = HiveContext(sc)
>>> bank = hive_context.table("default.bank")
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table
    return DataFrame(self._ssql_ctx.table(tableName), self)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table.
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123)
    at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406)
    at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203)
    at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422)
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739)
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Thanks


I am not sure whether this has been resolved yet. I was trying out the pyspark kernel with Livy integration, and this is how I tested the Hive configuration:

from pyspark.sql import Row
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
test_list = [('A', 25), ('B', 20), ('C', 25), ('D', 18)]
rdd = sc.parallelize(test_list)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
# Register it as a temp table
sqlContext.registerDataFrameAsTable(schemaPeople, "test_table")
sqlContext.sql("show tables").show()

Output:
+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
|        |test_table|       true|
+--------+----------+-----------+

Now one can query it in several different ways:

1. jupyter kernel (sparkmagic syntax):
   %%sql
   SELECT * FROM test_table limit 4

2. Using the default HiveContext:
   sqlContext.sql("Select * from test_table").show()


We cannot pass the Hive table name directly to the Hive context's SQL method, since it does not resolve the table name there. One way to read a Hive table in the pyspark shell is:

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("default.bank")
bank.show()

To run SQL against the Hive table: first, register the data frame obtained from reading the Hive table, then run the SQL query against it.

bank.registerTempTable("bank_temp")
hive_context.sql("select * from bank_temp").show()
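The count query from the original question can then be run against the temporary table in the same way; for example (assuming the registration above has already been done):

hive_context.sql("select count(*) from bank_temp").show()   # should return 542, matching the Hive CLI result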


SparkSQL ships with its own metastore (Derby), so it can work even if Hive is not installed on the system. This is the default mode.

In the question above, you created the table in Hive. You get the table not found error because SparkSQL is using its default metastore, which does not have the metadata of your Hive table.

If you want SparkSQL to use the Hive metastore instead and access Hive tables, you have to add hive-site.xml to the Spark conf folder.
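As a rough sketch of what this looks like once hive-site.xml is in $SPARK_HOME/conf: on Spark 1.x the HiveContext used in the question is enough, while on Spark 2.x you would typically enable Hive support on the session explicitly. The application name below is just illustrative:

from pyspark.sql import SparkSession

# Spark 2.x style: build a session backed by the Hive metastore
# described by $SPARK_HOME/conf/hive-site.xml.
spark = SparkSession.builder \
    .appName("read-hive-table") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show tables in default").show()   # the "bank" table should now be listed
spark.table("default.bank").count()          # same count as the Hive CLI query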


You can use sqlCtx.sql. The hive-site.xml file must be copied to the Spark conf path.

my_dataframe = sqlCtx.sql("select * from categories")
my_dataframe.show()