tutorial spark example español createorreplacetempview python apache-spark scikit-learn spark-streaming

español - spark python example



Cómo evitar una ventana Spark Streaming bloqueando otra ventana con ambos ejecutando código nativo de Python (2)

Descargo de responsabilidad: este es solo un conjunto de ideas. Ninguno de estos ha sido probado en la práctica.

Un par de cosas que puedes probar:

  1. No collect para predict scikit-learn modelos scikit-learn suelen ser serializables, por lo que el proceso de predicción puede manejarse fácilmente en el clúster:

    def predict(time, rdd): ... model = Custom_ModelContainer.getmodel() pred = (df.rdd.map(lambda lp: lp.features.toArray()) .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) ...

    No solo debe paralelizar las predicciones, sino también, si los datos sin procesar no se pasan a la GUI, reducen la cantidad de datos que se deben recopilar.

  2. Intenta collect y enviar datos de forma asincrónica. PySpark no proporciona el método collectAsync pero puede intentar lograr algo similar con concurrent.futures :

    from pyspark.rdd import RDD from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) def submit_to_gui(*args): ... def submit_if_success(f): if not f.exception(): executor.submit(submit_to_gui, f.result())

    continuar desde 1.

    def predict(time, rdd): ... f = executor.submit(RDD.collect, pred) f.add_done_callback(submit_if_success) ...

  3. Si realmente desea utilizar el scikit-learn local scikit-learn , intente collect y fit utilizando futuros como se scikit-learn anteriormente. También puede intentar recopilar solo una vez, especialmente si los datos no están en caché:

    def collect_and_train(df): y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) ... return SVR().fit(X_train, y_train) def set_if_success(f): if not f.exception(): Custom_ModelContainer.setModel(f.result()) def trainModel(time, rdd): ... f = excutor.submit(collect_and_train, df) f.add_done_callback(set_if_success) ...

  4. Mueva el proceso de capacitación al clúster ya sea utilizando soluciones ya existentes como spark-sklearn o enfoque personalizado:

    • solución ingenua: prepare sus datos, coalesce(1) y entrene un solo modelo usando mapPartitions .
    • solución distribuida: cree y valide un modelo separado por partición usando mapPartitions , recopile modelos y utilícelos como un conjunto, por ejemplo, tomando una predicción promedio o mediana.
  5. Deseche scikit-learn y utilice un modelo que pueda formarse y mantenerse en un entorno de transmisión distribuido (por ejemplo, StreamingLinearRegressionWithSGD ).

    Su enfoque actual hace que Spark quede obsoleto. Si puede entrenar el modelo localmente, existe una buena posibilidad de que pueda realizar todas las demás tareas mucho más rápido en la máquina local. De lo contrario, su programa simplemente fallará en collect .

Estoy ejecutando Spark Streaming con dos ventanas diferentes (en la ventana para entrenar un modelo con SKLearn y la otra para predecir valores basados ​​en ese modelo) y me pregunto cómo puedo evitar una ventana (la ventana de capacitación "lenta") para entrenar un modelo, sin "bloquear" la ventana de predicción "rápida".
Mi código simplificado se ve de la siguiente manera:

conf = SparkConf() conf.setMaster("local[4]") sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 1) stream = ssc.socketTextStream("localhost", 7000) import Custom_ModelContainer ### Window 1 ### ### predict data based on model computed in window 2 ### def predict(time, rdd): try: # ... rdd conversion to df, feature extraction etc... # regular python code X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) pred = Custom_ModelContainer.getmodel().predict(X) # send prediction to GUI except Exception, e: print e predictionStream = stream.window(60,60) predictionStream.foreachRDD(predict) ### Window 2 ### ### fit new model ### def trainModel(time, rdd): try: # ... rdd conversion to df, feature extraction etc... X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) y = np.array(df.map(lambda lp: lp.label).collect()) # train test split etc... model = SVR().fit(X_train, y_train) Custom_ModelContainer.setModel(model) except Exception, e: print e modelTrainingStream = stream.window(600,600) modelTrainingStream.foreachRDD(trainModel)

(Nota: Custom_ModelContainer es una clase que escribí para guardar y recuperar el modelo entrenado)

En general, mi configuración funciona bien, con la excepción de que cada vez que se entrena un nuevo modelo en la segunda ventana (que demora alrededor de un minuto), la primera ventana no calcula las predicciones hasta que finaliza el entrenamiento modelo. En realidad, creo que esto tiene sentido, ya que el ajuste del modelo y las predicciones se calculan en el nodo maestro (en una configuración no distribuida, debido a SKLearn).

Entonces mi pregunta es la siguiente: ¿sería posible entrenar el modelo en un solo nodo trabajador (en lugar del nodo maestro)? Si es así, ¿cómo podría lograr esto último y eso realmente resolvería mi problema?

Si no, ¿hay alguna otra sugerencia sobre cómo puedo hacer que dicha configuración funcione sin retrasar los cálculos en la ventana 1?

Cualquier ayuda es muy apreciada.

EDITAR: Creo que la pregunta más general sería: ¿cómo puedo ejecutar dos tareas diferentes en dos trabajadores diferentes en paralelo?


Creo que lo que está buscando es la propiedad: "spark.streaming.concurrentJobs", que por defecto es 1. Aumentar esto debería permitirle ejecutar múltiples funciones foreachRDD en paralelo.

En JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

Solo un recordatorio para que también tenga en cuenta la seguridad del hilo en su contenedor de modelo personalizado si va a mutar y leer en paralelo. :)