tutorial spark org examples example ejemplo downloads apache-spark functional-programming code-organization

apache-spark - org - spark apache 2018



Organización de código chispa y mejores prácticas (1)

Creo que puedes suscribirte a Apache Spark , el canal de databricks en youtube, escuchar más y saber más, especialmente por las experiencias y lecciones de otros.

Aquí hay algunos videos recomendados:

y lo publiqué y aún lo actualizo en mi github y blog:

espero que esto te pueda ayudar ~

Entonces, después de haber pasado muchos años en un mundo orientado a objetos con la reutilización del código, los patrones de diseño y las mejores prácticas siempre tomados en cuenta, me encuentro luchando un poco con la organización y reutilización del código en el mundo de Spark.

Si trato de escribir código de manera reutilizable, casi siempre viene con un costo de rendimiento y termino reescribiéndolo a lo que sea óptimo para mi caso de uso particular. Esta constante "escribir lo que es óptimo para este caso de uso en particular" también afecta la organización del código, porque dividir el código en diferentes objetos o módulos es difícil cuando "todo realmente pertenece" y, por lo tanto, termino con muy pocos objetos "Dios" que contienen mucho tiempo. cadenas de transformaciones complejas. De hecho, con frecuencia pienso que si hubiera echado un vistazo a la mayoría del código Spark que estoy escribiendo ahora cuando estaba trabajando en el mundo orientado a objetos, habría hecho una mueca y lo descarté como "código spaghetti".

He navegado por Internet tratando de encontrar algún tipo de equivalente a las mejores prácticas del mundo orientado a objetos, pero sin mucha suerte. Puedo encontrar algunas "mejores prácticas" para la programación funcional, pero Spark simplemente agrega una capa adicional, porque el rendimiento es un factor muy importante aquí.

Entonces, mi pregunta es, ¿alguno de ustedes, gurús de Spark, encontró algunas mejores prácticas para escribir código de Spark que puedan recomendar?

EDITAR

Como se escribió en un comentario, en realidad no esperaba que nadie publicara una respuesta sobre cómo resolver este problema, sino que esperaba que alguien en esta comunidad hubiera encontrado algún tipo de Martin Fowler, que había escrito algunos artículos o publicaciones de blog en alguna parte sobre cómo abordar los problemas con la organización del código en el mundo de Spark.

@DanielDarabos sugirió que podría poner un ejemplo de una situación en la que la organización del código y el rendimiento son conflictivos. Si bien encuentro que frecuentemente tengo problemas con esto en mi trabajo diario, me resulta un poco difícil resumirlo en un buen ejemplo mínimo;) pero lo intentaré.

En el mundo orientado a objetos, soy un gran admirador del Principio de responsabilidad única, por lo que me aseguraría de que mis métodos solo fueran responsables de una cosa. Los hace reutilizables y fácilmente comprobables. Entonces, si tuviera que, por ejemplo, calcular la suma de algunos números en una lista (coincidiendo con algunos criterios) y tuviera que calcular el promedio del mismo número, definitivamente crearía dos métodos: uno que calcula la suma y otro que calculó el promedio. Me gusta esto:

def main(implicit args: Array[String]): Unit = { val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)) println("Summed weights for DK = " + summedWeights(list, "DK") println("Averaged weights for DK = " + averagedWeights(list, "DK") } def summedWeights(list: List, country: String): Double = { list.filter(_._1 == country).map(_._2).sum } def averagedWeights(list: List, country: String): Double = { val filteredByCountry = list.filter(_._1 == country) filteredByCountry.map(_._2).sum/ filteredByCountry.length }

Por supuesto, puedo seguir honrando a SRP en Spark:

def main(implicit args: Array[String]): Unit = { val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight") println("Summed weights for DK = " + summedWeights(df, "DK") println("Averaged weights for DK = " + averagedWeights(df, "DK") } def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter(''country === country) val summedWeight = countrySpecific.agg(avg(''weight)) summedWeight.first().getDouble(0) } def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter(''country === country) val summedWeight = countrySpecific.agg(sum(''weight)) summedWeight.first().getDouble(0) }

Pero debido a que mi df puede contener miles de millones de filas, preferiría no tener que realizar el filter dos veces. De hecho, el rendimiento está directamente relacionado con el costo de EMR, por lo que REALMENTE no quiero eso. Para superarlo, decido violar SRP y simplemente pongo las dos funciones en una y me aseguro de llamar a persistir en el DataFrame filtrado por DataFrame , así:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter(''country === country).persist(StorageLevel.MEMORY_AND_DISK_SER) val summedWeights = countrySpecific.agg(sum(''weight)).first().getDouble(0) val averagedWeights = summedWeights / countrySpecific.count() (summedWeights, averagedWeights) }

Ahora, este ejemplo es, por supuesto, una gran simplificación de lo que se encuentra en la vida real. Aquí podría resolverlo simplemente filtrando y persistiendo df antes de pasarlo a las funciones de suma y promedio (que también sería más SRP), pero en la vida real puede haber una serie de cálculos intermedios que se necesitan una y otra vez. En otras palabras, la función de filter aquí es simplemente un intento de hacer un ejemplo simple de algo que se beneficiará de la persistencia. De hecho, creo que las llamadas a persist son una palabra clave aquí. Las llamadas persist acelerarán enormemente mi trabajo, pero el costo es que tengo que acoplar estrechamente todo el código que depende del DataFrame persistente, incluso si están lógicamente separados.