apache-spark - org - spark apache 2018
Organización de código chispa y mejores prácticas (1)
Creo que puedes suscribirte a
Apache Spark
, el canal de
databricks
en youtube, escuchar más y saber más, especialmente por las experiencias y lecciones de otros.
Aquí hay algunos videos recomendados:
- Visualización de SparkUI
-
Chispa en producción: lecciones de más de 100 usuarios de producción
-
slide
Spark in Production: Lecciones de más de 100 usuarios de producción -
slide
Construcción, depuración y ajuste de tuberías de aprendizaje automático de Spark -
slide
los 5 errores principales al escribir aplicaciones Spark -
Una comprensión más profunda de las chispas internas - Aaron Davidson (Databricks)
-
slide
Una comprensión más profunda de los aspectos internos de Spark - Aaron Davidson (Databricks)
y lo publiqué y aún lo actualizo en mi github y blog:
espero que esto te pueda ayudar ~
Entonces, después de haber pasado muchos años en un mundo orientado a objetos con la reutilización del código, los patrones de diseño y las mejores prácticas siempre tomados en cuenta, me encuentro luchando un poco con la organización y reutilización del código en el mundo de Spark.
Si trato de escribir código de manera reutilizable, casi siempre viene con un costo de rendimiento y termino reescribiéndolo a lo que sea óptimo para mi caso de uso particular. Esta constante "escribir lo que es óptimo para este caso de uso en particular" también afecta la organización del código, porque dividir el código en diferentes objetos o módulos es difícil cuando "todo realmente pertenece" y, por lo tanto, termino con muy pocos objetos "Dios" que contienen mucho tiempo. cadenas de transformaciones complejas. De hecho, con frecuencia pienso que si hubiera echado un vistazo a la mayoría del código Spark que estoy escribiendo ahora cuando estaba trabajando en el mundo orientado a objetos, habría hecho una mueca y lo descarté como "código spaghetti".
He navegado por Internet tratando de encontrar algún tipo de equivalente a las mejores prácticas del mundo orientado a objetos, pero sin mucha suerte. Puedo encontrar algunas "mejores prácticas" para la programación funcional, pero Spark simplemente agrega una capa adicional, porque el rendimiento es un factor muy importante aquí.
Entonces, mi pregunta es, ¿alguno de ustedes, gurús de Spark, encontró algunas mejores prácticas para escribir código de Spark que puedan recomendar?
EDITAR
Como se escribió en un comentario, en realidad no esperaba que nadie publicara una respuesta sobre cómo resolver este problema, sino que esperaba que alguien en esta comunidad hubiera encontrado algún tipo de Martin Fowler, que había escrito algunos artículos o publicaciones de blog en alguna parte sobre cómo abordar los problemas con la organización del código en el mundo de Spark.
@DanielDarabos sugirió que podría poner un ejemplo de una situación en la que la organización del código y el rendimiento son conflictivos. Si bien encuentro que frecuentemente tengo problemas con esto en mi trabajo diario, me resulta un poco difícil resumirlo en un buen ejemplo mínimo;) pero lo intentaré.
En el mundo orientado a objetos, soy un gran admirador del Principio de responsabilidad única, por lo que me aseguraría de que mis métodos solo fueran responsables de una cosa. Los hace reutilizables y fácilmente comprobables. Entonces, si tuviera que, por ejemplo, calcular la suma de algunos números en una lista (coincidiendo con algunos criterios) y tuviera que calcular el promedio del mismo número, definitivamente crearía dos métodos: uno que calcula la suma y otro que calculó el promedio. Me gusta esto:
def main(implicit args: Array[String]): Unit = {
val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))
println("Summed weights for DK = " + summedWeights(list, "DK")
println("Averaged weights for DK = " + averagedWeights(list, "DK")
}
def summedWeights(list: List, country: String): Double = {
list.filter(_._1 == country).map(_._2).sum
}
def averagedWeights(list: List, country: String): Double = {
val filteredByCountry = list.filter(_._1 == country)
filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
Por supuesto, puedo seguir honrando a SRP en Spark:
def main(implicit args: Array[String]): Unit = {
val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")
println("Summed weights for DK = " + summedWeights(df, "DK")
println("Averaged weights for DK = " + averagedWeights(df, "DK")
}
def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter(''country === country)
val summedWeight = countrySpecific.agg(avg(''weight))
summedWeight.first().getDouble(0)
}
def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter(''country === country)
val summedWeight = countrySpecific.agg(sum(''weight))
summedWeight.first().getDouble(0)
}
Pero debido a que mi
df
puede contener miles de millones de filas, preferiría no tener que realizar el
filter
dos veces.
De hecho, el rendimiento está directamente relacionado con el costo de EMR, por lo que REALMENTE no quiero eso.
Para superarlo, decido violar SRP y simplemente pongo las dos funciones en una y me aseguro de llamar a persistir en el
DataFrame
filtrado por
DataFrame
, así:
def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter(''country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
val summedWeights = countrySpecific.agg(sum(''weight)).first().getDouble(0)
val averagedWeights = summedWeights / countrySpecific.count()
(summedWeights, averagedWeights)
}
Ahora, este ejemplo es, por supuesto, una gran simplificación de lo que se encuentra en la vida real.
Aquí podría resolverlo simplemente filtrando y persistiendo
df
antes de
pasarlo a las funciones de suma y promedio (que también sería más SRP), pero en la vida real puede haber una serie de cálculos intermedios que se necesitan una y otra vez.
En otras palabras, la función de
filter
aquí es simplemente un intento de hacer un ejemplo
simple
de algo que se beneficiará de la persistencia.
De hecho, creo que las llamadas a
persist
son una palabra clave aquí.
Las llamadas
persist
acelerarán enormemente mi trabajo, pero el costo es que tengo que acoplar estrechamente todo el código que depende del
DataFrame
persistente, incluso si están lógicamente separados.