python - read - spark sql tutorial
¿Cómo usar la fuente JDBC para escribir y leer datos en(Py) Spark? (3)
Escribir datos
-
Incluya el controlador JDBC aplicable cuando envíe la aplicación o inicie el shell. Puede usar por ejemplo
--packages
:bin/pyspark --packages group:name:version
o combinando
driver-class-path
yjars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
Estas propiedades también se pueden establecer usando la variable de entorno
PYSPARK_SUBMIT_ARGS
antes de que se inicie la instancia JVM o usandoconf/spark-defaults.conf
para establecerspark.jars.packages
ospark.jars
/spark.driver.extraClassPath
. -
Elija el modo deseado. El escritor JDBC de Spark admite los siguientes modos:
-
append
: agrega contenido de esto: clase:DataFrame
a los datos existentes. -
overwrite
: sobrescribe los datos existentes. -
ignore
:ignore
silenciosamente esta operación si ya existen datos. -
error
(caso predeterminado): lanzar una excepción si ya existen datos.
Upserts u otras modificaciones de grano fino no son compatibles
mode = ...
-
-
Prepare JDBC URI, por ejemplo:
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
-
(Opcional) Cree un diccionario de argumentos JDBC.
properties = { "user": "foo", "password": "bar" }
properties
/options
también se pueden utilizar para establecer propiedades de conexión JDBC compatibles . -
Use
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
para guardar los datos (ver
pyspark.sql.DataFrameWriter
para más detalles).
Problemas conocidos :
-
No se puede encontrar el controlador adecuado cuando el controlador se ha incluido usando
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)Suponiendo que no hay una versión de controlador que no coincida para resolver esto, puede agregar la clase de
driver
a lasproperties
. Por ejemplo:properties = { ... "driver": "org.postgresql.Driver" }
-
El uso de
df.write.format("jdbc").options(...).save()
puede resultar en:java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource no permite crear tablas como select.
Solución desconocida
-
en Pyspark 1.3 puede intentar llamar al método Java directamente:
df._jdf.insertIntoJDBC(url, "baz", True)
Lectura de datos
- Siga los pasos 1-4 de Escribir datos
-
Utilice
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
o
sqlContext.read.format("jdbc")
:(sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())
Problemas conocidos y problemas :
- No se puede encontrar el controlador adecuado; consulte: Escritura de datos
-
Spark SQL admite el empuje de predicados con fuentes JDBC, aunque no todos los predicados pueden empujarse hacia abajo. Tampoco delega límites ni agregaciones. Una posible solución es reemplazar el argumento
dbtable
/table
con una subconsulta válida. Ver por ejemplo:- ¿Funciona el pushdown de predicado de chispa con JDBC?
- Más de una hora para ejecutar pyspark.sql.DataFrame.take (4)
- ¿Cómo usar la consulta SQL para definir la tabla en dbtable?
-
Por defecto, las fuentes de datos JDBC cargan datos secuencialmente usando un solo hilo ejecutor. Para garantizar la carga de datos distribuidos, puede:
-
Proporcione la
column
partición (debe serIntegeType
),lowerBound
,upperBound
,numPartitions
. - Proporcione una lista de predicados mutuamente excluyentes, uno para cada partición deseada.
Ver:
- Particionar en chispa mientras lee desde RDBMS a través de JDBC ,
- ¿Cómo optimizar el particionamiento al migrar datos desde la fuente JDBC? ,
- ¿Cómo mejorar el rendimiento para trabajos lentos de Spark utilizando DataFrame y conexión JDBC?
- ¿Cómo particionar Spark RDD al importar Postgres usando JDBC?
-
Proporcione la
-
En un modo distribuido (con columna de partición o predicados) cada ejecutor opera en su propia transacción. Si la base de datos fuente se modifica al mismo tiempo, no hay garantía de que la vista final sea coherente.
Dónde encontrar los controladores adecuados:
-
Maven Repository (para obtener las coordenadas requeridas para
--packages
seleccione la versión deseada y copie los datos de una pestaña de Gradle en un formulariocompile-group:name:version
sustituye los campos respectivos) o Maven Central Repository :
Otras opciones
Dependiendo de la base de datos, puede existir una fuente especializada y preferible en algunos casos:
- Greenplum - Conector Pivotal Greenplum-Spark
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server: conector de chispa para bases de datos SQL de Azure y SQL Server
- Amazon Redshift: conector Databricks Redshift (versiones actuales disponibles solo en un Databricks Runtime patentado. Versión de código abierto descontinuada, disponible en GitHub ).
El objetivo de esta pregunta es documentar:
-
pasos necesarios para leer y escribir datos utilizando conexiones JDBC en PySpark
-
posibles problemas con fuentes JDBC y soluciones conocidas
Con pequeños cambios, estos métodos deberían funcionar con otros lenguajes compatibles, incluidos Scala y R.
Consulte este enlace para descargar el jdbc para postgres y siga los pasos para descargar el archivo jar
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html archivo jar se descargará en la ruta de esta manera. "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
Si tu versión de chispa es 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read /
.format("jdbc") /
.option("url", "jdbc:postgresql:postgres") /
.option("dbtable", "public.user_emp_tab") /
.option("user", "postgres") /
.option("password", "Jonsnow@100") /
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
guarde el archivo como python y ejecute "python respectfilename.py"
Descargue el controlador mysql-connector-java y manténgalo en la carpeta spark jar, observe el siguiente código de python aquí escribiendo datos en "acotr1", tenemos que crear la estructura de tabla acotr1 en la base de datos mysql
spark = SparkSession.builder.appName("prasadad").master(''local'').config(''spark.driver.extraClassPath'',''D:/spark-2.1.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.41-bin.jar'').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"
df.write.jdbc(mysql_url,table="actor1",mode="append")