tutorial spark read sql scala mysqli apache-spark mysql-connector

read - Crear Spark Dataframe desde SQL Query



spark sql java (5)

Si ya tiene su table registrada en su SQLContext , puede simplemente usar el método sql .

val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

Estoy seguro de que esta es una pregunta simple de SQLContext, pero no puedo encontrar ninguna respuesta en los documentos de Spark o Stackoverflow

Quiero crear un Spark Dataframe a partir de una consulta SQL en MySQL

Por ejemplo, tengo una consulta complicada de MySQL como

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

y quiero un Dataframe con Columnas X, Y y Z

Descubrí cómo cargar tablas enteras en Spark, y pude cargarlas todas, y luego hacer la unión y selección allí. Sin embargo, eso es muy ineficiente. Solo quiero cargar la tabla generada por mi consulta SQL.

Aquí está mi aproximación actual del código, eso no funciona. Mysql-connector tiene una opción "dbtable" que se puede usar para cargar una tabla completa. Espero que haya alguna manera de especificar una consulta

val df = sqlContext.format("jdbc"). option("url", "jdbc:mysql://localhost:3306/local_content"). option("driver", "com.mysql.jdbc.Driver"). option("useUnicode", "true"). option("continueBatchOnError","true"). option("useSSL", "false"). option("user", "root"). option("password", ""). sql( """ select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d join DialogLine as dl on dl.DialogID=d.DialogID join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID join WordRoot as wr on wr.WordRootID=wi.WordRootID where d.InSite=1 and dl.Active=1 limit 100 """ ).load()

Gracias Peter


para guardar el resultado de una consulta en un nuevo marco de datos, simplemente configure el resultado igual a una variable:

val newDataFrame = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

y ahora newDataFrame es un marco de datos con todas las funciones de dataframe disponibles.


TL; DR: solo crea una vista en tu base de datos.

Detalle: Tengo una tabla t_city en mi base de datos postgres, en la cual creo una vista:

create view v_city_3500 as select asciiname, country, population, elevation from t_city where elevation>3500 and population>100000 select * from v_city_3500; asciiname | country | population | elevation -----------+---------+------------+----------- Potosi | BO | 141251 | 3967 Oruro | BO | 208684 | 3936 La Paz | BO | 812799 | 3782 Lhasa | CN | 118721 | 3651 Puno | PE | 116552 | 3825 Juliaca | PE | 245675 | 3834

En la chispa-cáscara:

val sx= new org.apache.spark.sql.SQLContext(sc) var props=new java.util.Properties() props.setProperty("driver", "org.postgresql.Driver" ) val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn" val city_df=sx.read.jdbc(url=url,table="t_city",props) val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)

Resultado:

city_df.count() Long = 145725 city_3500_df.count() Long = 6


con MYSQL leer / cargar datos algo así como a continuación

val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "dbtable" -> "TABLE_NAME")).load()

escribir datos en la tabla como se muestra a continuación

import java.util.Properties val prop = new Properties() prop.put("user", "<>") prop.put("password", "simple$123") val dfWriter = jdbcDF.write.mode("append") dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)

para crear un marco de datos a partir de la consulta, haga algo como a continuación

val finalModelDataDF = { val query = "select * from table_name" sqlContext.sql(query) }; finalModelDataDF.show()


OK, aquí está la respuesta ...

Encontré esto aquí Migración masiva de datos a través de Spark SQL

El parámetro dbname puede ser cualquier consulta envuelta entre paréntesis con un alias. Entonces en mi caso, necesito hacer esto ...

val query = """ (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d join DialogLine as dl on dl.DialogID=d.DialogID join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID join WordRoot as wr on wr.WordRootID=wi.WordRootID where d.InSite=1 and dl.Active=1 limit 100) foo """ val df = sqlContext.format("jdbc"). option("url", "jdbc:mysql://localhost:3306/local_content"). option("driver", "com.mysql.jdbc.Driver"). option("useUnicode", "true"). option("continueBatchOnError","true"). option("useSSL", "false"). option("user", "root"). option("password", ""). option("dbtable",query). load()

Como era de esperar, cargar cada tabla como su propio Dataframe y unirlas en Spark fue muy ineficiente.