tutorial started spark significado resilient lazy getting apache-spark rdd lazy-sequences

apache spark - started - Lazy foreach en un Spark RDD



rdd spark (2)

Realmente no. No hay método de find , como en las colecciones de Scala que inspiraron las API de Spark, que dejarían de buscar una vez que se encuentre un elemento que satisfaga un predicado. Probablemente su mejor opción sea utilizar una fuente de datos que minimice el escaneo excesivo, como Cassandra, donde el controlador empuja hacia abajo algunos parámetros de consulta. También puede ver el proyecto más experimental de Berkeley llamado BlinkDB.

En pocas palabras, Spark está diseñado más para escanear conjuntos de datos, como MapReduce antes, en lugar de las consultas tradicionales de tipo base de datos.

Tengo un gran RDD de Strings (obtenido a través de una unión de varios sc.textFile(...)) .

Ahora quiero buscar una cadena dada en ese RDD, y quiero que la búsqueda se detenga cuando se haya encontrado una coincidencia "lo suficientemente buena".

Podría retroalimentar foreach , o filter , o map para este propósito, pero todos estos iterarán a través de cada elemento en ese RDD, independientemente de si se ha alcanzado la coincidencia.

¿Hay alguna manera de cortocircuitar este proceso y evitar iterar a través de todo el RDD?


Podría adaptar Foreach, o filtrar, o mapear para este propósito, pero todos estos iterarán a través de cada elemento en ese RDD

En realidad, estás equivocado. Spark Engine es lo suficientemente inteligente como para optimizar los cálculos si limita los resultados (utilizando take o first ):

import numpy as np from __future__ import print_function np.random.seed(323) acc = sc.accumulator(0) def good_enough(x, threshold=7000): global acc acc += 1 return x > threshold rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000)) x = rdd.filter(good_enough).first()

Ahora veamos accum:

>>> print("Checked {0} items, found {1}".format(acc.value, x)) Checked 6 items, found 7109

y solo para estar seguro si todo funciona como se espera:

acc = sc.accumulator(0) rdd.filter(lambda x: good_enough(x, 100000)).take(1) assert acc.value == rdd.count()

Lo mismo podría hacerse, probablemente de una manera más eficiente usando marcos de datos y udf.

Nota : En algunos casos, incluso es posible usar una secuencia infinita en Spark y aún así obtener un resultado. Puede verificar mi respuesta a la función Spark FlatMap para listas enormes para un ejemplo.