tables spark multiple left full apache-spark dataframe spark-dataframe apache-spark-sql apache-spark-1.4

apache spark - spark - Optimización de la unión de DataFrame-Broadcast Hash Join



left join spark scala (5)

Estoy tratando de unir efectivamente dos DataFrames, uno de los cuales es grande y el segundo es un poco más pequeño.

¿Hay alguna manera de evitar toda esta confusión? No puedo establecer autoBroadCastJoinThreshold , porque solo admite números enteros, y la tabla que intento transmitir es ligeramente mayor que el número entero de bytes.

¿Hay alguna manera de forzar la transmisión ignorando esta variable?


Broadcast Hash Joins (similar a map side join o map-side combine en Mapreduce):

En SparkSQL puede ver el tipo de unión que se realiza llamando a queryExecution.executedPlan . Al igual que con el núcleo de Spark, si una de las tablas es mucho más pequeña que la otra, es posible que desee unir hash de difusión. Puede insinuar a Spark SQL que un DF determinado se debe transmitir para unirse llamando al método broadcast en el DataFrame antes de unirse

Ejemplo: largedataframe.join(broadcast(smalldataframe), "key")

en términos DWH, donde el gran marco de datos puede ser como un hecho
smalldataframe puede ser como una dimensión

Como se describe en mi libro favorito (HPS) por favor. ver abajo para tener una mejor comprensión ..

Nota: La broadcast es de import org.apache.spark.sql.functions.broadcast no de SparkContext

Spark también utiliza automáticamente el spark.sql.conf.autoBroadcastJoinThreshold para determinar si se debe transmitir una tabla.

Consejo: consulte el método DataFrame.explain ()

def explain(): Unit Prints the physical plan to the console for debugging purposes.

¿Hay alguna manera de forzar la transmisión ignorando esta variable?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

NOTA :

Otra nota similar fuera de la caja wrt Hive (no chispa): Se puede lograr algo similar usando la sugerencia de colmena MAPJOIN como a continuación ...

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key hive> set hive.auto.convert.join=true; hive> set hive.auto.convert.join.noconditionaltask.size=20971520 hive> set hive.auto.convert.join.noconditionaltask=true; hive> set hive.auto.convert.join.use.nonstaged=true; hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

Lectura adicional: consulte mi artículo sobre BHJ, SHJ, SMJ


Encontré que este código funciona para Broadcast Join en Spark 2.11 versión 2.0.0.

import org.apache.spark.sql.functions.broadcast val employeesDF = employeesRDD.toDF val departmentsDF = departmentsRDD.toDF // materializing the department data val tmpDepartments = broadcast(departmentsDF.as("departments")) import context.implicits._ employeesDF.join(broadcast(tmpDepartments), $"depId" === $"id", // join by employees.depID == departments.id "inner").show()

Aquí está la referencia para el código anterior Henning Kropp Blog, Broadcast Join with Spark


Esta es una limitación actual de chispa, ver SPARK-6235 . El límite de 2 GB también se aplica a las variables de difusión.

¿Está seguro de que no hay otra buena manera de hacer esto, por ejemplo, particiones diferentes?

De lo contrario, puede hackearlo creando manualmente múltiples variables de difusión que son <2GB cada una.



Puede insinuar que se transmita un marco de datos utilizando left.join(broadcast(right), ...)