while true terminar for ejercicios ejemplos como ciclo bucle python windows multithreading multiprocessing

true - Python usa el bucle while para mantener actualizados los scripts de trabajo y multiprocesar las tareas en cola



while en python 3 (3)

Estoy tratando de escribir una secuencia de comandos de Python que analiza una carpeta y recopilar secuencias de comandos SQL actualizadas, y luego extraer datos automáticamente para la secuencia de comandos SQL. En el código, un bucle while escanea un nuevo archivo SQL y lo envía a la función de extracción de datos. Tengo problemas para entender cómo hacer una cola dinámica con el bucle while, pero también tengo multiproceso para ejecutar las tareas en la cola.

El siguiente código tiene el problema de que la iteración del bucle while funcionará en un trabajo largo antes de pasar a la siguiente iteración y recopilará otros trabajos para llenar el procesador vacío.

Actualizar:

  1. Gracias a @pbacterio por detectar el error, y ahora el mensaje de error desaparece. Después de cambiar el código, el código de Python puede tomar todos los scripts de trabajo durante una iteración y distribuir los scripts a cuatro procesadores. Sin embargo, se quedará bloqueado por un trabajo largo para pasar a la siguiente iteración, escaneando y enviando los scripts de trabajo recién agregados. ¿Alguna idea de cómo reconstruir el código?

  2. Finalmente encontré la solución, vea la respuesta a continuación. Resultó que lo que buscaba es

    the_queue = Queue ()
    the_pool = Pool (4, worker_main, (the_queue,))

  3. Para aquellos que se topan con la idea similar, a continuación se muestra la arquitectura completa de este script de automatización que convierte una unidad compartida en un ''servidor para la extracción de SQL'' o cualquier otro ''servidor'' en la cola de trabajos.

    a. El script de python auto_data_pull.py como se muestra en la respuesta. Necesitas agregar tu propia función de trabajo.

    segundo. Un ''script por lotes'' con lo siguiente:

    iniciar C: / Anaconda2 / python.exe C: / Users / bin / auto_data_pull.py

    do. Agregue una tarea activada por la computadora de inicio, ejecute el ''script por lotes'' Eso es todo. Funciona.

Código Python:

from glob import glob import os, time import sys import CSV import re import subprocess import pandas as PD import pypyodbc from multiprocessing import Process, Queue, current_process, freeze_support # # Function run by worker processes # def worker(input, output): for func, args in iter(input.get, ''STOP''): result = compute(func, args) output.put(result) # # Function used to compute result # def compute(func, args): result = func(args) return ''%s says that %s%s = %s'' % / (current_process().name, func.__name__, args, result) def query_sql(sql_file): #test func #jsl file processing and SQL querying, data table will be saved to csv. fo_name = os.path.splitext(sql_file)[0] + ''.csv'' fo = open(fo_name, ''w'') print sql_file fo.write("sql_file {0} is done/n".format(sql_file)) return "Query is done for /n".format(sql_file) def check_files(path): """ arguments -- root path to monitor returns -- dictionary of {file: timestamp, ...} """ sql_query_dirs = glob(path + "/*/IDABox/") files_dict = {} for sql_query_dir in sql_query_dirs: for root, dirs, filenames in os.walk(sql_query_dir): [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for filename in filenames if filename.endswith(''.jsl'')] return files_dict ##### working in single thread def single_thread(): path = "Y:/" before = check_files(path) sql_queue = [] while True: time.sleep(3) after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated # print sql_queue for sql_file in sql_queue: try: query_sql(sql_file) except: pass ##### not working in queue def multiple_thread(): NUMBER_OF_PROCESSES = 4 path = "Y:/" sql_queue = [] before = check_files(path) # get the current dictionary of sql_files task_queue = Queue() done_queue = Queue() while True: #while loop to check the changes of the files time.sleep(5) after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated TASKS = [(query_sql, sql_file) for sql_file in sql_queue] # Create queues #submit task for task in TASKS: task_queue.put(task) for i in range(NUMBER_OF_PROCESSES): p = Process(target=worker, args=(task_queue, done_queue)).start() # try: # p = Process(target=worker, args=(task_queue)) # p.start() # except: # pass # Get and print results print ''Unordered results:'' for i in range(len(TASKS)): print ''/t'', done_queue.get() # Tell child processes to stop for i in range(NUMBER_OF_PROCESSES): task_queue.put(''STOP'') # single_thread() if __name__ == ''__main__'': # freeze_support() multiple_thread()

Referencia:

  1. monitorea los cambios en el archivo con el script de Python: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocesamiento:
    https://docs.python.org/2/library/multiprocessing.html

¿Dónde sql_file en multiple_thread() en

multiprocessing.Process(target=query_sql, args=(sql_file)).start()

No ha definido sql_file en el método y además ha usado esa variable en un bucle for. El alcance de la variable solo se limita al bucle for.


He resuelto esto Gracias por la respuesta inspiró el pensamiento. Ahora el script puede ejecutar un bucle while para monitorear la carpeta en busca de un nuevo script SQL actualizado / agregado, y luego distribuir la extracción de datos a múltiples hilos. La solución proviene de queue.get () y queue.put (). Asumo que el objeto de cola se encarga de la comunicación por sí mismo.

Este es el código final -

from glob import glob import os, time import sys import pypyodbc from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support def query_sql(sql_file): #test func #jsl file processing and SQL querying, data table will be saved to csv. fo_name = os.path.splitext(sql_file)[0] + ''.csv'' fo = open(fo_name, ''w'') print sql_file fo.write("sql_file {0} is done/n".format(sql_file)) return "Query is done for /n".format(sql_file) def check_files(path): """ arguments -- root path to monitor returns -- dictionary of {file: timestamp, ...} """ sql_query_dirs = glob(path + "/*/IDABox/") files_dict = {} try: for sql_query_dir in sql_query_dirs: for root, dirs, filenames in os.walk(sql_query_dir): [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for filename in filenames if filename.endswith(''.jsl'')] except: pass return files_dict def worker_main(queue): print os.getpid(),"working" while True: item = queue.get(True) query_sql(item) def main(): the_queue = Queue() the_pool = Pool(4, worker_main,(the_queue,)) path = "Y:/" before = check_files(path) # get the current dictionary of sql_files while True: #while loop to check the changes of the files time.sleep(5) sql_queue = [] after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated if sql_queue: for jsl_file in sql_queue: try: the_queue.put(jsl_file) except: print "{0} failed with error {1}. /n".format(jsl_file, str(sys.exc_info()[0])) pass else: pass if __name__ == "__main__": main()


Intenta reemplazar esto:

result = func(*args)

por esto:

result = func(args)