spark nuevo life chevrolet caracteristicas scala apache-spark hive left-join

scala - nuevo - La tarea final de Spark tarda 100 veces más que la primera 199, cómo mejorar



spark gt (1)

Estoy viendo algunos problemas de rendimiento al ejecutar consultas utilizando marcos de datos. He visto en mi investigación que, finalmente, las tareas de larga duración pueden ser una señal de que los datos no se alteran de manera óptima, pero no han encontrado un proceso detallado para resolver este problema.

Estoy comenzando a cargar dos tablas como marcos de datos, y luego me estoy uniendo a esas tablas en un campo. He tratado de agregar distribuir por (repartición) y ordenar por, para mejorar el rendimiento, pero todavía estoy viendo esta única tarea final de larga ejecución. Aquí hay una versión simple de mi código, tenga en cuenta que la consulta uno y dos no son tan simples y usan UDF para calcular algunos valores.

He intentado algunas configuraciones diferentes para spark.sql.shuffle . He intentado 100, pero falló (en realidad no depuré esto para ser sincero). Intenté 300, 4000 y 8000. El rendimiento disminuyó con cada aumento. Estoy seleccionando un solo día de datos, donde cada archivo es una hora.

val df1 = sqlContext.sql("Select * from Table1") val df2 = sqlContext.sql("Select * from Table2") val distributeDf1 = df1 .repartition(df1("userId")) .sortWithinPartitions(df1("userId")) val distributeDf2 = df2 .repartition(df2("userId")) .sortWithinPartitions(df2("userId")) distributeDf1.registerTempTable("df1") distributeDf2.registerTempTable("df2") val df3 = sqlContext .sql(""" Select df1.* from df1 left outer join df2 on df1.userId = df2.userId""")

Dado que parece que la partición por ID de usuario no es ideal, podría realizar una división por la marca de tiempo en su lugar. Si hago esto, ¿debería simplemente hacer la Fecha + Hora? Si tengo menos de 200 combos únicos para esto, ¿tendré albaceas vacías?


Claramente tienes un problema con una gran inclinación de datos correcta . Echemos un vistazo a las estadísticas que ha proporcionado :

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] df2 = [mean=1.0, stddev=0.0, count=18408194]

Con una media de alrededor de 5 y una desviación estándar de más de 2000, obtienes una larga cola .

Como algunas teclas son mucho más frecuentes que otras después de la creación de particiones, algunos ejecutores tendrán mucho más trabajo que hacer que los restantes.

Además, si su descripción sugiere que el problema puede ser con una o varias teclas que coincidan con la misma partición.

Entonces, primero identifiquemos los valores atípicos (pseudocódigo):

val mean = 4.989209978967438 val sd = 2255.654165352454 val df1 = sqlContext.sql("Select * from Table1") val counts = df.groupBy("userId").count.cache val frequent = counts .where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist. .alias("frequent") .join(df1, Seq("userId"))

y el resto:

val infrequent = counts .where($"count" <= mean + 2 * sd) .alias("infrequent") .join(df1, Seq("userId"))

¿Es realmente algo que se espera? Si no, intenta identificar la fuente del problema en sentido ascendente.

Si se espera, puedes intentar :

  • difusión de una tabla más pequeña:

    val df2 = sqlContext.sql("Select * from Table2") df2.join(broadcast(df1), Seq("userId"), "rightouter")

  • división, unificación ( union ) y transmisión solo frecuente:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter") .union(df2.join(infrequent, Seq("userId"), "rightouter"))

  • salando userId con algunos datos aleatorios

pero no deberías :

  • repartición de todos los datos y clasificación local (aunque la clasificación localmente sola no debería ser un problema)
  • realizar uniones Hash estándar en datos completos.