python-2.7 apache-spark pyspark apache-spark-sql python-multiprocessing

python 2.7 - ¿Cómo ejecutar transformaciones independientes en paralelo usando PySpark?



python-2.7 apache-spark (1)

Solo use hilos y asegúrese de que el clúster tenga suficientes recursos para procesar ambas tareas al mismo tiempo.

from threading import Thread import time def process(rdd, f): def delay(x): time.sleep(1) return f(x) return rdd.map(delay).sum() rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2)) t1 = Thread(target=process, args=(rdd, lambda x: x * 2)) t2 = Thread(target=process, args=(rdd, lambda x: x + 1)) t1.start(); t2.start()

Podría decirse que esto no es tan útil en la práctica, pero de lo contrario debería funcionar bien.

Puede utilizar aún más la programación en la aplicación con el planificador FAIR y los grupos de planificadores para un mejor control sobre la estrategia de ejecución.

También puede probar pyspark-asyncactions (descargo de responsabilidad: el autor de esta respuesta también es el autor del paquete) que proporciona un conjunto de envoltorios alrededor de Spark API y concurrent.futures :

import asyncactions import concurrent.futures f1 = rdd.filter(lambda x: x % 3 == 0).countAsync() f2 = rdd.filter(lambda x: x % 11 == 0).countAsync() [x.result() for x in concurrent.futures.as_completed([f1, f2])]

Estoy tratando de ejecutar 2 funciones haciendo transformaciones completamente independientes en un único RDD en paralelo usando PySpark. ¿Cuáles son algunos métodos para hacer lo mismo?

def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == "__main__": sc = SparkContext(appName="parallelTransforms") sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql("select * from tables.X_table") p1 = Process(target=doXTransforms , args=(rows_rdd,)) p1.start() p2 = Process(target=doYTransforms, args=(rows_rdd,)) p2.start() p1.join() p2.join() sc.stop()

Esto no funciona y ahora entiendo que esto no funcionará. Pero, ¿hay alguna forma alternativa de hacer que esto funcione? Específicamente, ¿hay alguna solución específica para python-spark?