tutorial spark sources receive foreachrdd example data context scala apache-spark psql

scala - sources - spark streaming 2.2 0



¿Cómo puedo conectarme a una base de datos postgreSQL en Apache Spark usando Scala? (1)

Nuestro objetivo es ejecutar consultas SQL paralelas de los trabajadores de Spark.

Configuración de compilación

Agregue el conector y JDBC a libraryDependencies en build.sbt . Solo he intentado esto con MySQL, así que lo usaré en mis ejemplos, pero Postgres debería ser muy parecido.

libraryDependencies ++= Seq( jdbc, "mysql" % "mysql-connector-java" % "5.1.29", "org.apache.spark" %% "spark-core" % "1.0.1", // etc )

Código

Cuando crea SparkContext le dice qué SparkContext para copiar a los ejecutores. Incluye el frasco conector. Una buena forma de hacer esto:

val classes = Seq( getClass, // To get the jar with our own code. classOf[mysql.jdbc.Driver] // To get the connector. ) val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath()) val conf = new SparkConf().setJars(jars)

Ahora Spark está listo para conectarse a la base de datos. Cada ejecutor ejecutará parte de la consulta, de modo que los resultados estén listos para el cálculo distribuido.

Hay dos opciones para esto. El enfoque anterior es usar org.apache.spark.rdd.JdbcRDD :

val rdd = new org.apache.spark.rdd.JdbcRDD( sc, () => { sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") }, "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?", 0, 1000, 10, row => row.getString("BOOK_TITLE") )

Consulte la documentación de los parámetros. Brevemente:

  • Tienes el SparkContext .
  • Entonces una función que crea la conexión. Se solicitará a cada trabajador que se conecte a la base de datos.
  • Luego la consulta SQL. Esto tiene que ser similar al ejemplo y contener marcadores de posición para la clave inicial y final.
  • Luego, especifica el rango de teclas (de 0 a 1000 en mi ejemplo) y el número de particiones. El rango se dividirá entre las particiones. Entonces, un hilo ejecutor terminará ejecutando SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100 en el ejemplo.
  • Y finalmente tenemos una función que convierte el ResultSet en algo. En el ejemplo, lo convertimos en un String , por lo que termina con un RDD[String] .

Desde Apache Spark versión 1.3.0, otro método está disponible a través de la API de DataFrame. En lugar de JdbcRDD , crearía un org.apache.spark.sql.DataFrame :

val df = sqlContext.load("jdbc", Map( "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred", "dbtable" -> "BOOKS"))

Consulte https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases para obtener la lista completa de opciones (el rango de claves y el número de particiones se pueden establecer solo como con JdbcRDD ).

Actualizaciones

JdbcRDD no es compatible con las actualizaciones. Pero puedes simplemente hacerlos en una foreachPartition .

rdd.foreachPartition { it => val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?") for (bookTitle <- it) { del.setString(1, bookTitle) del.executeUpdate } }

(Esto crea una conexión por partición. Si eso es una preocupación, ¡use un grupo de conexiones!)

DataFrame actualizaciones de soporte de DataFrame a través de los métodos createJDBCTable e insertIntoJDBC .

Quiero saber cómo puedo hacer las siguientes cosas en Scala.

  1. Conéctese a una base de datos postgreSQL usando Spark scala.
  2. Escribir consultas SQL como SELECT, UPDATE, etc. para modificar una tabla en esa base de datos.

Sé hacerlo con scala, pero ¿cómo importar el conector de psql scala en sbt mientras lo empaqueto?