apache spark - Un pyspark eficiente unirse
apache-spark (2)
He leído mucho sobre cómo hacer uniones eficientes en pyspark. Las formas de lograr combinaciones eficientes que he encontrado son básicamente:
- Usa una transmisión de difusión si puedes. ( Normalmente no puedo porque los marcos de datos son demasiado grandes)
- Considere el uso de un grupo muy grande. (Prefiero no por el dinero ).
- Usa el mismo particionador .
El último es el que preferiría probar, pero no puedo encontrar una manera de hacerlo en pyspark. He intentado:
df.repartition(numberOfPartitions,[''parition_col1'',''partition_col2''])
pero no ayuda, todavía toma demasiado tiempo hasta que lo detengo, porque la chispa se atasca en los últimos trabajos.
Entonces, ¿cómo puedo usar el mismo particionador en pyspark y acelerar mis combinaciones, o incluso deshacerme de las combinaciones que tardan una eternidad? ¿Qué código necesito usar?
PD : He comprobado otros artículos, incluso en stackoverflow , pero aún no puedo ver el código.
Gracias @vikrantrana por su respuesta, lo intentaré si alguna vez lo necesito. Digo esto porque descubrí que el problema no era con las uniones "grandes" , el problema era la cantidad de cálculos antes de la unión. Imagina este escenario:
Leí una tabla y la almaceno en un marco de datos, llamado
df1
.
Leí otra tabla y la
df2
en
df2
.
Luego, realizo una gran cantidad de cálculos y combinaciones para ambos, y termino con una combinación entre
df1
y
df2
.
El problema aquí no era el tamaño, el problema era que el plan de ejecución de la chispa era enorme y no podía mantener todas las tablas intermedias en la memoria, así que comenzó a escribir en el disco y tomó mucho tiempo.
La solución que me funcionó fue persistir
df1
y
df2
en el disco antes de la unión (también persistí otros marcos de datos intermedios que fueron el resultado de cálculos grandes y complejos).
También puede utilizar un enfoque de dos pasos, en caso de que se ajuste a sus requisitos. Primero, vuelva a particionar los datos y persista utilizando tablas particionadas (dataframe.write.partitionBy ()). Luego, únase a las subparticiones en serie en un bucle, "agregando" a la misma tabla de resultados finales. Fue bien explicado por Sim. ver enlace abajo
Enfoque de dos pasos para unir grandes marcos de datos en pyspark
basado en el caso explicado anteriormente, pude unir las subparticiones en serie en un bucle y luego persistir los datos unidos a la tabla de la colmena.
Aquí está el código.
from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col(''emp_id'')%5).repartition(5, ''par_id'').write.format(''orc'').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col(''emp_id'')%5).repartition(5, ''par_id'').write.format(''orc'').partitionBy("par_id").saveAsTable("UDB.temptable_2")
Por lo tanto, si se está uniendo en un entero emp_id, puede particionar por el ID módulo en cierto número y de esta manera puede volver a distribuir la carga entre las particiones de chispa y los registros que tengan claves similares se agruparán y residirán en la misma partición. luego puede leer y recorrer cada sub partición de datos y unir ambos marcos de datos y mantenerlos juntos.
counter =0;
paritioncount = 4;
while counter<=paritioncount:
query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
EMP_DF1 =spark.sql(query1)
EMP_DF2 =spark.sql(query2)
df1 = EMP_DF1.alias(''df1'')
df2 = EMP_DF2.alias(''df2'')
innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,''inner'').select(''df1.*'')
innerjoin_EMP.show()
innerjoin_EMP.write.format(''orc'').insertInto("UDB.temptable")
counter = counter +1
He intentado esto y esto está funcionando bien. Este es solo un ejemplo para demostrar el enfoque de dos pasos. Sus condiciones de unión pueden variar y la cantidad de particiones también depende del tamaño de sus datos.