sqlcontext spark print manipulating examples example dataframes createdataframe crear python apache-spark

python - print - rdd to dataframe pyspark



Crear un diccionario grande en pyspark (2)

Estoy tratando de resolver el siguiente problema usando pyspark. Tengo un archivo en hdfs en el formato que es un volcado de la tabla de búsqueda.

key1, value1 key2, value2 ...

Quiero cargar esto en el diccionario python en pyspark y usarlo para otro propósito. Así que traté de hacer:

table = {} def populateDict(line): (k,v) = line.split(",", 1) table[k] = v kvfile = sc.textFile("pathtofile") kvfile.foreach(populateDict)

Encontré que la variable de la tabla no está modificada. Entonces, ¿hay alguna manera de crear una gran tabla hash inmemory en chispa?


foreach es un cálculo distribuido por lo que no se puede esperar que modifique una estructura de datos solo visible en el controlador. Lo que quieres es

kv.map(line => { line.split(" ") match { case Array(k,v) => (k,v) case _ => ("","") }.collectAsMap()

Esto está en scala pero entiendes la idea, la función importante es collectAsMap() que devuelve un mapa al controlador.

Si sus datos son muy grandes, puede usar un PairRDD como mapa. Primer mapa a pares

kv.map(line => { line.split(" ") match { case Array(k,v) => (k,v) case _ => ("","") }

a continuación, puede acceder con rdd.lookup("key") que devuelve una secuencia de valores asociados con la clave, aunque esto definitivamente no será tan eficiente como otras tiendas KV distribuidas, ya que la chispa no está realmente construida para eso.


Para mayor eficiencia, consulte: sortByKey () y lookup ()

búsqueda (clave):

Devuelva la lista de valores en el RDD para la clave clave. Esta operación se realiza de manera eficiente si el RDD tiene un particionador conocido buscando solo la partición a la que se asigna la clave.

El RDD se volverá a particionar mediante sortByKey () ( ver: OrderedRDD ) y se buscará de manera eficiente durante las llamadas de lookup() . En código, algo así como,

kvfile = sc.textFile("pathtofile") sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey() sorted_kv.lookup(''key1'').take(10)

hará el truco tanto como un RDD y de manera eficiente.