python loops apache-spark iteration pyspark

python - El tiempo de iteración de chispa aumenta exponencialmente cuando se usa la unión



loops apache-spark (3)

Soy bastante nuevo en Spark e intento implementar algún algoritmo iterativo para la agrupación (expectativa-maximización) con centroide representado por el modelo de Markov. Entonces necesito hacer iteraciones y uniones.

Un problema que experimento es que cada iteración mide el tiempo exponencialmente.
Después de experimentar un poco, descubrí que cuando se hacen iteraciones, es necesario persistir el RDD que se reutilizará en la próxima iteración; de lo contrario, cada chispa de iteración creará un plan de ejecución que recalculará el RDD desde el principio, aumentando así el tiempo de cálculo.

init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) # init.cache() print init.count() print str(datetime.datetime.now() - start)

Resultados en:

0 10000000 0:00:04.283652 1 10000000 0:00:05.998830 2 10000000 0:00:08.771984 3 10000000 0:00:11.399581 4 10000000 0:00:14.206069 5 10000000 0:00:16.856993

Por lo tanto, agregar caché () ayuda y el tiempo de iteración se vuelve constante.

init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start) 0 10000000 0:00:04.966835 1 10000000 0:00:04.609885 2 10000000 0:00:04.324358 3 10000000 0:00:04.248709 4 10000000 0:00:04.218724 5 10000000 0:00:04.223368

Pero al hacer Join dentro de la iteración, el problema vuelve. Aquí hay un código simple que demuestro el problema. Incluso hacer caché en cada transformación RDD no resuelve el problema:

