txt - leer una linea especifica de un archivo en python
Obtenga las últimas n líneas de un archivo con Python, similar a la cola (25)
Estoy escribiendo un visor de archivos de registro para una aplicación web y para eso quiero paginar a través de las líneas del archivo de registro. Los elementos en el archivo se basan en línea con el elemento más nuevo en la parte inferior.
Entonces necesito un método tail()
que pueda leer n
líneas desde la parte inferior y soporte un desplazamiento. Lo que se me ocurrió es así:
def tail(f, n, offset=0):
"""Reads a n lines from f with an offset of offset lines."""
avg_line_length = 74
to_read = n + offset
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None]
avg_line_length *= 1.3
¿Es este un enfoque razonable? ¿Cuál es la forma recomendada de rastrear archivos de registro con compensaciones?
Actualice la solución @papercrane a python3. Abra el archivo con open(filename, ''rb'')
y:
def tail(f, window=20):
"""Returns the last `window` lines of file `f` as a list.
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
remaining_bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and remaining_bytes > 0:
if remaining_bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
bunch = f.read(BUFSIZ)
else:
# file too small, start from beginning
f.seek(0, 0)
# only read what was not read
bunch = f.read(remaining_bytes)
bunch = bunch.decode(''utf-8'')
data.insert(0, bunch)
size -= bunch.count(''/n'')
remaining_bytes -= BUFSIZ
block -= 1
return ''''.join(data).splitlines()[-window:]
Aquí está mi respuesta. Pitón puro. Usar timeit parece bastante rápido. Contabilizando 100 líneas de un archivo de registro que tiene 100.000 líneas:
>>> timeit.timeit(''tail.tail(f, 100, 4098)'', ''import tail; f = open("log.txt", "r");'', number=10)
0.0014600753784179688
>>> timeit.timeit(''tail.tail(f, 100, 4098)'', ''import tail; f = open("log.txt", "r");'', number=100)
0.00899195671081543
>>> timeit.timeit(''tail.tail(f, 100, 4098)'', ''import tail; f = open("log.txt", "r");'', number=1000)
0.05842900276184082
>>> timeit.timeit(''tail.tail(f, 100, 4098)'', ''import tail; f = open("log.txt", "r");'', number=10000)
0.5394978523254395
>>> timeit.timeit(''tail.tail(f, 100, 4098)'', ''import tail; f = open("log.txt", "r");'', number=100000)
5.377126932144165
Aquí está el código:
import os
def tail(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""
# place holder for the lines found
lines_found = []
# block counter will be multiplied by buffer
# to get the block size from the end
block_counter = -1
# loop until we find X lines
while len(lines_found) < lines:
try:
f.seek(block_counter * _buffer, os.SEEK_END)
except IOError: # either file is too small, or too many lines requested
f.seek(0)
lines_found = f.readlines()
break
lines_found = f.readlines()
# we found enough lines, get out
# Removed this line because it was redundant the while will catch
# it, I left it for history
# if len(lines_found) > lines:
# break
# decrement the block counter to get the
# next X bytes
block_counter -= 1
return lines_found[-lines:]
Aquí hay una implementación bastante simple:
with open(''/etc/passwd'', ''r'') as f:
try:
f.seek(0,2)
s = ''''
while s.count(''/n'') < 11:
cur = f.tell()
f.seek((cur - 10))
s = f.read(10) + s
f.seek((cur - 10))
print s
except Exception as e:
f.readlines()
Asume que un sistema unix en Python 2 puede hacer:
import os
def tail(f, n, offset=0):
stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
stdin.close()
lines = stdout.readlines(); stdout.close()
return lines[:,-offset]
Para python 3 puedes hacer:
import subprocess
def tail(f, n, offset=0):
proc = subprocess.Popen([''tail'', ''-n'', n + offset, f], stdout=subprocess.PIPE)
lines = proc.stdout.readlines()
return lines[:, -offset]
Aunque este no es realmente el lado eficiente con archivos grandes, este código es bastante directo:
- Lee el objeto de archivo,
f
. - Divide la cadena devuelta usando líneas nuevas,
/n
. Obtiene la matriz que muestra los últimos índices, usando el signo negativo para representar los últimos índices, y
:
para obtener un subcampo.def tail(f,n): return "/n".join(f.read().split("/n")[-n:])
El código que terminé usando. Creo que esto es lo mejor hasta ahora:
def tail(f, n, offset=None):
"""Reads a n lines from f with an offset of offset lines. The return
value is a tuple in the form ``(lines, has_more)`` where `has_more` is
an indicator that is `True` if there are more lines in the file.
"""
avg_line_length = 74
to_read = n + (offset or 0)
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None], /
len(lines) > to_read or pos > 0
avg_line_length *= 1.3
Esto puede ser más rápido que el tuyo. No hace suposiciones sobre la longitud de la línea. Retrocede por el archivo un bloque a la vez hasta que encuentra el número correcto de ''/ n'' caracteres.
def tail( f, lines=20 ):
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven''t yet read
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count(''/n'')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''''.join(reversed(blocks))
return ''/n''.join(all_read_text.splitlines()[-total_lines_wanted:])
No me gustan las suposiciones engañosas sobre la longitud de la línea cuando, como cuestión práctica, nunca se puede saber cosas como esa.
En general, esto ubicará las últimas 20 líneas en el primer o segundo pase a través del ciclo. Si su elemento de 74 caracteres es realmente preciso, usted hace el tamaño de bloque 2048 y alineará 20 líneas casi de inmediato.
Además, no quemo una gran cantidad de calorías cerebrales tratando de alinear la finura con los bloques físicos del sistema operativo. Usando estos paquetes de E / S de alto nivel, dudo que veas alguna consecuencia en el rendimiento al tratar de alinearte en los límites del bloque del sistema operativo. Si usa E / S de nivel inferior, entonces puede ver una aceleración.
Existen algunas implementaciones de tail en pypi que puedes instalar usando pip:
- mtFileUtil
- multitail
- log4tailer
- ...
Dependiendo de su situación, puede haber ventajas al usar una de estas herramientas existentes.
La respuesta de S.Lott arriba casi funciona para mí, pero termina dándome líneas parciales. Resulta que corrompe los datos en los límites del bloque porque los datos contienen los bloques de lectura en orden inverso. Cuando se llama a '''' .join (data), los bloques están en el orden incorrecto. Esto arregla eso.
def tail(f, window=20):
"""
Returns the last `window` lines of file `f` as a list.
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and bytes > 0:
if bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
data.insert(0, f.read(BUFSIZ))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
data.insert(0, f.read(bytes))
linesFound = data[0].count(''/n'')
size -= linesFound
bytes -= BUFSIZ
block -= 1
return ''''.join(data).splitlines()[-window:]
Me pareció que el Popen anterior es la mejor solución. Es rápido y sucio y funciona. Para Python 2.6 en la máquina Unix, utilicé lo siguiente
def GetLastNLines(self, n, fileName):
"""
Name: Get LastNLines
Description: Gets last n lines using Unix tail
Output: returns last n lines of a file
Keyword argument:
n -- number of last lines to return
filename -- Name of the file you need to tail into
"""
p=subprocess.Popen([''tail'',''-n'',str(n),self.__fileName], stdout=subprocess.PIPE)
soutput,sinput=p.communicate()
return soutput
soutput tendrá contendrá las últimas n líneas del código. para iterar a través del surput línea por línea do:
for line in GetLastNLines(50,''myfile.log'').split(''/n''):
print line
No es el primer ejemplo que usa una deque, pero es más simple. Este es general: funciona en cualquier objeto iterable, no solo en un archivo.
#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
deq = collections.deque()
for thing in iterable:
if len(deq) >= N:
deq.popleft()
deq.append(thing)
for thing in deq:
yield thing
if __name__ == ''__main__'':
for line in tail(sys.stdin,10):
sys.stdout.write(line)
Para una mayor eficiencia con archivos muy grandes (común en situaciones de archivos de registro donde puede querer utilizar cola), generalmente quiere evitar leer todo el archivo (incluso si lo hace sin leer todo el archivo en la memoria a la vez). Sin embargo, lo hace Necesita de alguna manera resolver el desplazamiento en líneas en lugar de caracteres. Una posibilidad es leer hacia atrás con seek () char by char, pero esto es muy lento. En cambio, es mejor procesar en bloques más grandes.
Tengo una función de utilidad que escribí hace un tiempo para leer los archivos al revés que pueden usarse aquí.
import os, itertools
def rblocks(f, blocksize=4096):
"""Read file as series of blocks from end of file to start.
The data itself is in normal order, only the order of the blocks is reversed.
ie. "hello world" -> ["ld","wor", "lo ", "hel"]
Note that the file must be opened in binary mode.
"""
if ''b'' not in f.mode.lower():
raise Exception("File must be opened using binary mode.")
size = os.stat(f.name).st_size
fullblocks, lastblock = divmod(size, blocksize)
# The first(end of file) block will be short, since this leaves
# the rest aligned on a blocksize boundary. This may be more
# efficient than having the last (first in file) block be short
f.seek(-lastblock,2)
yield f.read(lastblock)
for i in range(fullblocks-1,-1, -1):
f.seek(i * blocksize)
yield f.read(blocksize)
def tail(f, nlines):
buf = ''''
result = []
for block in rblocks(f):
buf = block + buf
lines = buf.splitlines()
# Return all lines except the first (since may be partial)
if lines:
result.extend(lines[1:]) # First line may not be complete
if(len(result) >= nlines):
return result[-nlines:]
buf = lines[0]
return ([buf]+result)[-nlines:]
f=open(''file_to_tail.txt'',''rb'')
for line in tail(f, 20):
print line
[Editar] Se agregó una versión más específica (evita la necesidad de invertir dos veces)
Pensándolo bien, esto es probablemente tan rápido como cualquier cosa aquí.
def tail( f, window=20 ):
lines= ['''']*window
count= 0
for l in f:
lines[count%window]= l
count += 1
print lines[count%window:], lines[:count%window]
Es mucho más simple. Y parece desgarrarse a buen ritmo.
Publicar una respuesta a instancias de los comentaristas sobre mi respuesta a una pregunta similar donde se utilizó la misma técnica para mutar la última línea de un archivo, no solo obtenerlo.
Para un archivo de tamaño significativo, mmap
es la mejor manera de hacerlo. Para mejorar la respuesta de mmap
existente, esta versión es portátil entre Windows y Linux, y debe ejecutarse más rápido (aunque no funcionará sin algunas modificaciones en Python de 32 bits con archivos en el rango de GB, consulte la otra respuesta para obtener sugerencias sobre el manejo esto, y para modificar para trabajar en Python 2 ).
import io # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap
def skip_back_lines(mm, numlines, startidx):
''''''Factored out to simplify handling of n and offset''''''
for _ in itertools.repeat(None, numlines):
startidx= mm.rfind(b''/n'', 0, startidx)
if startidx< 0:
break
return startidx
def tail(f, n, offset=0):
# Reopen file in binary mode
with io.open(f.name, ''rb'') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# len(mm) - 1 handles files ending w/newline by getting the prior line
startofline = skip_back_lines(mm, offset, len(mm) - 1)
if startofline < 0:
return [] # Offset lines consumed whole file, nothing to return
# If using a generator function (yield-ing, see below),
# this should be a plain return, no empty list
endoflines = startofline + 1 # Slice end to omit offset lines
# Find start of lines to capture (add 1 to move from newline to beginning of following line)
startofline = skip_back_lines(mm, n, startofline) + 1
# Passing True to splitlines makes it return the list of lines without
# removing the trailing newline (if any), so list mimics f.readlines()
return mm[startofline:endoflines].splitlines(True)
# If Windows style /r/n newlines need to be normalized to /n, and input
# is ASCII compatible, can normalize newlines with:
# return mm[startofline:endoflines].replace(os.linesep.encode(''ascii''), b''/n'').splitlines(True)
Esto supone que la cantidad de líneas conectadas es lo suficientemente pequeña como para poder leerlas todas en la memoria de una vez; también podría hacer de esto una función de generador y leer manualmente una línea a la vez reemplazando la línea final por:
mm.seek(startofline)
# Call mm.readline n times, or until EOF, whichever comes first
for line in itertools.islice(iter(mm.readline, b''''), n):
yield line
Por último, esto se lee en modo binario (es necesario usar mmap
) para que dé líneas str
(Py2) y líneas de bytes
(Py3); si desea unicode
(Py2) o str
(Py3), el enfoque iterativo podría modificarse para descodificarlo y / o corregir nuevas líneas:
lines = itertools.islice(iter(mm.readline, b''''), n)
if f.encoding: # Decode if the passed file was opened with a specific encoding
lines = (line.decode(f.encoding) for line in lines)
if ''b'' not in f.mode: # Fix line breaks if passed file opened in text mode
lines = (line.replace(os.linesep, ''/n'') for line in lines)
for line in lines:
yield line
Nota: Escribí todo esto en una máquina donde no tengo acceso a Python para probar. Por favor, avíseme si escribí algo; esto fue lo suficientemente similar a mi otra respuesta que creo que debería funcionar, pero los ajustes (por ejemplo, el manejo de un offset
) podrían conducir a errores sutiles. Por favor, hágamelo saber en los comentarios si hay algún error.
Según la respuesta de Eyecue (10 de junio de 2010 a las 21:28): esta clase agrega el método head () y tail () al objeto de archivo.
class File(file):
def head(self, lines_2find=1):
self.seek(0) #Rewind file
return [self.next() for x in xrange(lines_2find)]
def tail(self, lines_2find=1):
self.seek(0, 2) #go to end of file
bytes_in_file = self.tell()
lines_found, total_bytes_scanned = 0, 0
while (lines_2find+1 > lines_found and
bytes_in_file > total_bytes_scanned):
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
self.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += self.read(1024).count(''/n'')
self.seek(-total_bytes_scanned, 2)
line_list = list(self.readlines())
return line_list[-lines_2find:]
Uso:
f = File(''path/to/file'', ''r'')
f.head(3)
f.tail(3)
Si leer el archivo completo es aceptable, utilice un deque.
from collections import deque
deque(f, maxlen=n)
Antes de 2.6, deques no tenía una opción máxima, pero es bastante fácil de implementar.
import itertools
def maxque(items, size):
items = iter(items)
q = deque(itertools.islice(items, size))
for item in items:
del q[0]
q.append(item)
return q
Si es un requisito leer el archivo desde el final, entonces use una búsqueda de galope (también conocido como exponencial).
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
while len(lines) <= n:
try:
f.seek(-pos, 2)
except IOError:
f.seek(0)
break
finally:
lines = list(f)
pos *= 2
return lines[-n:]
Solución simple y rápida con mmap:
import mmap
import os
def tail(filename, n):
"""Returns last n lines from the filename. No exception handling"""
size = os.path.getsize(filename)
with open(filename, "rb") as f:
# for Windows the mmap parameters are different
fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
try:
for i in xrange(size - 1, -1, -1):
if fm[i] == ''/n'':
n -= 1
if n == -1:
break
return fm[i + 1 if i else 0:].splitlines()
finally:
fm.close()
Tuve que leer un valor específico de la última línea de un archivo y tropecé con este hilo. En lugar de reinventar la rueda en Python, terminé con un pequeño script de shell, guardado como / usr / local / bin / get_last_netp:
#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {''print $14''}
Y en el programa Python:
from subprocess import check_output
last_netp = int(check_output("/usr/local/bin/get_last_netp"))
Una versión compatible con python3 aún más limpia que no se inserta sino que agrega y revierte:
def tail(f, window=1):
"""
Returns the last `window` lines of file `f` as a list of bytes.
"""
if window == 0:
return b''''
BUFSIZE = 1024
f.seek(0, 2)
end = f.tell()
nlines = window + 1
data = []
while nlines > 0 and end > 0:
i = max(0, end - BUFSIZE)
nread = min(end, BUFSIZE)
f.seek(i)
chunk = f.read(nread)
data.append(chunk)
nlines -= chunk.count(b''/n'')
end -= nread
return b''/n''.join(b''''.join(reversed(data)).splitlines()[-window:])
Úselo así:
with open(path, ''rb'') as f:
last_lines = tail(f, 3).decode(''utf-8'')
Varias de estas soluciones tienen problemas si el archivo no termina en / n o si se asegura que se lea la primera línea completa.
def tail(file, n=1, bs=1024):
f = open(file)
f.seek(-1,2)
l = 1-f.read(1).count(''/n'') # If file doesn''t end in /n, count it anyway.
B = f.tell()
while n >= l and B > 0:
block = min(bs, B)
B -= block
f.seek(B, 0)
l += f.read(block).count(''/n'')
f.seek(B, 0)
l = min(l,n) # discard first (incomplete) line if l > n
lines = f.readlines()[-l:]
f.close()
return lines
basado en la respuesta más votado de S.Lott (25 de septiembre de 2008 a las 21:43), pero corregido para archivos pequeños.
def tail(the_file, lines_2find=20):
the_file.seek(0, 2) #go to end of file
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
the_file.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count(''/n'')
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
return line_list[-lines_2find:]
#we read at least 21 line breaks from the bottom, block by block for speed
#21 to ensure we don''t get a half line
Espero que esto sea útil.
puede ir al final de su archivo con f.seek (0, 2) y luego leer líneas una por una con el siguiente reemplazo para readline ():
def readline_backwards(self, f):
backline = ''''
last = ''''
while not last == ''/n'':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
backline = last
last = ''''
while not last == ''/n'':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
f.seek(1, 1)
return backline
This is my version of tailf
import sys, time, os
filename = ''path to file''
try:
with open(filename) as f:
size = os.path.getsize(filename)
if size < 1024:
s = size
else:
s = 999
f.seek(-s, 2)
l = f.read()
print l
while True:
line = f.readline()
if not line:
time.sleep(1)
continue
print line
except IOError:
pass
import itertools
fname = ''log.txt''
offset = 5
n = 10
with open(fname) as f:
n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))
import time
attemps = 600
wait_sec = 5
fname = "YOUR_PATH"
with open(fname, "r") as f:
where = f.tell()
for i in range(attemps):
line = f.readline()
if not line:
time.sleep(wait_sec)
f.seek(where)
else:
print line, # already has newline