tutorial spark read python scala apache-spark apache-spark-sql pyspark

python - read - spark sql tutorial



¿Cómo usar la fuente JDBC para escribir y leer datos en(Py) Spark? (3)

Escribir datos

  1. Incluya el controlador JDBC aplicable cuando envíe la aplicación o inicie el shell. Puede usar por ejemplo --packages :

    bin/pyspark --packages group:name:version

    o combinando driver-class-path y jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

    Estas propiedades también se pueden establecer usando la variable de entorno PYSPARK_SUBMIT_ARGS antes de que se inicie la instancia JVM o usando conf/spark-defaults.conf para establecer spark.jars.packages o spark.jars / spark.driver.extraClassPath .

  2. Elija el modo deseado. El escritor JDBC de Spark admite los siguientes modos:

    • append : agrega contenido de esto: clase: DataFrame a los datos existentes.
    • overwrite : sobrescribe los datos existentes.
    • ignore : ignore silenciosamente esta operación si ya existen datos.
    • error (caso predeterminado): lanzar una excepción si ya existen datos.

    Upserts u otras modificaciones de grano fino no son compatibles

    mode = ...

  3. Prepare JDBC URI, por ejemplo:

    # You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"

  4. (Opcional) Cree un diccionario de argumentos JDBC.

    properties = { "user": "foo", "password": "bar" }

    properties / options también se pueden utilizar para establecer propiedades de conexión JDBC compatibles .

  5. Use DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

    para guardar los datos (ver pyspark.sql.DataFrameWriter para más detalles).

Problemas conocidos :

  • No se puede encontrar el controlador adecuado cuando el controlador se ha incluido usando --packages ( java.sql.SQLException: No suitable driver found for jdbc: ... )

    Suponiendo que no hay una versión de controlador que no coincida para resolver esto, puede agregar la clase de driver a las properties . Por ejemplo:

    properties = { ... "driver": "org.postgresql.Driver" }

  • El uso de df.write.format("jdbc").options(...).save() puede resultar en:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource no permite crear tablas como select.

    Solución desconocida

  • en Pyspark 1.3 puede intentar llamar al método Java directamente:

    df._jdf.insertIntoJDBC(url, "baz", True)

Lectura de datos

  1. Siga los pasos 1-4 de Escribir datos
  2. Utilice sqlContext.read.jdbc :

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)

    o sqlContext.read.format("jdbc") :

    (sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())

Problemas conocidos y problemas :

  • No se puede encontrar el controlador adecuado; consulte: Escritura de datos
  • Spark SQL admite el empuje de predicados con fuentes JDBC, aunque no todos los predicados pueden empujarse hacia abajo. Tampoco delega límites ni agregaciones. Una posible solución es reemplazar el argumento dbtable / table con una subconsulta válida. Ver por ejemplo:

    • ¿Funciona el pushdown de predicado de chispa con JDBC?
    • Más de una hora para ejecutar pyspark.sql.DataFrame.take (4)
    • ¿Cómo usar la consulta SQL para definir la tabla en dbtable?
  • Por defecto, las fuentes de datos JDBC cargan datos secuencialmente usando un solo hilo ejecutor. Para garantizar la carga de datos distribuidos, puede:

    • Proporcione la column partición (debe ser IntegeType ), lowerBound , upperBound , numPartitions .
    • Proporcione una lista de predicados mutuamente excluyentes, uno para cada partición deseada.

    Ver:

    • Particionar en chispa mientras lee desde RDBMS a través de JDBC ,
    • ¿Cómo optimizar el particionamiento al migrar datos desde la fuente JDBC? ,
    • ¿Cómo mejorar el rendimiento para trabajos lentos de Spark utilizando DataFrame y conexión JDBC?
    • ¿Cómo particionar Spark RDD al importar Postgres usando JDBC?
  • En un modo distribuido (con columna de partición o predicados) cada ejecutor opera en su propia transacción. Si la base de datos fuente se modifica al mismo tiempo, no hay garantía de que la vista final sea coherente.

Dónde encontrar los controladores adecuados:

Otras opciones

Dependiendo de la base de datos, puede existir una fuente especializada y preferible en algunos casos:

El objetivo de esta pregunta es documentar:

  • pasos necesarios para leer y escribir datos utilizando conexiones JDBC en PySpark

  • posibles problemas con fuentes JDBC y soluciones conocidas

Con pequeños cambios, estos métodos deberían funcionar con otros lenguajes compatibles, incluidos Scala y R.


Consulte este enlace para descargar el jdbc para postgres y siga los pasos para descargar el archivo jar

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html archivo jar se descargará en la ruta de esta manera. "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

Si tu versión de chispa es 2

from pyspark.sql import SparkSession spark = SparkSession.builder .appName("sparkanalysis") .config("spark.driver.extraClassPath", "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar") .getOrCreate() //for localhost database// pgDF = spark.read / .format("jdbc") / .option("url", "jdbc:postgresql:postgres") / .option("dbtable", "public.user_emp_tab") / .option("user", "postgres") / .option("password", "Jonsnow@100") / .load() print(pgDF) pgDF.filter(pgDF["user_id"]>5).show()

guarde el archivo como python y ejecute "python respectfilename.py"


Descargue el controlador mysql-connector-java y manténgalo en la carpeta spark jar, observe el siguiente código de python aquí escribiendo datos en "acotr1", tenemos que crear la estructura de tabla acotr1 en la base de datos mysql

spark = SparkSession.builder.appName("prasadad").master(''local'').config(''spark.driver.extraClassPath'',''D:/spark-2.1.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.41-bin.jar'').getOrCreate() sc = spark.sparkContext from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load() mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01" df.write.jdbc(mysql_url,table="actor1",mode="append")