apache-spark - spark - try databricks
¿Por qué Spark piensa que esta es una unión cruzada/cartesiana? (3)
Esto sucede porque
join
estructuras que comparten el mismo linaje y esto lleva a una condición trivialmente igual:
res2.explain()
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
:- Filter isnotnull(idx#204L)
: +- LogicalRDD [idx#204L, val#205]
+- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
+- LogicalRDD [key1#209L, key2#210L, val#211L]
and
LogicalRDD [idx#235L, val#236]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
En este caso, debe usar alias:
from pyspark.sql.functions import col
rdd1 = spark.createDataFrame(...).alias(''rdd1'')
rdd2 = spark.createDataFrame(...).alias(''rdd2'')
res1 = rdd1.join(rdd2, col(''rdd1.idx'') == col(''rdd2.key1'')).alias(''res1'')
res1.join(rdd1, on=col(''res1.key2'') == col(''rdd1.idx'')).explain()
== Physical Plan ==
*SortMergeJoin [key2#297L], [idx#360L], Inner
:- *Sort [key2#297L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key2#297L, 200)
: +- *SortMergeJoin [idx#290L], [key1#296L], Inner
: :- *Sort [idx#290L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(idx#290L, 200)
: : +- *Filter isnotnull(idx#290L)
: : +- Scan ExistingRDD[idx#290L,val#291]
: +- *Sort [key1#296L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#296L, 200)
: +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
: +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
+- *Sort [idx#360L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(idx#360L, 200)
+- *Filter isnotnull(idx#360L)
+- Scan ExistingRDD[idx#360L,val#361]
Para más detalles ver SPARK-6459 .
Quiero unir datos dos veces como a continuación:
rdd1 = spark.createDataFrame([(1, ''a''), (2, ''b''), (3, ''c'')], [''idx'', ''val''])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], [''key1'', ''key2'', ''val''])
res1 = rdd1.join(rdd2, on=[rdd1[''idx''] == rdd2[''key1'']])
res2 = res1.join(rdd1, on=[res1[''key2''] == rdd1[''idx'']])
res2.show()
Entonces me sale un error:
pyspark.sql.utils.AnalysisException: las uniones cartesianas pueden ser prohibitivamente caras y están deshabilitadas por defecto. Para habilitarlos explícitamente, establezca spark.sql.crossJoin.enabled = true; ''
Pero creo que esto no es una unión cruzada
ACTUALIZAR:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
: :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
: : +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
: : +- *Filter isnotnull(idx#0L)
: : +- Scan ExistingRDD[idx#0L,val#1]
: +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
: +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
: +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
: +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
Persistir no funcionó para mí.
Lo superé con alias en DataFrames
from pyspark.sql.functions import col
df1.alias("buildings").join(df2.alias("managers"), col("managers.distinguishedName") == col("buildings.manager"))
También tuve éxito cuando persistí el marco de datos antes de la segunda unión.
Algo como:
res1 = rdd1.join(rdd2, col(''rdd1.idx'') == col(''rdd2.key1'')).persist()
res1.join(rdd1, on=col(''res1.key2'') == col(''rdd1.idx''))