español - spark python example
Cómo evitar una ventana Spark Streaming bloqueando otra ventana con ambos ejecutando código nativo de Python (2)
Descargo de responsabilidad: este es solo un conjunto de ideas. Ninguno de estos ha sido probado en la práctica.
Un par de cosas que puedes probar:
No
collect
parapredict
scikit-learn
modelosscikit-learn
suelen ser serializables, por lo que el proceso de predicción puede manejarse fácilmente en el clúster:def predict(time, rdd): ... model = Custom_ModelContainer.getmodel() pred = (df.rdd.map(lambda lp: lp.features.toArray()) .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) ...
No solo debe paralelizar las predicciones, sino también, si los datos sin procesar no se pasan a la GUI, reducen la cantidad de datos que se deben recopilar.
Intenta
collect
y enviar datos de forma asincrónica. PySpark no proporciona el métodocollectAsync
pero puede intentar lograr algo similar conconcurrent.futures
:from pyspark.rdd import RDD from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) def submit_to_gui(*args): ... def submit_if_success(f): if not f.exception(): executor.submit(submit_to_gui, f.result())
continuar desde 1.
def predict(time, rdd): ... f = executor.submit(RDD.collect, pred) f.add_done_callback(submit_if_success) ...
Si realmente desea utilizar el
scikit-learn
localscikit-learn
, intentecollect
yfit
utilizando futuros como sescikit-learn
anteriormente. También puede intentar recopilar solo una vez, especialmente si los datos no están en caché:def collect_and_train(df): y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) ... return SVR().fit(X_train, y_train) def set_if_success(f): if not f.exception(): Custom_ModelContainer.setModel(f.result()) def trainModel(time, rdd): ... f = excutor.submit(collect_and_train, df) f.add_done_callback(set_if_success) ...
Mueva el proceso de capacitación al clúster ya sea utilizando soluciones ya existentes como
spark-sklearn
o enfoque personalizado:- solución ingenua: prepare sus datos,
coalesce(1)
y entrene un solo modelo usandomapPartitions
. - solución distribuida: cree y valide un modelo separado por partición usando
mapPartitions
, recopile modelos y utilícelos como un conjunto, por ejemplo, tomando una predicción promedio o mediana.
- solución ingenua: prepare sus datos,
Deseche
scikit-learn
y utilice un modelo que pueda formarse y mantenerse en un entorno de transmisión distribuido (por ejemplo,StreamingLinearRegressionWithSGD
).Su enfoque actual hace que Spark quede obsoleto. Si puede entrenar el modelo localmente, existe una buena posibilidad de que pueda realizar todas las demás tareas mucho más rápido en la máquina local. De lo contrario, su programa simplemente fallará en
collect
.
Estoy ejecutando Spark Streaming con dos ventanas diferentes (en la ventana para entrenar un modelo con SKLearn y la otra para predecir valores basados en ese modelo) y me pregunto cómo puedo evitar una ventana (la ventana de capacitación "lenta") para entrenar un modelo, sin "bloquear" la ventana de predicción "rápida".
Mi código simplificado se ve de la siguiente manera:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(Nota: Custom_ModelContainer es una clase que escribí para guardar y recuperar el modelo entrenado)
En general, mi configuración funciona bien, con la excepción de que cada vez que se entrena un nuevo modelo en la segunda ventana (que demora alrededor de un minuto), la primera ventana no calcula las predicciones hasta que finaliza el entrenamiento modelo. En realidad, creo que esto tiene sentido, ya que el ajuste del modelo y las predicciones se calculan en el nodo maestro (en una configuración no distribuida, debido a SKLearn).
Entonces mi pregunta es la siguiente: ¿sería posible entrenar el modelo en un solo nodo trabajador (en lugar del nodo maestro)? Si es así, ¿cómo podría lograr esto último y eso realmente resolvería mi problema?
Si no, ¿hay alguna otra sugerencia sobre cómo puedo hacer que dicha configuración funcione sin retrasar los cálculos en la ventana 1?
Cualquier ayuda es muy apreciada.
EDITAR: Creo que la pregunta más general sería: ¿cómo puedo ejecutar dos tareas diferentes en dos trabajadores diferentes en paralelo?
Creo que lo que está buscando es la propiedad: "spark.streaming.concurrentJobs", que por defecto es 1. Aumentar esto debería permitirle ejecutar múltiples funciones foreachRDD en paralelo.
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
Solo un recordatorio para que también tenga en cuenta la seguridad del hilo en su contenedor de modelo personalizado si va a mutar y leer en paralelo. :)