python 2.7 - ¿Cómo ejecutar transformaciones independientes en paralelo usando PySpark?
python-2.7 apache-spark (1)
Solo use hilos y asegúrese de que el clúster tenga suficientes recursos para procesar ambas tareas al mismo tiempo.
from threading import Thread
import time
def process(rdd, f):
def delay(x):
time.sleep(1)
return f(x)
return rdd.map(delay).sum()
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
Podría decirse que esto no es tan útil en la práctica, pero de lo contrario debería funcionar bien.
Puede utilizar aún más la programación
en la aplicación
con el
planificador
FAIR
y los grupos de planificadores para un mejor control sobre la estrategia de ejecución.
También puede probar
pyspark-asyncactions
(descargo de responsabilidad: el autor de esta respuesta también es el autor del paquete) que proporciona un conjunto de envoltorios alrededor de Spark API y
concurrent.futures
:
import asyncactions
import concurrent.futures
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()
[x.result() for x in concurrent.futures.as_completed([f1, f2])]
Estoy tratando de ejecutar 2 funciones haciendo transformaciones completamente independientes en un único RDD en paralelo usando PySpark. ¿Cuáles son algunos métodos para hacer lo mismo?
def doXTransforms(sampleRDD):
(X transforms)
def doYTransforms(sampleRDD):
(Y Transforms)
if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)
rows_rdd = hive_context.sql("select * from tables.X_table")
p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()
Esto no funciona y ahora entiendo que esto no funcionará. Pero, ¿hay alguna forma alternativa de hacer que esto funcione? Específicamente, ¿hay alguna solución específica para python-spark?