apache-spark pyspark apache-spark-sql pyspark-sql

apache spark - Más de una hora para ejecutar pyspark.sql.DataFrame.take(4)



apache-spark apache-spark-sql (1)

Estoy ejecutando spark 1.6 en 3 máquinas virtuales (es decir, 1x maestro; 2x esclavos), todos con 4 núcleos y 16 GB de RAM.

Puedo ver a los trabajadores registrados en spark-master webUI.

Quiero recuperar datos de mi base de datos Vertica para trabajar en ellos. Como no pude ejecutar consultas complejas, intenté entender las consultas ficticias. Consideramos aquí una tarea fácil.

Mi código es:

df = sqlContext.read.format(''jdbc'').options(url=''xxxx'', dbtable=''xxx'', user=''xxxx'', password=''xxxx'').load() four = df.take(4)

Y la salida es (nota: reemplazo con @IPSLAVE la IP de VM esclava: Puerto):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1 16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions 16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1) 16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List() 16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List() 16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents 16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB) 16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB) 16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB) 16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1) 16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes) 16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB) 16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1) 16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s 16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

Como puede ver, toma mucho tiempo. En realidad, mi tabla es bastante grande (almacena alrededor de 220 millones de líneas, 11 campos cada una), pero dicha consulta se ejecutaría instantáneamente usando sql "normal" (por ejemplo, pyodbc).

Supongo que estoy malinterpretando / haciendo mal uso de Spark, ¿tendrías ideas o consejos para que funcione mejor?


Si bien Spark admite un empuje de predicado limitado sobre JDBC, todas las demás operaciones, como límite, grupo y agregaciones, se realizan internamente. Desafortunadamente, significa que take(4) buscará primero los datos y luego aplicará el limit . En otras palabras, su base de datos ejecutará (suponiendo que no haya proyecciones ni filtros) algo equivalente a:

SELECT * FROM table

y el resto será manejado por Spark. Hay algunas optimizaciones involucradas (en particular, Spark evalúa las particiones de forma iterativa para obtener el número de registros solicitados por LIMIT ), pero sigue siendo un proceso bastante ineficiente en comparación con las optimizaciones del lado de la base de datos.

Si desea empujar el limit a la base de datos, deberá hacerlo estáticamente utilizando subconsulta como parámetro dbtable :

(sqlContext.read.format(''jdbc'') .options(url=''xxxx'', dbtable=''(SELECT * FROM xxx LIMIT 4) tmp'', ....))

sqlContext.read.format("jdbc").options(Map( "url" -> "xxxx", "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp", ))

Tenga en cuenta que un alias en subconsulta es obligatorio.

Nota :

Este comportamiento puede mejorarse en el futuro, una vez que Data Source API v2 esté listo: