xticks xlabel python queue python-multithreading

xlabel - ¿Usando la cola multiproceso en python de la manera correcta?



plt.xlabel size (2)

Me gusta esta explicación de las ventajas y diferencias entre el uso de subprocesos y procesos - "... Pero hay un lado positivo: los procesos pueden avanzar en varios subprocesos de ejecución simultáneamente. Dado que un proceso principal no comparte el GIL con su procesos secundarios, todos los procesos pueden ejecutarse simultáneamente (sujeto a las restricciones del hardware y del sistema operativo) .... "

Tiene algunas explicaciones excelentes para moverse por GIL y cómo mejorar el rendimiento.

Leer más aquí:

http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/

Estoy tratando de usar The Queue in python, que será multiproceso. Solo quería saber si el enfoque que estoy usando es correcto o no. Y si estoy haciendo algo redundante o si hay un mejor enfoque que debería usar.

Estoy tratando de obtener nuevas solicitudes de una tabla y programarlas usando alguna lógica para realizar alguna operación como ejecutar una consulta.

Así que aquí, desde el hilo principal, se genera un hilo separado para la cola.

if __name__==''__main__'': request_queue = SetQueue(maxsize=-1) worker = Thread(target=request_queue.process_queue) worker.setDaemon(True) worker.start() while True: try: #Connect to the database get all the new requests to be verified db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0) #Get new requests for verification verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS=''%s'' ORDER BY JOB_ID" % (username_testschema, ''INITIATED'')) #If there are some requests to be verified, put them in the queue. if len(verify_these) > 0: for row in verify_these: print "verifying : %s" % row[0] verify_id = row[0] request_queue.put(verify_id) except Exception as e: logger.exception(e) finally: time.sleep(10)

Ahora en la clase Setqueue tengo una función process_queue que se utiliza para procesar las 2 solicitudes principales en cada ejecución que se agregaron a la cola.

'''''' Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time, '''''' class SetQueue(Queue.Queue): def _init(self, maxsize): Queue.Queue._init(self, maxsize) self.all_items = set() def _put(self, item): if item not in self.all_items: Queue.Queue._put(self, item) self.all_items.add(item) '''''' The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec. This way max two requests per run will be processed. '''''' def process_queue(self): while True: scheduler_obj = Scheduler() try: if self.qsize() > 0: for i in range(2): job_id = self.get() t = Thread(target=scheduler_obj.verify_func, args=(job_id,)) t.start() for i in range(2): t.join(timeout=1) self.task_done() except Exception as e: logger.exception( "QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE") finally: time.sleep(10)

Quiero ver si mi comprensión es correcta y si puede haber algún problema con ella.

Por lo tanto, el hilo principal que se ejecuta mientras True en la función principal se conecta a la base de datos obtiene nuevas solicitudes y las pone en la cola. El subproceso de trabajo (daemon) para la cola sigue recibiendo nuevas solicitudes de la cola y los subprocesos no daemon de la bifurcación que realizan el procesamiento y, dado que el tiempo de espera para la unión es 1, el subproceso de trabajo continuará recibiendo nuevas solicitudes sin ser bloqueado, y El hilo hijo se seguirá procesando en el fondo. ¿Correcto?

Entonces, en caso de que el proceso principal salga, estos no se eliminarán hasta que terminen su trabajo, pero el hilo del demonio de trabajo se cerrará. Duda: Si el padre es un demonio y el niño no es un demonio y si el padre sale ¿el hijo sale?

También leí aquí: - David Beazley multiprocesamiento

Por David Beazley en el uso de un grupo como una sección de coprocesador de subprocesos donde intenta resolver un problema similar. Así que debo seguir sus pasos: 1. Crear un conjunto de procesos. 2. Abra un hilo como lo estoy haciendo para request_queue 3. En ese hilo

def process_verification_queue(self): while True: try: if self.qsize() > 0: job_id = self.get() pool.apply_async(Scheduler.verify_func, args=(job_id,)) except Exception as e: logger.exception("QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")

Utilice un proceso de la agrupación y ejecute el proceso Verify_func en paralelo. ¿Esto me dará más rendimiento?


Si bien es posible crear un nuevo subproceso independiente para la cola y procesar esos datos por separado de la forma en que lo está haciendo, creo que es más común que cada subproceso de trabajo independiente publique mensajes en una cola que ya "conocen". Luego esa cola se procesa desde algún otro hilo extrayendo mensajes de esa cola.

Idea de diseño

La forma en que invisiono tu aplicación sería tres hilos. El hilo principal, y dos hilos de trabajo. 1 subproceso de trabajo obtendría solicitudes de la base de datos y las pondría en la cola. El otro subproceso de trabajo procesaría los datos de la cola

El hilo principal solo esperaría a que los otros hilos terminen usando las funciones de hilo .join ()

Protegería la cola a la que tienen acceso los subprocesos y la hace segura mediante un mutex. También he visto este patrón en muchos otros diseños en otros idiomas.

Lectura sugerida

"Effective Python" de Brett Slatkin tiene un gran ejemplo de esta pregunta.

En lugar de heredar de la cola, simplemente crea un contenedor en su clase llamada MyQueue y agrega una función de obtener () y poner (mensaje).

Incluso proporciona el código fuente en su repositorio de Github.

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py

No estoy afiliado al libro ni a su autor, pero lo recomiendo mucho, ya que aprendí algunas cosas de él :)