recorrer - que es un diccionario en python
Multiproceso de Python: ¿cómo comparto un dict entre múltiples procesos? (4)
Un programa que crea varios procesos que funcionan en una cola que puede unirse, Q
, y puede eventualmente manipular un diccionario global D
para almacenar resultados. (de modo que cada proceso secundario puede usar D
para almacenar su resultado y también ver qué resultados están produciendo los otros procesos secundarios)
Si imprimo el diccionario D en un proceso hijo, veo las modificaciones que se han realizado en él (es decir, en D). Pero después de que el proceso principal se une a Q, si imprimo D, ¡es un dict vacío!
Entiendo que es un problema de sincronización / bloqueo. ¿Puede alguien decirme qué está pasando aquí y cómo puedo sincronizar el acceso a D?
Me gustaría compartir mi propio trabajo que es más rápido que el dict Manager y es más simple y más estable que la biblioteca pyshmht que usa toneladas de memoria y no funciona para Mac OS. Aunque mi dict solo funciona para cadenas simples y es inmutable actualmente. Utilizo la implementación de sondeo lineal y almacene las claves y pares de valores en un bloque de memoria separado después de la tabla.
from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable
class shared_immutable_dict:
def __init__(self, a):
self.hs = 1 << (len(a) * 3).bit_length()
kvp = self.hs * 4
ht = [0xffffffff] * self.hs
kvl = []
for k, v in a.iteritems():
h = self.hash(k)
while ht[h] != 0xffffffff:
h = (h + 1) & (self.hs - 1)
ht[h] = kvp
kvp += self.kvlen(k) + self.kvlen(v)
kvl.append(k)
kvl.append(v)
self.m = mmap(-1, kvp)
for p in ht:
self.m.write(uint_format.pack(p))
for x in kvl:
if len(x) <= 0x7f:
self.m.write_byte(chr(len(x)))
else:
self.m.write(uint_format.pack(0x80000000 + len(x)))
self.m.write(x)
def hash(self, k):
h = hash(k)
h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
return h
def get(self, k, d=None):
h = self.hash(k)
while True:
x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
if x == 0xffffffff:
return d
self.m.seek(x)
if k == self.read_kv():
return self.read_kv()
h = (h + 1) & (self.hs - 1)
def read_kv(self):
sz = ord(self.m.read_byte())
if sz & 0x80:
sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
return self.m.read(sz)
def kvlen(self, k):
return len(k) + (1 if len(k) <= 0x7f else 4)
def __contains__(self, k):
return self.get(k, None) is not None
def close(self):
self.m.close()
uint_format = struct.Struct(''>I'')
def uget(a, k, d=None):
return to_unicode(a.get(to_str(k), d))
def uin(a, k):
return to_str(k) in a
def to_unicode(s):
return s.decode(''utf-8'') if isinstance(s, str) else s
def to_str(s):
return s.encode(''utf-8'') if isinstance(s, unicode) else s
def mmap_test():
n = 1000000
d = shared_immutable_dict({str(i * 2): ''1'' for i in xrange(n)})
start_time = default_timer()
for i in xrange(n):
if bool(d.get(str(i))) != (i % 2 == 0):
raise Exception(i)
print ''mmap speed: %d gets per sec'' % (n / (default_timer() - start_time))
def manager_test():
n = 100000
d = Manager().dict({str(i * 2): ''1'' for i in xrange(n)})
start_time = default_timer()
for i in xrange(n):
if bool(d.get(str(i))) != (i % 2 == 0):
raise Exception(i)
print ''manager speed: %d gets per sec'' % (n / (default_timer() - start_time))
def shm_test():
n = 1000000
d = HashTable(''tmp'', n)
d.update({str(i * 2): ''1'' for i in xrange(n)})
start_time = default_timer()
for i in xrange(n):
if bool(d.get(str(i))) != (i % 2 == 0):
raise Exception(i)
print ''shm speed: %d gets per sec'' % (n / (default_timer() - start_time))
if __name__ == ''__main__'':
mmap_test()
manager_test()
shm_test()
En mi computadora portátil los resultados de rendimiento son:
mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec
ejemplo de uso simple:
ht = shared_immutable_dict({''a'': ''1'', ''b'': ''2''})
print ht.get(''a'')
Tal vez puedas probar pyshmht , compartiendo la extensión de tabla hash basada en la memoria para Python.
darse cuenta
No está completamente probado, solo para su referencia.
Actualmente carece de mecanismos de bloqueo / sem para multiprocesamiento.
Una respuesta general implica el uso de un objeto Manager
. Adaptado de los documentos:
from multiprocessing import Process, Manager
def f(d):
d[1] += ''1''
d[''2''] += 2
if __name__ == ''__main__'':
manager = Manager()
d = manager.dict()
d[1] = ''1''
d[''2''] = 2
p1 = Process(target=f, args=(d,))
p2 = Process(target=f, args=(d,))
p1.start()
p2.start()
p1.join()
p2.join()
print d
Salida:
$ python mul.py
{1: ''111'', ''2'': 6}
multiprocesamiento no es como enhebrar. Cada proceso secundario obtendrá una copia de la memoria del proceso principal. En general, el estado se comparte a través de la comunicación (tuberías / enchufes), señales o memoria compartida.
El multiprocesamiento hace que algunas abstracciones estén disponibles para su caso de uso: estado compartido que se trata como local mediante el uso de proxies o memoria compartida: Manager
Secciones relevantes: