valor recorrer que elementos ejemplo diccionario dentro buscar agregar python multiprocessing

recorrer - que es un diccionario en python



Multiproceso de Python: ¿cómo comparto un dict entre múltiples procesos? (4)

Un programa que crea varios procesos que funcionan en una cola que puede unirse, Q , y puede eventualmente manipular un diccionario global D para almacenar resultados. (de modo que cada proceso secundario puede usar D para almacenar su resultado y también ver qué resultados están produciendo los otros procesos secundarios)

Si imprimo el diccionario D en un proceso hijo, veo las modificaciones que se han realizado en él (es decir, en D). Pero después de que el proceso principal se une a Q, si imprimo D, ¡es un dict vacío!

Entiendo que es un problema de sincronización / bloqueo. ¿Puede alguien decirme qué está pasando aquí y cómo puedo sincronizar el acceso a D?


Me gustaría compartir mi propio trabajo que es más rápido que el dict Manager y es más simple y más estable que la biblioteca pyshmht que usa toneladas de memoria y no funciona para Mac OS. Aunque mi dict solo funciona para cadenas simples y es inmutable actualmente. Utilizo la implementación de sondeo lineal y almacene las claves y pares de valores en un bloque de memoria separado después de la tabla.

from mmap import mmap import struct from timeit import default_timer from multiprocessing import Manager from pyshmht import HashTable class shared_immutable_dict: def __init__(self, a): self.hs = 1 << (len(a) * 3).bit_length() kvp = self.hs * 4 ht = [0xffffffff] * self.hs kvl = [] for k, v in a.iteritems(): h = self.hash(k) while ht[h] != 0xffffffff: h = (h + 1) & (self.hs - 1) ht[h] = kvp kvp += self.kvlen(k) + self.kvlen(v) kvl.append(k) kvl.append(v) self.m = mmap(-1, kvp) for p in ht: self.m.write(uint_format.pack(p)) for x in kvl: if len(x) <= 0x7f: self.m.write_byte(chr(len(x))) else: self.m.write(uint_format.pack(0x80000000 + len(x))) self.m.write(x) def hash(self, k): h = hash(k) h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) return h def get(self, k, d=None): h = self.hash(k) while True: x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] if x == 0xffffffff: return d self.m.seek(x) if k == self.read_kv(): return self.read_kv() h = (h + 1) & (self.hs - 1) def read_kv(self): sz = ord(self.m.read_byte()) if sz & 0x80: sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 return self.m.read(sz) def kvlen(self, k): return len(k) + (1 if len(k) <= 0x7f else 4) def __contains__(self, k): return self.get(k, None) is not None def close(self): self.m.close() uint_format = struct.Struct(''>I'') def uget(a, k, d=None): return to_unicode(a.get(to_str(k), d)) def uin(a, k): return to_str(k) in a def to_unicode(s): return s.decode(''utf-8'') if isinstance(s, str) else s def to_str(s): return s.encode(''utf-8'') if isinstance(s, unicode) else s def mmap_test(): n = 1000000 d = shared_immutable_dict({str(i * 2): ''1'' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print ''mmap speed: %d gets per sec'' % (n / (default_timer() - start_time)) def manager_test(): n = 100000 d = Manager().dict({str(i * 2): ''1'' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print ''manager speed: %d gets per sec'' % (n / (default_timer() - start_time)) def shm_test(): n = 1000000 d = HashTable(''tmp'', n) d.update({str(i * 2): ''1'' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print ''shm speed: %d gets per sec'' % (n / (default_timer() - start_time)) if __name__ == ''__main__'': mmap_test() manager_test() shm_test()

En mi computadora portátil los resultados de rendimiento son:

mmap speed: 247288 gets per sec manager speed: 33792 gets per sec shm speed: 691332 gets per sec

ejemplo de uso simple:

ht = shared_immutable_dict({''a'': ''1'', ''b'': ''2''}) print ht.get(''a'')


Tal vez puedas probar pyshmht , compartiendo la extensión de tabla hash basada en la memoria para Python.

darse cuenta

  1. No está completamente probado, solo para su referencia.

  2. Actualmente carece de mecanismos de bloqueo / sem para multiprocesamiento.


Una respuesta general implica el uso de un objeto Manager . Adaptado de los documentos:

from multiprocessing import Process, Manager def f(d): d[1] += ''1'' d[''2''] += 2 if __name__ == ''__main__'': manager = Manager() d = manager.dict() d[1] = ''1'' d[''2''] = 2 p1 = Process(target=f, args=(d,)) p2 = Process(target=f, args=(d,)) p1.start() p2.start() p1.join() p2.join() print d

Salida:

$ python mul.py {1: ''111'', ''2'': 6}


multiprocesamiento no es como enhebrar. Cada proceso secundario obtendrá una copia de la memoria del proceso principal. En general, el estado se comparte a través de la comunicación (tuberías / enchufes), señales o memoria compartida.

El multiprocesamiento hace que algunas abstracciones estén disponibles para su caso de uso: estado compartido que se trata como local mediante el uso de proxies o memoria compartida: Manager

Secciones relevantes: