python - txt - ¿Cómo veo un archivo en busca de cambios?
ejercicios de archivos en python (20)
Tengo un archivo de registro que está siendo escrito por otro proceso que quiero observar para detectar cambios. Cada vez que se produce un cambio, me gustaría leer los nuevos datos para procesarlos.
¿Cuál es la mejor manera de hacer esto? Esperaba que hubiera algún tipo de gancho de la biblioteca PyWin32. He encontrado la función win32file.FindNextChangeNotification
pero no tengo idea de cómo pedirle que vea un archivo específico.
Si alguien ha hecho algo como esto, estaría realmente agradecido de escuchar cómo ...
[Editar] Debería haber mencionado que buscaba una solución que no requiere sondeo.
[Editar] Maldiciones! Parece que esto no funciona en una unidad de red asignada. Supongo que Windows no "escucha" ninguna actualización del archivo como lo hace en un disco local.
¿Intentaste usar Watchdog ?
Utilidades de shell y biblioteca de API de Python para monitorear eventos del sistema de archivos.
El monitoreo de directorios facilitado con
- Una API multiplataforma.
- Una herramienta de shell para ejecutar comandos en respuesta a cambios de directorio.
Comience rápidamente con un simple ejemplo en Inicio Quickstart ...
¿Ya ha consultado la documentación disponible en http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html ? Si solo necesita que funcione con Windows, el segundo ejemplo parece ser exactamente lo que desea (si intercambia la ruta del directorio con la del archivo que desea ver).
De lo contrario, el sondeo probablemente sea la única opción realmente independiente de la plataforma.
Nota: No he probado ninguna de estas soluciones.
Aquí hay un ejemplo orientado a ver archivos de entrada que escriben no más de una línea por segundo, pero generalmente mucho menos. El objetivo es agregar la última línea (la escritura más reciente) al archivo de salida especificado. He copiado esto de uno de mis proyectos y acabo de borrar todas las líneas irrelevantes. Tendrás que rellenar o cambiar los símbolos que faltan.
from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow # Qt Creator gen''d
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
QMainWindow.__init__(self, parent)
Ui_MainWindow.__init__(self)
self._fileWatcher = QFileSystemWatcher()
self._fileWatcher.fileChanged.connect(self.fileChanged)
def fileChanged(self, filepath):
QThread.msleep(300) # Reqd on some machines, give chance for write to complete
# ^^ About to test this, may need more sophisticated solution
with open(filepath) as file:
lastLine = list(file)[-1]
destPath = self._filemap[filepath][''dest file'']
with open(destPath, ''a'') as out_file: # a= append
out_file.writelines([lastLine])
Por supuesto, la clase QMainWindow que abarca no es estrictamente necesaria, es decir. puede utilizar QFileSystemWatcher solo.
Aquí hay una versión simplificada del código de Kender que parece hacer el mismo truco y no importa el archivo completo:
# Check file for new data.
import time
f = open(r''c:/temp/test.txt'', ''r'')
while True:
line = f.readline()
if not line:
time.sleep(1)
print ''Nothing New''
else:
print ''Call Function: '', line
Bueno, después de un poco de pirateo del guión de Tim Golden, tengo lo siguiente que parece funcionar bastante bien:
import os
import win32file
import win32con
path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt
def ProcessNewData( newData ):
print "Text added: %s"%newData
# Set up the bits we''ll need for output
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
path_to_watch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Open the file we''re interested in
a = open(file_to_watch, "r")
# Throw away any exising log data
a.read()
# Wait for new data and call ProcessNewData for each new chunk that''s written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it''s updating the file we''re interested in
for action, file in results:
full_filename = os.path.join (path_to_watch, file)
#print file, ACTIONS.get (action, "Unknown")
if file == file_to_watch:
newText = a.read()
if newText != "":
ProcessNewData( newText )
Probablemente podría hacerlo con una carga más de comprobación de errores, pero simplemente viendo un archivo de registro y procesándolo antes de escupirlo a la pantalla, esto funciona bien.
Gracias a todos por su aporte, ¡cosas geniales!
Bueno, ya que estás usando Python, puedes abrir un archivo y seguir leyendo las líneas.
f = open(''file.log'')
Si la línea leída no está vacía , la procesa.
line = f.readline()
if line:
// Do what you want with the line
Puede que falte en que está bien seguir llamando a readline
en el EOF. Simplemente seguirá devolviendo una cadena vacía en este caso. Y cuando se agrega algo al archivo de registro, la lectura continuará desde donde se detuvo, según sea necesario.
Si está buscando una solución que use eventos o una biblioteca en particular, especifique esto en su pregunta. De lo contrario, creo que esta solución está bien.
Como puede ver en http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html , señalado por , WIN32 es relativamente complejo y mira directorios, no un solo archivo.
Me gustaría sugerirle que busque en IronPython , que es una implementación de .NET en Python. Con IronPython puede usar toda la funcionalidad .NET , incluyendo
System.IO.FileSystemWatcher
Que maneja archivos individuales con una interfaz de evento simple.
Echa un vistazo a pyinotify .
inotify reemplaza a dnotify (de una respuesta anterior) en linuxes más nuevos y permite la supervisión a nivel de archivo en lugar de a nivel de directorio.
Esta es otra modificación de la secuencia de comandos de Tim Goldan que se ejecuta en Linux y agrega un observador simple para la modificación de archivos utilizando un dict (archivo => tiempo).
uso: lo que sea.nombre.py ruta_de_dir_dir_watch
#!/usr/bin/env python
import os, sys, time
def files_to_timestamp(path):
files = [os.path.join(path, f) for f in os.listdir(path)]
return dict ([(f, os.path.getmtime(f)) for f in files])
if __name__ == "__main__":
path_to_watch = sys.argv[1]
print "Watching ", path_to_watch
before = files_to_timestamp(path_to_watch)
while 1:
time.sleep (2)
after = files_to_timestamp(path_to_watch)
added = [f for f in after.keys() if not f in before.keys()]
removed = [f for f in before.keys() if not f in after.keys()]
modified = []
for f in before.keys():
if not f in removed:
if os.path.getmtime(f) != before.get(f):
modified.append(f)
if added: print "Added: ", ", ".join(added)
if removed: print "Removed: ", ", ".join(removed)
if modified: print "Modified ", ", ".join(modified)
before = after
Este es un ejemplo de comprobación de un archivo para los cambios. Una que puede no ser la mejor manera de hacerlo, pero seguro que es un camino corto.
Herramienta práctica para reiniciar la aplicación cuando se han realizado cambios en la fuente. Hice esto cuando jugaba con pygame para poder ver los efectos inmediatamente después de guardar el archivo.
Cuando se utiliza en pygame, asegúrate de que las cosas en el bucle ''while'' estén ubicadas en tu ciclo de juego, también conocido como actualización o lo que sea. De lo contrario, su aplicación se atascará en un bucle infinito y no verá la actualización de su juego.
file_size_stored = os.stat(''neuron.py'').st_size
while True:
try:
file_size_current = os.stat(''neuron.py'').st_size
if file_size_stored != file_size_current:
restart_program()
except:
pass
Por si querías el código de reinicio que encontré en la web. Aquí está. (No es relevante para la pregunta, aunque podría ser útil)
def restart_program(): #restart application
python = sys.executable
os.execl(python, python, * sys.argv)
Diviértete haciendo electrones, haz lo que quieras que hagan.
La mejor y más simple solución es usar pygtail: https://pypi.python.org/pypi/pygtail
from pygtail import Pygtail
while True:
for line in Pygtail("some.log"):
sys.stdout.write(line)
La solución más simple para mí es usar la herramienta watchmed de watchmedo.
Desde https://pypi.python.org/pypi/watchdog ahora tengo un proceso que busca los archivos SQL en un directorio y los ejecuta si es necesario.
watchmedo shell-command /
--patterns="*.sql" /
--recursive /
--command=''~/Desktop/load_files_into_mysql_database.sh'' /
.
No conozco ninguna función específica de Windows. Puede intentar obtener el hash MD5 del archivo cada segundo / minuto / hora (depende de qué tan rápido lo necesite) y compararlo con el último hash. Cuando difiere, sabe que el archivo se ha modificado y ha leído las líneas más recientes.
No debería funcionar en Windows (¿quizás con cygwin?), Pero para usuarios de Unix, debe usar la llamada al sistema "fcntl". Aquí hay un ejemplo en Python. Es mayormente el mismo código si necesita escribirlo en C (los mismos nombres de funciones)
import time
import fcntl
import os
import signal
FNAME = "/HOME/TOTO/FILETOWATCH"
def handler(signum, frame):
print "File %s modified" % (FNAME,)
signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME, os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)
while True:
time.sleep(10000)
Para ver un solo archivo con sondeo y dependencias mínimas, aquí hay un ejemplo completo, basado en la respuesta de Deestan (arriba):
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self.filename = watch_file
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
print(''File changed'')
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print(''/nDone'')
break
except FileNotFoundError:
# Action on file not found
pass
except:
print(''Unhandled error: %s'' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
watch_file = ''my_file.txt''
# watcher = Watcher(watch_file) # simple
watcher = Watcher(watch_file, custom_action, text=''yes, changed'') # also call custom action function
watcher.watch() # start the watch going
Probaría algo como esto.
try:
f = open(filePath)
except IOError:
print "No such file: %s" % filePath
raw_input("Press Enter to close window")
try:
lines = f.readlines()
while True:
line = f.readline()
try:
if not line:
time.sleep(1)
else:
functionThatAnalisesTheLine(line)
except Exception, e:
# handle the exception somehow (for example, log the trace) and raise the same exception again
raw_input("Press Enter to close window")
raise e
finally:
f.close()
El bucle comprueba si hay una o más líneas desde la última vez que se leyó el archivo; si lo hay, se lee y se pasa a la functionThatAnalisesTheLine
functionThatAnalisesTheLine. Si no, el script espera 1 segundo y vuelve a intentar el proceso.
Revisa mi respuesta a una pregunta similar . Podrías probar el mismo bucle en Python. Esta página sugiere:
import time
while 1:
where = file.tell()
line = file.readline()
if not line:
time.sleep(1)
file.seek(where)
else:
print line, # already has newline
También vea la pregunta tail () un archivo con Python .
Si desea una solución multiplataforma, verifique QFileSystemWatcher . Aquí un código de ejemplo (no saneado):
from PyQt4 import QtCore
@QtCore.pyqtSlot(str)
def directory_changed(path):
print(''Directory Changed!!!'')
@QtCore.pyqtSlot(str)
def file_changed(path):
print(''File Changed!!!'')
fs_watcher = QtCore.QFileSystemWatcher([''/path/to/files_1'', ''/path/to/files_2'', ''/path/to/files_3''])
fs_watcher.connect(fs_watcher, QtCore.SIGNAL(''directoryChanged(QString)''), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL(''fileChanged(QString)''), file_changed)
Si el sondeo es lo suficientemente bueno para usted, simplemente observaría si cambia la estadística del archivo de "hora modificada". Para leerlo:
os.stat(filename).st_mtime
(También tenga en cuenta que la solución de evento de cambio nativo de Windows no funciona en todas las circunstancias, por ejemplo, en unidades de red).
import os
class Monkey(object):
def __init__(self):
self._cached_stamp = 0
self.filename = ''/path/to/file''
def ook(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
class myThread (threading.Thread):
def __init__(self, threadID, fileName, directory, origin):
threading.Thread.__init__(self)
self.threadID = threadID
self.fileName = fileName
self.daemon = True
self.dir = directory
self.originalFile = origin
def run(self):
startMonitor(self.fileName, self.dir, self.originalFile)
def startMonitor(fileMonitoring,dirPath,originalFile):
hDir = win32file.CreateFile (
dirPath,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Wait for new data and call ProcessNewData for each new chunk that''s
# written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it''s updating the file we''re
# interested in
for action, file_M in results:
full_filename = os.path.join (dirPath, file_M)
#print file, ACTIONS.get (action, "Unknown")
if len(full_filename) == len(fileMonitoring) and action == 3:
#copy to main file
...