python - ¿Es Spark''s KMeans incapaz de manejar bigdata?
apache-spark k-means (2)
KMeans tiene varios parámetros para su training , con el modo de inicialización predeterminado en kmeans ||. ¡El problema es que marcha rápidamente (menos de 10 minutos) a las primeras 13 etapas, pero luego se cuelga por completo , sin generar un error!
Ejemplo mínimo que reproduce el problema (tendrá éxito si uso 1000 puntos o una inicialización aleatoria):
from pyspark.context import SparkContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
sc = SparkContext(appName=''kmeansMinimalExample'')
# same with 10000 points
data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
C = KMeans.train(data, 8192, maxIterations=10)
sc.stop()
El trabajo no hace nada (no tiene éxito, falla o progresa ...), como se muestra a continuación. No hay tareas activas / fallidas en la pestaña Ejecutores. Los registros de Stdout y Stderr no tienen nada particularmente interesante:
Si uso
k=81
, en lugar de 8192, tendrá éxito:
Tenga en cuenta que las dos llamadas de
takeSample()
no deberían ser un problema
, ya que se llamaron dos veces en el caso de inicialización aleatoria.
Entonces, ¿qué está pasando? ¿Spark''s Kmeans no puede escalar ? ¿Alguien sabe? Se puede reproducir?
Si se tratara de un problema de memoria, recibiría advertencias y errores, como lo había estado antes .
Nota: los comentarios de placeybordeaux se basan en la ejecución del trabajo en modo cliente , donde las configuraciones del controlador se invalidan, causando el código de salida 143 y tal (ver historial de edición), no en modo de clúster, donde no se informa ningún error , la aplicación simplemente se cuelga .
Desde cero323: ¿Por qué el algoritmo Spark Mllib KMeans es extremadamente lento? está relacionado, pero creo que él es testigo de algún progreso, mientras que el mío se cuelga, dejé un comentario ...
Creo que el "ahorcamiento" se debe a que sus ejecutores siguen muriendo. Como mencioné en una conversación paralela, este código funciona bien para mí, localmente y en un clúster, en Pyspark y Scala. Sin embargo, lleva mucho más tiempo del que debería. Se pasa casi todo el tiempo en k-means || inicialización
Abrí https://issues.apache.org/jira/browse/SPARK-17389 para rastrear dos mejoras principales, una de las cuales puede usar ahora. Editar: realmente, vea también https://issues.apache.org/jira/browse/SPARK-11560
Primero, hay algunas optimizaciones de código que acelerarían el inicio en aproximadamente un 13%.
Sin embargo, la mayor parte del problema es que por defecto tiene 5 pasos de k-means || init, cuando parece que 2 es casi siempre igual de bueno. Puede establecer los pasos de inicialización en 2 para ver una aceleración, especialmente en la etapa que está colgando ahora.
En mi prueba (más pequeña) en mi computadora portátil, el tiempo de inicio pasó de 5:54 a 1:41 con ambos cambios, principalmente debido a la configuración de los pasos de inicio.
Si su RDD es tan grande, collectAsMap intentará copiar cada elemento individual en el RDD en el programa de controlador único, y luego se quedará sin memoria y se bloqueará. Aunque haya particionado los datos, collectAsMap envía todo al controlador y su trabajo se bloquea. Puede asegurarse de que el número de elementos que devuelve esté limitado llamando a take o takeSample, o tal vez filtrando o muestreando su RDD. Del mismo modo, tenga cuidado con estas otras acciones también a menos que esté seguro de que el tamaño de su conjunto de datos es lo suficientemente pequeño como para caber en la memoria:
countByKey, countByValue, collect
Si realmente necesita cada uno de estos valores del RDD y los datos son demasiado grandes para caber en la memoria, puede escribir el RDD en archivos o exportar el RDD a una base de datos que sea lo suficientemente grande como para contener todos los datos. Como está utilizando una API, creo que no puede hacerlo (¿quizás reescribir todo el código? ¿Aumentar la memoria?). Creo que este collectAsMap en el método runAlgorithm es algo realmente malo en Kmeans ( https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html ) ...