init = sc.parallelize(xrange(10000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init2.cache() init3 = init.map(lambda n: (n, n*2)) init3.cache() init4 = init2.join(init3) init4.count() init4.cache() init = init4.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start)

Y aquí está la salida. Como puede ver, el tiempo de iteración crece exponencialmente :(

0 10000 0:00:00.674115 1 10000 0:00:00.833377 2 10000 0:00:01.525314 3 10000 0:00:04.194715 4 10000 0:00:08.139040 5 10000 0:00:17.852815

Realmente agradeceré cualquier ayuda :)


El problema es (como zero323 señaló en su respuesta completa) que llamar a join sin especificar el número de particiones puede (sí) resultar en un número creciente de particiones. El número de particiones puede crecer (aparentemente) sin límite. Hay (al menos) dos formas de evitar que el número de particiones crezca (sin límite) cuando se llama a unir repetidamente.

Método 1:

Como zero323 señaló, puede especificar el número de particiones manualmente cuando llama a join. Por ejemplo

rdd1.join(rdd2, numPartitions)

Esto asegurará que el número de Particiones no exceda numPartitions y, en particular, el número de particiones no crecerá continuamente.

Método 2:

Cuando crea su SparkConf, puede especificar el nivel predeterminado de paralelismo. Si se establece este valor, cuando llame a funciones como join sin especificar numPartitions, se utilizará el paralelismo predeterminado, que limitará efectivamente el número de particiones y evitará que crezcan. Puede configurar este parámetro como

conf=SparkConf.set("spark.default.parallelism", numPartitions) sc = SparkContex(conf=conf)


Los Rdds son inmutables. Intenta hacer rdd = rdd.cache()


Resumen :

En general, los algoritmos iterativos, especialmente aquellos con autounión o autounión, requieren un control sobre:

  • Longitud del linaje (véase, por ejemplo, debido a un largo linaje RDD y unionAll dando como resultado ).
  • Número de particiones.

El problema descrito aquí es el resultado de la falta del primero. En cada iteración, el número de particiones aumenta con la autounión que conduce a un patrón exponencial. Para abordar eso, debe controlar el número de particiones en cada iteración (ver más abajo) o utilizar herramientas globales como spark.default.parallelism (ver una respuesta proporcionada por Travis ). En general, el primer enfoque proporciona mucho más control en general y no afecta a otras partes del código.

Respuesta original :

Por lo que puedo decir, hay dos problemas intercalados aquí: un número creciente de particiones y arrastrando los pies durante las uniones. Ambos se pueden manejar fácilmente, así que vamos paso a paso.

Primero, creemos un ayudante para recopilar las estadísticas:

import datetime def get_stats(i, init, init2, init3, init4, start, end, desc, cache, part, hashp): return { "i": i, "init": init.getNumPartitions(), "init1": init2.getNumPartitions(), "init2": init3.getNumPartitions(), "init4": init4.getNumPartitions(), "time": str(end - start), "timen": (end - start).seconds + (end - start).microseconds * 10 **-6, "desc": desc, "cache": cache, "part": part, "hashp": hashp }

otro ayudante para manejar el almacenamiento en caché / particionamiento

def procRDD(rdd, cache=True, part=False, hashp=False, npart=16): rdd = rdd if not part else rdd.repartition(npart) rdd = rdd if not hashp else rdd.partitionBy(npart) return rdd if not cache else rdd.cache()

extraer la lógica de la tubería:

def run(init, description, cache=True, part=False, hashp=False, npart=16, n=6): times = [] for i in range(n): start = datetime.datetime.now() init2 = procRDD( init.map(lambda n: (n, n*3)), cache, part, hashp, npart) init3 = procRDD( init.map(lambda n: (n, n*2)), cache, part, hashp, npart) # If part set to True limit number of the output partitions init4 = init2.join(init3, npart) if part else init2.join(init3) init = init4.map(lambda n: n[0]) if cache: init4.cache() init.cache() init.count() # Force computations to get time end = datetime.datetime.now() times.append(get_stats( i, init, init2, init3, init4, start, end, description, cache, part, hashp )) return times

y crear datos iniciales:

ncores = 8 init = sc.parallelize(xrange(10000), ncores * 2).cache()

Operación de numPartitions sola, si no se proporciona el argumento numPartitions , ajuste el número de particiones en la salida en función del número de particiones de los RDD de entrada. Significa un número creciente de particiones con cada iteración. Si el número de particiones es demasiado grande, las cosas se ponen feas. Puede lidiar con estos proporcionando el argumento numPartitions para unir o repartir los RDD con cada iteración.

timesCachePart = sqlContext.createDataFrame( run(init, "cache + partition", True, True, False, ncores * 2)) timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+-----------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+-----------------+ |0| 16| 16| 16|0:00:01.145625|cache + partition| |1| 16| 16| 16|0:00:01.090468|cache + partition| |2| 16| 16| 16|0:00:01.059316|cache + partition| |3| 16| 16| 16|0:00:01.029544|cache + partition| |4| 16| 16| 16|0:00:01.033493|cache + partition| |5| 16| 16| 16|0:00:01.007598|cache + partition| +-+-----+-----+-----+--------------+-----------------+

Como puede ver cuando reparticionamos el tiempo de ejecución es más o menos constante. El segundo problema es que los datos anteriores se dividen aleatoriamente. Para garantizar el rendimiento de la unión, nos gustaría tener las mismas claves en una sola partición. Para lograr eso, podemos usar el divisor hash:

timesCacheHashPart = sqlContext.createDataFrame( run(init, "cache + hashpart", True, True, True, ncores * 2)) timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+----------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+----------------+ |0| 16| 16| 16|0:00:00.946379|cache + hashpart| |1| 16| 16| 16|0:00:00.966519|cache + hashpart| |2| 16| 16| 16|0:00:00.945501|cache + hashpart| |3| 16| 16| 16|0:00:00.986777|cache + hashpart| |4| 16| 16| 16|0:00:00.960989|cache + hashpart| |5| 16| 16| 16|0:00:01.026648|cache + hashpart| +-+-----+-----+-----+--------------+----------------+

El tiempo de ejecución es constante como antes y hay una pequeña mejora con respecto al particionamiento básico.

Ahora usemos el caché solo como referencia:

timesCacheOnly = sqlContext.createDataFrame( run(init, "cache-only", True, False, False, ncores * 2)) timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+----------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+----------+ |0| 16| 16| 32|0:00:00.992865|cache-only| |1| 32| 32| 64|0:00:01.766940|cache-only| |2| 64| 64| 128|0:00:03.675924|cache-only| |3| 128| 128| 256|0:00:06.477492|cache-only| |4| 256| 256| 512|0:00:11.929242|cache-only| |5| 512| 512| 1024|0:00:23.284508|cache-only| +-+-----+-----+-----+--------------+----------+

Como puede ver, el número de particiones (init2, init3, init4) para la versión solo de caché se duplica con cada iteración y el tiempo de ejecución es proporcional al número de particiones.

Finalmente, podemos verificar si podemos mejorar el rendimiento con una gran cantidad de particiones si usamos el particionador hash:

timesCacheHashPart512 = sqlContext.createDataFrame( run(init, "cache + hashpart 512", True, True, True, 512)) timesCacheHashPart512.select( "i", "init1", "init2", "init4", "time", "desc").show() +-+-----+-----+-----+--------------+--------------------+ |i|init1|init2|init4| time| desc| +-+-----+-----+-----+--------------+--------------------+ |0| 512| 512| 512|0:00:14.492690|cache + hashpart 512| |1| 512| 512| 512|0:00:20.215408|cache + hashpart 512| |2| 512| 512| 512|0:00:20.408070|cache + hashpart 512| |3| 512| 512| 512|0:00:20.390267|cache + hashpart 512| |4| 512| 512| 512|0:00:20.362354|cache + hashpart 512| |5| 512| 512| 512|0:00:19.878525|cache + hashpart 512| +-+-----+-----+-----+--------------+--------------------+

La mejora no es tan impresionante, pero si tiene un clúster pequeño y una gran cantidad de datos, vale la pena intentarlo.

Supongo que quitar mensaje aquí es particiones. Hay contextos donde se maneja por usted ( mllib , sql ) pero si utiliza operaciones de bajo nivel es su responsabilidad.