tutorial spark read example español python jdbc apache-spark apache-spark-sql pyspark

python - spark - ¿Funciona el pushdown de predicado de chispa con JDBC?



spark sql example (1)

De acuerdo a this

Catalyst aplica optimizaciones lógicas como el pushdown de predicados. El optimizador puede insertar predicados de filtro en la fuente de datos, lo que permite que la ejecución física omita datos irrelevantes.

Spark admite la inserción de predicados en la fuente de datos. ¿Esta función también está disponible / se espera para JDBC?

(Al inspeccionar los registros de la base de datos, puedo ver que no es el comportamiento predeterminado en este momento: la consulta completa se pasa a la base de datos, incluso si luego está limitada por los filtros de chispa)

MÁS DETALLES

Ejecutando Spark 1.5 con PostgreSQL 9.4

fragmento de código:

from pyspark import SQLContext, SparkContext, Row, SparkConf from data_access.data_access_db import REMOTE_CONNECTION sc = SparkContext() sqlContext = SQLContext(sc) url = ''jdbc:postgresql://{host}/{database}?user={user}&password={password}''.format(**REMOTE_CONNECTION) sql = "dummy" df = sqlContext.read.jdbc(url=url, table=sql) df = df.limit(1) df.show()

Seguimiento de SQL:

< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3 < 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0 < 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = ''d'' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.a drelid) LIKE ''%nextval(%'' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.a tttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum) < 2015-09-15 07:11:40.936 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3 < 2015-09-15 07:11:40.964 EDT >LOG: execute <unnamed>: SELECT "id","name" FROM dummy

Esperaría que la última selección incluya una cláusula de limit 1 , pero no


Spark DataFrames admite la eliminación de predicados con fuentes JDBC, pero el término predicado se usa en un significado estricto de SQL. Significa que cubre solo la cláusula WHERE . Además, parece que se limita a la conjunción lógica (sin IN y OR , me temo) y predicados simples.

Todo lo demás, como límites, recuentos, pedidos, grupos y condiciones, se procesa en el lado de Spark. Una advertencia, ya cubierta en SO, es que df.count() o sqlContext.sql("SELECT COUNT(*) FROM df") se traduce a SELECT 1 FROM df y requiere una transferencia de datos sustancial y procesamiento usando Spark.

¿Significa que es una causa perdida? No exactamente. Es posible utilizar una subconsulta arbitraria como argumento de table . Es menos conveniente que un pushdown predicado, pero por lo demás funciona bastante bien:

n = ... # Number of rows to take sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n)) df = sqlContext.read.jdbc(url=url, table=sql)

Nota :

Este comportamiento puede mejorarse en el futuro, una vez que Data Source API v2 esté listo: