Rendimiento de chispa para Scala vs Python
performance apache-spark (2)
Prefiero Python sobre Scala. Pero, como Spark está escrito de forma nativa en Scala, esperaba que mi código se ejecute más rápido en Scala que en la versión de Python por razones obvias.
Con esa suposición, pensé en aprender y escribir la versión Scala de un código de preprocesamiento muy común para aproximadamente 1 GB de datos.
Los datos se obtienen de la competencia
Kaggle
en
Kaggle
.
Solo para dar una visión general de los datos (contiene 1936 dimensiones y 145232 filas).
Los datos se componen de varios tipos, por ejemplo, int, float, string, boolean.
Estoy usando 6 núcleos de 8 para el procesamiento de Spark;
Es por eso que utilicé
minPartitions=6
para que cada núcleo tenga algo que procesar.
Código Scala
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "/001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Código de Python
input = sc.textFile(''train.csv'', minPartitions=6)
DELIM_1 = ''/001''
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace(''true'', ''1'').replace(''false'', ''0'')
vals = line.split('','')
vals2 = [''VAR_%04d%s%s'' %(e, DELIM_1, val.strip(''/"''))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + '','' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile(''output'')
Scala Performance Etapa 0 (38 minutos), Etapa 1 (18 segundos)
Python Performance Etapa 0 (11 minutos), Etapa 1 (7 segundos)
Ambos producen diferentes gráficos de visualización DAG (debido a que ambas imágenes muestran diferentes funciones de etapa 0 para Scala (
map
) y Python (
reduceByKey
))
Pero, esencialmente ambos códigos intentan transformar los datos en (dimension_id, cadena de la lista de valores) RDD y guardarlos en el disco. La salida se usará para calcular varias estadísticas para cada dimensión.
En cuanto al rendimiento, el código Scala para estos datos reales como este parece ejecutarse 4 veces más lento que la versión de Python. La buena noticia para mí es que me dio una buena motivación para seguir con Python. La mala noticia es que no entendí bien por qué.
Extensión a las respuestas anteriores -
Scala es más rápido en muchos aspectos en comparación con Python, pero hay algunas razones válidas por las que Python se está volviendo más popular que Scala, veamos algunos de ellos:
Python para Apache Spark es bastante fácil de aprender y usar. Sin embargo, esta no es la única razón por la cual Pyspark es una mejor opción que Scala. Hay más.
Python API for Spark puede ser más lento en el clúster, pero al final, los científicos de datos pueden hacer mucho más con él en comparación con Scala. La complejidad de Scala está ausente. La interfaz es simple y completa.
Hablar sobre la legibilidad del código, el mantenimiento y la familiaridad con Python API para Apache Spark es mucho mejor que Scala.
Python viene con varias bibliotecas relacionadas con el aprendizaje automático y el procesamiento del lenguaje natural. Esto ayuda en el análisis de datos y también tiene estadísticas muy maduras y probadas en el tiempo. Por ejemplo, numpy, pandas, scikit-learn, seaborn y matplotlib.
Nota: La mayoría de los científicos de datos utilizan un enfoque híbrido en el que utilizan lo mejor de ambas API.
Por último, la comunidad Scala a menudo resulta ser mucho menos útil para los programadores. Esto hace de Python un aprendizaje muy valioso. Si tiene suficiente experiencia con algún lenguaje de programación estáticamente tipado como Java, puede dejar de preocuparse por no usar Scala por completo.
La respuesta original sobre el código se puede encontrar a continuación.
En primer lugar, debe distinguir entre diferentes tipos de API, cada uno con sus propias consideraciones de rendimiento.
RDD API
(estructuras puras de Python con orquestación basada en JVM)
Este es el componente que se verá más afectado por el rendimiento del código Python y los detalles de la implementación de PySpark. Si bien es poco probable que el rendimiento de Python sea un problema, hay al menos algunos factores que debe tener en cuenta:
- Sobrecarga de la comunicación JVM. Prácticamente todos los datos que vienen hacia y desde el ejecutor de Python tienen que pasar a través de un socket y un trabajador JVM. Si bien esta es una comunicación local relativamente eficiente, todavía no es gratuita.
-
Ejecutores basados en procesos (Python) versus ejecutores basados en subprocesos (JVM múltiples subprocesos múltiples) (Scala). Cada ejecutor de Python se ejecuta en su propio proceso. Como efecto secundario, proporciona un aislamiento más fuerte que su contraparte JVM y cierto control sobre el ciclo de vida del ejecutor, pero un uso de memoria potencialmente significativamente mayor:
- huella de memoria del intérprete
- huella de las bibliotecas cargadas
- Difusión menos eficiente (cada proceso requiere su propia copia de una emisión)
-
Rendimiento del código Python en sí. En general, Scala es más rápido que Python, pero variará de una tarea a otra. Además, tiene múltiples opciones que incluyen JIT como Numba, extensiones C ( Cython ) o bibliotecas especializadas como Theano . Finalmente,
si no usa ML / MLlib (o simplemente la pila NumPy), considere usar PyPy como un intérprete alternativo. Ver SPARK-3094 . -
La configuración de PySpark proporciona la opción
spark.python.worker.reuse
que se puede usar para elegir entre bifurcar el proceso de Python para cada tarea y reutilizar el proceso existente. La última opción parece ser útil para evitar la recolección de basura costosa (es más una impresión que un resultado de pruebas sistemáticas), mientras que la primera (predeterminada) es óptima en caso de emisiones e importaciones costosas. - El recuento de referencias, utilizado como el método de recolección de basura de primera línea en CPython, funciona bastante bien con las cargas de trabajo típicas de Spark (procesamiento similar a la secuencia, sin ciclos de referencia) y reduce el riesgo de pausas de GC largas.
MLlib
(ejecución mixta de Python y JVM)
Las consideraciones básicas son más o menos las mismas que antes con algunos problemas adicionales. Si bien las estructuras básicas utilizadas con MLlib son objetos RDD Python simples, todos los algoritmos se ejecutan directamente usando Scala.
Significa un costo adicional de convertir objetos Python en objetos Scala y viceversa, un mayor uso de memoria y algunas limitaciones adicionales que cubriremos más adelante.
A partir de ahora (Spark 2.x), la API basada en RDD está en modo de mantenimiento y está programada para eliminarse en Spark 3.0 .
DataFrame API y Spark ML
(Ejecución JVM con código Python limitado al controlador)
Estas son probablemente la mejor opción para las tareas estándar de procesamiento de datos. Dado que el código de Python se limita principalmente a operaciones lógicas de alto nivel en el controlador, no debería haber diferencia de rendimiento entre Python y Scala.
Una única excepción es el uso de UDF de Python en fila que son significativamente menos eficientes que sus equivalentes Scala. Si bien hay alguna posibilidad de mejoras (ha habido un desarrollo sustancial en Spark 2.0.0), la mayor limitación es el viaje de ida y vuelta completo entre la representación interna (JVM) y el intérprete de Python. Si es posible, debe favorecer una composición de expresiones incorporadas (por example comportamiento de Python UDF se ha mejorado en Spark 2.0.0, pero aún es subóptimo en comparación con la ejecución nativa. Esto puede mejorar en el futuro con la introducción de las UDF vectorizadas (SPARK-21190) .
También asegúrese de evitar el paso innecesario de datos entre
DataFrames
y
RDDs
.
Esto requiere serialización y deserialización costosas, sin mencionar la transferencia de datos desde y hacia el intérprete de Python.
Vale la pena señalar que las llamadas Py4J tienen una latencia bastante alta. Esto incluye llamadas simples como:
from pyspark.sql.functions import col
col("foo")
Por lo general, no debería importar (la sobrecarga es constante y no depende de la cantidad de datos), pero en el caso de aplicaciones de software en tiempo real, puede considerar el almacenamiento en caché / reutilización de contenedores Java.
GraphX y Spark DataSets
Por ahora (Spark
1.6
2.1) ninguno proporciona API de PySpark, por lo que puede decir que PySpark es infinitamente peor que Scala.
En la práctica, el desarrollo de GraphX se detuvo casi por completo y el proyecto se encuentra actualmente en modo de mantenimiento con los tickets JIRA relacionados cerrados, ya que no se solucionarán . GraphFrames biblioteca GraphFrames proporciona una biblioteca alternativa de procesamiento de gráficos con enlaces de Python.
Conjunto de datos
Subjetivamente hablando, no hay mucho lugar para los
Datasets
de
Datasets
tipo estático en Python e incluso si existiera la implementación actual de Scala es demasiado simplista y no proporciona los mismos beneficios de rendimiento que
DataFrame
.
Transmisión
Por lo que he visto hasta ahora, recomendaría encarecidamente usar Scala sobre Python. Puede cambiar en el futuro si PySpark obtiene soporte para flujos estructurados, pero en este momento Scala API parece ser mucho más robusto, completo y eficiente. Mi experiencia es bastante limitada.
La transmisión estructurada en Spark 2.x parece reducir la brecha entre los idiomas, pero por ahora todavía está en sus primeros días. Sin embargo, la API basada en RDD ya se menciona como "transmisión heredada" en la documentación de Databricks (fecha de acceso 2017-03-03), por lo que es razonable esperar más esfuerzos de unificación.
Consideraciones de no rendimiento
Paridad de característicasNo todas las características de Spark están expuestas a través de PySpark API. Asegúrese de verificar si las piezas que necesita ya están implementadas e intente comprender las posibles limitaciones.
Es particularmente importante cuando utiliza MLlib y contextos mixtos similares (consulte
Llamar a la función Java / Scala desde una tarea
).
Para ser justos, algunas partes de la API de PySpark, como
mllib.linalg
, proporcionan un conjunto de métodos más completo que Scala.
La API PySpark refleja de cerca su contraparte Scala y, como tal, no es exactamente Pythonic. Significa que es bastante fácil de mapear entre idiomas, pero al mismo tiempo, el código de Python puede ser significativamente más difícil de entender.
Arquitectura complejaEl flujo de datos de PySpark es relativamente complejo en comparación con la ejecución pura de JVM. Es mucho más difícil razonar sobre programas PySpark o depuración. Además, al menos una comprensión básica de Scala y JVM en general es prácticamente imprescindible.
Spark 2.xy más allá
El cambio continuo hacia
Dataset
API, con API RDD congelada, brinda oportunidades y desafíos para los usuarios de Python.
Si bien las partes de alto nivel de la API son mucho más fáciles de exponer en Python, las características más avanzadas son prácticamente imposibles de usar
directamente
.
Además, las funciones nativas de Python continúan siendo ciudadanos de segunda clase en el mundo SQL.
Esperemos que esto mejore en el futuro con la serialización de Apache Arrow (
los esfuerzos actuales apuntan a la
collection
datos
collection
pero serde UDF es un
objetivo a largo plazo
).
Para proyectos que dependen en gran medida de la base de código de Python, las alternativas puras de Python (como Dask o Ray ) podrían ser una alternativa interesante.
No tiene que ser uno contra el otro
La API Spark DataFrame (SQL, Dataset) proporciona una forma elegante de integrar el código Scala / Java en la aplicación PySpark.
Puede usar
DataFrames
para exponer datos a un código JVM nativo y volver a leer los resultados.
He explicado algunas opciones
en otro lugar
y puedes encontrar un ejemplo funcional de ida y vuelta en Python-Scala en
Cómo usar una clase Scala dentro de Pyspark
.
Se puede aumentar aún más mediante la introducción de tipos definidos por el usuario (consulte ¿Cómo definir el esquema para el tipo personalizado en Spark SQL? ).
¿Qué tiene de malo el código provisto en la pregunta?
(Descargo de responsabilidad: punto de vista de Pythonista. Lo más probable es que me haya perdido algunos trucos de Scala)
En primer lugar, hay una parte en su código que no tiene ningún sentido.
Si ya tiene pares
(key, value)
creados usando
zipWithIndex
o
enumerate
cuál es el punto de crear una cadena solo para dividirla inmediatamente después.
flatMap
no funciona de forma recursiva, por lo que simplemente puede generar tuplas y omitir el siguiente
map
.
Otra parte que encuentro problemática es
reduceByKey
.
En términos generales,
reduceByKey
es útil si la aplicación de la función de agregado puede reducir la cantidad de datos que se deben barajar.
Como simplemente concatena cadenas, no hay nada que ganar aquí.
Ignorando cosas de bajo nivel, como el número de referencias, la cantidad de datos que tiene que transferir es exactamente la misma que para
groupByKey
.
Normalmente no me detendría en eso, pero por lo que puedo decir es un cuello de botella en su código Scala.
Unir cadenas en JVM es una operación bastante costosa (ver por ejemplo:
¿La concatenación de cadenas en scala es tan costosa como en Java?
).
Significa que algo como esto
_.reduceByKey((v1: String, v2: String) => v1 + '','' + v2)
que es equivalente a
input4.reduceByKey(valsConcat)
en su código no es una buena idea.
Si desea evitar
groupByKey
, puede intentar usar
groupByKey
con
StringBuilder
.
Algo similar a esto debería hacer el truco:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
Pero dudo que valga la pena.
Teniendo en cuenta lo anterior, reescribí su código de la siguiente manera:
Scala :
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Python :
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace(''true'', ''1'').replace(''false'', ''0'')
vals = line.split('','')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile(''train.csv'', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
Resultados
En modo
local[6]
(Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz) con 4GB de memoria por ejecutor se necesita (n = 3):
- Scala - media: 250.00s, stdev: 12.49
- Python - media: 246.66s, stdev: 1.15
Estoy bastante seguro de que la mayor parte de ese tiempo se dedica a barajar, serializar, deserializar y otras tareas secundarias. Solo por diversión, aquí hay un ingenuo código de subproceso único en Python que realiza la misma tarea en esta máquina en menos de un minuto:
def go():
with open("train.csv") as fr:
lines = [
line.replace(''true'', ''1'').replace(''false'', ''0'').split(",")
for line in fr]
return zip(*lines[1:])