apache-spark jdbc hive pyspark hortonworks-data-platform

apache spark - Error sqlContext HiveDriver en SQLException: método no admitido



apache-spark jdbc (2)

He intentado usar sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver") para obtener la tabla de Hive en Spark sin ningún éxito. He investigado y leo a continuación:

Cómo conectarse al servidor de colmena remoto desde la chispa

Spark 1.5.1 no funciona con hive jdbc 1.2.0

http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html

Usé el último Hortonworks Sandbox 2.6 y le pregunté a la comunidad la misma pregunta:

https://community.hortonworks.com/questions/156828/pyspark-jdbc-py4jjavaerror-calling-o95load-javasql.html?childToView=156936#answer-156936

Lo que quiero hacer es muy simple a través de pyspark :

df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()

Eso me dio este error:

17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load return self._df(self._jreader.load()) File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco return f(*a, **kw) File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o119.load. : java.sql.SQLException: Method not supported at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:136) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91) at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:748)

Usando beeline, funciona bien

beeline> !connect jdbc:hive2://localhost:10016/default maria_dev maria_dev Connecting to jdbc:hive2://localhost:10016/default Connected to: Spark SQL (version 2.1.1.2.6.1.0-129) Driver: Hive JDBC (version 1.2.1000.2.6.1.0-129) Transaction isolation: TRANSACTION_REPEATABLE_READ 0: jdbc:hive2://localhost:10016/default> select * from sample_07 limit 2; +----------+-------------------------+------------+---------+--+ | code | description | total_emp | salary | +----------+-------------------------+------------+---------+--+ | 00-0000 | All Occupations | 134354250 | 40690 | | 11-0000 | Management occupations | 6003930 | 96150 | +----------+-------------------------+------------+---------+--+

Yo también podría hacer esto:

spark = SparkSession.Builder().appName("testapp").enableHiveSupport().‌​getOrCreate() spark.sql("select * from default.sample_07").collect()

Pero esto se lee directamente en Metadatos de Hive. Me gustaría usar JDBC para Spark Thrift Server para una seguridad de grano fino.

Podría hacer PostgreSQL así:

sqlContext.read.format("jdbc").options(driver="org.postgresql.Driver")

También podría usar Scala java.sql.{DriverManager, Connection, Statement, ResultSet} para crear la conexión JDBC como cliente para llegar a Spark. Pero eso básicamente pone todos los datos en la memoria y luego vuelve a crear Dataframe manualmente.

Entonces, la pregunta es: ¿hay alguna forma de crear el marco de datos Spark con los datos de la tabla Hive sin cargar datos en la memoria en el cliente JDBC como Scala y no usar SparkSession.Builder() como los ejemplos anteriores? Mi caso de uso es que necesito lidiar con la seguridad detallada.


En realidad, investigué esto. Hotornworks y cloudera están inclinando el soporte para conectarse a la colmena desde Spark a través del Thrift Server.

Entonces estás trabajando en algo que es imposible.

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_thrift_server .

The Links dice que el ahorro está desactivado, pero es específicamente para colmenar de chispa. Puedo conectarme a todo tipo de bases de datos desde la chispa excepto la colmena.

Entonces debes trabajar en diferentes estilos de autorización.

Como el objeto de chispa se conecta directamente a la colmena, están retirando el soporte de ahorro.

De su pregunta anterior, puede leer los datos pero leer los datos incorrectos. Error de servidor de Spark 2.2 Thrift en el dataframe NumberFormatException cuando la tabla Hive de la consulta

Código

>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load() >>> df.select("*").show() +---+----+ | id|desc| +---+----+ | id|desc| | id|desc| +---+----+

El problema aquí está en la colmena

La forma predeterminada en el dialecto predeterminado para citar identificadores es el uso de comillas dobles. Una consulta SQL como SELECT "dw_date" FROM table ... será analizada por Hive para seleccionar un literal de cadena, en lugar de una columna llamada "dw_date". Al reemplazar las comillas por los backticks, parece que el problema está resuelto. Sin embargo, en mi prueba, los nombres de columna obtenidos de Hive están todos prefijados con el nombre de tabla como table.dw_date. Pero no se puede envolver directamente sobre ellos como table.dw_date . Alternativamente, necesitamos envolver cada parte individualmente

código

private case object HiveDialect extends JdbcDialect { override def canHandle(url : String): Boolean = url.startsWith("jdbc:hive2") override def quoteIdentifier(colName: String): String = { colName.split(‘.’).map(part => s”`$part`”).mkString(“.”) } }

Siga la publicación a continuación para implementar la solución.

https://medium.com/@viirya/custom-jdbc-dialect-for-hive-5dbb694cc2bd

https://medium.com/@huaxing/customize-spark-jdbc-data-source-to-work-with-your-dedicated-database-dialect-beec6519af27


No estoy seguro si entiendo tu pregunta correctamente o no, pero por lo que entiendo necesitarás tener una tabla de colmenas en el marco de datos, para eso no necesitas tener la conexión JDBC, en tus enlaces de ejemplo están tratando de conectarse a diferentes bases de datos (RDBMS), no a Hive.

Por favor, consulte el enfoque a continuación, utilizando el contexto de colmena puede obtener la tabla en un marco de datos.

import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SQLContext} def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("APPName") val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) val sqlContext = new SQLContext(sc) val hive_df = hiveContext.sql("select * from schema.table").first() //other way // val hive_df= hiveContext.table ("SchemaName.TableName") //Below will print the first line df.first() //count on dataframe df.count() }

Si realmente quieres usar la conexión JDBC, tengo el siguiente ejemplo que utilicé para la base de datos Oracle, que podría ayudarte.

val oracle_data = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:username/password//hostname:2134/databaseName", "dbtable" -> "Your query tmp", "driver" -> "oracle.jdbc.driver.OracleDriver"));