apache spark - spark - Optimización de la unión de DataFrame-Broadcast Hash Join
left join spark scala (5)
Estoy tratando de unir efectivamente dos DataFrames, uno de los cuales es grande y el segundo es un poco más pequeño.
¿Hay alguna manera de evitar toda esta confusión?
No puedo establecer
autoBroadCastJoinThreshold
, porque solo admite números enteros, y la tabla que intento transmitir es ligeramente mayor que el número entero de bytes.
¿Hay alguna manera de forzar la transmisión ignorando esta variable?
Broadcast Hash Joins (similar a map side join o map-side combine en Mapreduce):
En SparkSQL puede ver el tipo de unión que se realiza llamando a
queryExecution.executedPlan
.
Al igual que con el núcleo de Spark, si una de las tablas es mucho más pequeña que la otra, es posible que desee unir hash de difusión.
Puede insinuar a Spark SQL que un DF determinado se debe transmitir para unirse llamando al método
broadcast
en el
DataFrame
antes de unirse
Ejemplo:
largedataframe.join(broadcast(smalldataframe), "key")
en términos DWH, donde el gran marco de datos puede ser como un hecho
smalldataframe puede ser como una dimensión
Como se describe en mi libro favorito (HPS) por favor. ver abajo para tener una mejor comprensión ..
Nota: La
broadcast
es de
import org.apache.spark.sql.functions.broadcast
no de
SparkContext
Spark también utiliza automáticamente el
spark.sql.conf.autoBroadcastJoinThreshold
para determinar si se debe transmitir una tabla.
Consejo: consulte el método DataFrame.explain ()
def
explain(): Unit
Prints the physical plan to the console for debugging purposes.
¿Hay alguna manera de forzar la transmisión ignorando esta variable?
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
NOTA :
Otra nota similar fuera de la caja wrt Hive (no chispa): Se puede lograr algo similar usando la sugerencia de colmena
MAPJOIN
como a continuación ...
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb
Lectura adicional: consulte mi artículo sobre BHJ, SHJ, SMJ
Encontré que este código funciona para Broadcast Join en Spark 2.11 versión 2.0.0.
import org.apache.spark.sql.functions.broadcast
val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF
// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))
import context.implicits._
employeesDF.join(broadcast(tmpDepartments),
$"depId" === $"id", // join by employees.depID == departments.id
"inner").show()
Aquí está la referencia para el código anterior Henning Kropp Blog, Broadcast Join with Spark
Esta es una limitación actual de chispa, ver SPARK-6235 . El límite de 2 GB también se aplica a las variables de difusión.
¿Está seguro de que no hay otra buena manera de hacer esto, por ejemplo, particiones diferentes?
De lo contrario, puede hackearlo creando manualmente múltiples variables de difusión que son <2GB cada una.
La configuración de
spark.sql.autoBroadcastJoinThreshold = -1
deshabilitará la transmisión por completo.
Consulte
Otras opciones de configuración en Spark SQL, DataFrames y Datasets Guide
.
Puede insinuar que se transmita un marco de datos utilizando
left.join(broadcast(right), ...)