pipes example ejemplo linux bash logging fifo

example - Linux fifo sin bloqueo(registro a petición)



mkfifo example (8)

Sin embargo, esto crearía un archivo de registro cada vez mayor, incluso si no se utiliza hasta que el disco se quede sin espacio.

¿Por qué no rotar periódicamente los registros? Incluso hay un programa para hacerlo por usted logrotate .

También hay un sistema para generar mensajes de registro y hacer cosas diferentes con ellos según el tipo. Se llama syslog .

Incluso podrías combinar los dos. Haga que su programa genere mensajes syslog, configure syslog para colocarlos en un archivo y use logrotate para asegurarse de que no llenen el disco.

Si resultó que estaba escribiendo para un pequeño sistema integrado y la salida del programa es pesada, hay una variedad de técnicas que podría considerar.

  • Syslog remoto: envía los mensajes syslog a un servidor syslog en la red.
  • Use los niveles de gravedad disponibles en syslog para hacer cosas diferentes con los mensajes. Por ejemplo, descartar "INFO" pero registrar y reenviar "ERR" o superior. Por ejemplo, a la consola
  • Use un controlador de señal en su programa para volver a leer la configuración en HUP y variar la generación de registros "a pedido" de esta manera.
  • Haga que su programa escuche en un socket Unix y escriba mensajes cuando esté abierto. Incluso podría implementar una consola interactiva en su programa de esta manera.
  • Utilizando un archivo de configuración, proporcione control granular de la salida de registro.

Me gusta registrar una salida de programas ''a pedido''. P.ej. la salida se registra en el terminal, pero otro proceso puede enlazarse en la salida actual en cualquier momento.

La forma clásica sería:

myprogram 2>&1 | tee /tmp/mylog

y bajo demanda

tail /tmp/mylog

Sin embargo, esto crearía un archivo de registro cada vez mayor, incluso si no se utiliza hasta que el disco se quede sin espacio. Entonces mi intento fue:

mkfifo /tmp/mylog myprogram 2>&1 | tee /tmp/mylog

y bajo demanda

cat /tmp/mylog

Ahora puedo leer / tmp / mylog en cualquier momento. Sin embargo, cualquier salida bloquea el programa hasta que se lea / tmp / mylog. Me gusta el fifo para eliminar cualquier dato entrante que no se vuelva a leer. ¿Como hacer eso?


BusyBox de uso frecuente en dispositivos integrados puede crear un registro almacenado en RAM por

syslogd -C

que puede ser llenado por

logger

y leer por

logread

Funciona bastante bien, pero solo proporciona un registro global.


El problema con el enfoque dado de fifo es que todo se bloqueará cuando se llene el búfer de la tubería y no se lleve a cabo ningún proceso de lectura.

Para el enfoque fifo al trabajo, creo que tendrías que implementar un modelo de servidor cliente de canalización similar al mencionado en BASH: Mejor arquitectura para leer de dos flujos de entrada (ver el código ligeramente modificado a continuación, código de ejemplo 2).

Para una solución alternativa, también podría usar un while ... read constructo en lugar de tee stdout a una tubería con nombre implementando un mecanismo de conteo dentro del ciclo while ... read que sobrescribirá el archivo de registro periódicamente por un número específico de líneas. Esto evitaría un archivo de registro cada vez mayor (código de muestra 1).

# sample code 1 # terminal window 1 rm -f /tmp/mylog touch /tmp/mylog while sleep 2; do date ''+%Y-%m-%d_%H.%M.%S''; done 2>&1 | while IFS="" read -r line; do lno=$((lno+1)) #echo $lno array[${lno}]="${line}" if [[ $lno -eq 10 ]]; then lno=$((lno+1)) array[${lno}]="-------------" printf ''%s/n'' "${array[@]}" > /tmp/mylog unset lno array fi printf ''%s/n'' "${line}" done # terminal window 2 tail -f /tmp/mylog #------------------------ # sample code 2 # code taken from: # https://.com/questions/6702474/bash-best-architecture-for-reading-from-two-input-streams # terminal window 1 # server ( rm -f /tmp/to /tmp/from mkfifo /tmp/to /tmp/from while true; do while IFS="" read -r -d $''/n'' line; do printf ''%s/n'' "${line}" done </tmp/to >/tmp/from & bgpid=$! exec 3>/tmp/to exec 4</tmp/from trap "kill -TERM $bgpid; exit" 0 1 2 3 13 15 wait "$bgpid" echo "restarting..." done ) & serverpid=$! #kill -TERM $serverpid # client ( exec 3>/tmp/to; exec 4</tmp/from; while IFS="" read -r -d $''/n'' <&4 line; do if [[ "${line:0:1}" == $''/177'' ]]; then printf ''line from stdin: %s/n'' "${line:1}" > /dev/null else printf ''line from fifo: %s/n'' "$line" > /dev/null fi done & trap "kill -TERM $"''!; exit'' 1 2 3 13 15 while IFS="" read -r -d $''/n'' line; do # can we make it atomic? # sleep 0.5 # dd if=/tmp/to iflag=nonblock of=/dev/null # flush fifo printf ''/177%s/n'' "${line}" done >&3 ) & # kill -TERM $! # terminal window 2 # tests echo hello > /tmp/to yes 1 | nl > /tmp/to yes 1 | nl | tee /tmp/to while sleep 2; do date ''+%Y-%m-%d_%H.%M.%S''; done 2>&1 | tee -a /tmp/to # terminal window 3 cat /tmp/to | head -n 10


Este es un hilo (muy) viejo, pero me he encontrado con un problema similar últimamente. De hecho, lo que necesitaba era una clonación de stdin a stdout con una copia a una tubería que no bloquea. la persona propuesta en la primera respuesta realmente ayudó allí, pero fue (para mi caso de uso) demasiado volátil. Lo que significa que perdí datos que podría haber procesado si hubiera llegado a tiempo.

El escenario al que me enfrenté es que tengo un proceso (some_process) que agrega algunos datos y escribe sus resultados cada tres segundos para stdout. La configuración (simplificada) se veía así (en la configuración real estoy usando una tubería con nombre):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Ahora, raw_data.gz tiene que estar comprimido y debe estar completo. Ftee hace este trabajo muy bien. Pero la tubería que estoy usando en el medio era demasiado lenta para captar los datos vaciados, pero fue lo suficientemente rápida como para procesar todo si pudiera llegar a ella, lo cual se probó con un tee normal. Sin embargo, un tee normal bloquea si algo le sucede a la tubería sin nombre, y como quiero ser capaz de enganchar a la demanda, el tee no es una opción. Volver al tema: Mejoró cuando puse un búfer en el medio, lo que resulta en:

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

Pero eso todavía estaba perdiendo datos que podría haber procesado. Así que seguí y extendí la frase propuesta anteriormente a una versión en búfer (bftee). Todavía tiene todas las mismas propiedades, pero utiliza un búfer interno (¿ineficiente?) En caso de que falle una escritura. Todavía pierde datos si el búfer se ejecuta completo, pero funciona muy bien para mi caso. Como siempre, hay mucho margen de mejora, pero al copiar el código de aquí me gustaría compartirlo con las personas que podrían usarlo.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe (c) racic@ (c) fabraxias@ WTFPL Licence */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include <signal.h> #include <unistd.h> // the number of sBuffers that are being held at a maximum #define BUFFER_SIZE 4096 #define BLOCK_SIZE 2048 typedef struct { char data[BLOCK_SIZE]; int bytes; } sBuffer; typedef struct { sBuffer *data; //array of buffers int bufferSize; // number of buffer in data int start; // index of the current start buffer int end; // index of the current end buffer int active; // number of active buffer (currently in use) int maxUse; // maximum number of buffers ever used int drops; // number of discarded buffer due to overflow int sWrites; // number of buffer written to stdout int pWrites; // number of buffers written to pipe } sQueue; void InitQueue(sQueue*, int); // initialized the Queue void PushToQueue(sQueue*, sBuffer*, int); // pushes a buffer into Queue at the end sBuffer *RetrieveFromQueue(sQueue*); // returns the first entry of the buffer and removes it or NULL is buffer is empty sBuffer *PeakAtQueue(sQueue*); // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer void ShrinkInQueue(sQueue *queue, int); // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty void DelFromQueue(sQueue *queue); // removes the first entry of the queue static void sigUSR1(int); // signal handled for SUGUSR1 - used for stats output to stderr static void sigINT(int); // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ? sQueue queue; // Buffer storing the overflow volatile int quit; // for quiting the main loop int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; sBuffer buffer; ssize_t bytes; int bufferSize = BUFFER_SIZE; signal(SIGPIPE, SIG_IGN); signal(SIGUSR1, sigUSR1); signal(SIGTERM, sigINT); signal(SIGINT, sigINT); /** Handle commandline args and open the pipe for non blocking writing **/ if(argc < 2 || argc > 3) { printf("Usage:/n someprog 2>&1 | %s FIFO [BufferSize]/n" "FIFO - path to a named pipe, required argument/n" "BufferSize - temporary Internal buffer size in case write to FIFO fails/n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; if (argc == 3) { bufferSize = atoi(argv[2]); if (bufferSize == 0) bufferSize = BUFFER_SIZE; } readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("bftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("bftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("bftee: %s in not a fifo!/n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("bftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); InitQueue(&queue, bufferSize); quit = 0; while(!quit) { // read from STDIN bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data)); // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; // save the number if read bytes in the current buffer to be processed buffer.bytes = bytes; // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less. bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes); queue.sWrites++; if(-1==bytes) { perror("ftee: writing to stdout"); break; } sBuffer *tmpBuffer = NULL; // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write // the buffered data to the pipe. This continues until the Buffer is empty or the write fails. // NOTE: bytes cannot be -1 (that would have failed just before) when the loop is entered. while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) { // write the oldest buffer to the pipe bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); // the written bytes are equal to the buffer size, the write is successful - remove the buffer and continue if (bytes == tmpBuffer->bytes) { DelFromQueue(&queue); queue.pWrites++; } else if (bytes > 0) { // on a positive bytes value there was a partial write. we shrink the current buffer // and handle this as a write failure ShrinkInQueue(&queue, bytes); bytes = -1; } } // There are several cases here: // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped. if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes); // again, there are several cases what can happen here // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue if (bytes != buffer.bytes) PushToQueue(&queue, &buffer, bytes); else queue.pWrites++; } // once we are done with STDIN, try to flush the buffer to the named pipe if (queue.active > 0) { //set output buffer to block - here we wait until we can write everything to the named pipe // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. int saved_flags = fcntl(writefd, F_GETFL); int new_flags = saved_flags & ~O_NONBLOCK; int res = fcntl(writefd, F_SETFL, new_flags); sBuffer *tmpBuffer = NULL; //TODO: this does not handle partial writes yet while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) { int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); if (bytes != -1) DelFromQueue(&queue); } } close(writefd); } /** init a given Queue **/ void InitQueue (sQueue *queue, int bufferSize) { queue->data = calloc(bufferSize, sizeof(sBuffer)); queue->bufferSize = bufferSize; queue->start = 0; queue->end = 0; queue->active = 0; queue->maxUse = 0; queue->drops = 0; queue->sWrites = 0; queue->pWrites = 0; } /** push a buffer into the Queue**/ void PushToQueue(sQueue *queue, sBuffer *p, int offset) { if (offset < 0) offset = 0; // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead if (offset == p->bytes) return; // in this case there are 0 bytes to add to the queue. Nothing to write // this should never happen - offset cannot be bigger than the buffer itself. Panic action if (offset > p->bytes) {perror("got more bytes to buffer than we read/n"); exit(EXIT_FAILURE);} // debug output on a partial write. TODO: remove this line // if (offset > 0 ) fprintf(stderr, "partial write to buffer/n"); // copy the data from the buffer into the queue and remember its size memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset); queue->data[queue->end].bytes = p->bytes - offset; // move the buffer forward queue->end = (queue->end + 1) % queue->bufferSize; // there is still space in the buffer if (queue->active < queue->bufferSize) { queue->active++; if (queue->active > queue->maxUse) queue->maxUse = queue->active; } else { // Overwriting the oldest. Move start to next-oldest queue->start = (queue->start + 1) % queue->bufferSize; queue->drops++; } } /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/ sBuffer *RetrieveFromQueue(sQueue *queue) { if (!queue->active) { return NULL; } queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; return &(queue->data[queue->start]); } /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/ sBuffer *PeakAtQueue(sQueue *queue) { if (!queue->active) { return NULL; } return &(queue->data[queue->start]); } /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/ void ShrinkInQueue(sQueue *queue, int bytes) { // cannot remove negative amount of bytes - this is an error case. Ignore it if (bytes <= 0) return; // remove the entry if the offset is equal to the buffer size if (queue->data[queue->start].bytes == bytes) { DelFromQueue(queue); return; }; // this is a partial delete if (queue->data[queue->start].bytes > bytes) { //shift the memory by the offset memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes); queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes; return; } // panic is the are to remove more than we have the buffer if (queue->data[queue->start].bytes < bytes) { perror("we wrote more than we had - this should never happen/n"); exit(EXIT_FAILURE); return; } } /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/ void DelFromQueue(sQueue *queue) { if (queue->active > 0) { queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; } } /** Stats output on SIGUSR1 **/ static void sigUSR1(int signo) { fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i/n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops); } /** handle signal for terminating **/ static void sigINT(int signo) { quit++; if (quit > 1) exit(EXIT_FAILURE); }

Esta versión toma un argumento más (opcional) que especifica el número de bloques que se almacenan en búfer para la tubería. Mi llamada de muestra ahora se ve así:

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

lo que resulta en 16384 bloques para ser almacenados en el buffer antes de que ocurran los descartes. esto usa aproximadamente 32 Mbytes más de memoria, pero ... ¿a quién le importa?

Por supuesto, en el entorno real estoy usando un tubo con nombre para que pueda conectar y desconectar según sea necesario. Hay un aspecto como este:

mkfifo named_pipe some_process | bftee named_pipe 16384 | gzip > raw_data.gz & cat named_pipe | onlineAnalysis.pl > results

Además, el proceso reacciona a las señales de la siguiente manera: SIGUSR1 -> imprime los contadores en STDERR SIGTERM, SIGINT -> primero sale del bucle principal y vacía el búfer a la tubería, el segundo finaliza el programa inmediatamente.

Tal vez esto ayude a alguien en el futuro ... Disfruta


Inspirado por su pregunta, he escrito un programa simple que le permitirá hacer esto:

$ myprogram 2>&1 | ftee /tmp/mylog

Se comporta de manera similar a tee pero clona el stdin a stdout y a una tubería con nombre (un requisito por ahora) sin bloqueo. Esto significa que si quieres iniciar sesión de esta manera, es posible que pierdas tus datos de registro, pero supongo que es aceptable en tu situación. El truco es bloquear la señal SIGPIPE e ignorar el error al escribir en un fifo roto. Esta muestra se puede optimizar de varias maneras, por supuesto, pero hasta ahora, hace el trabajo, supongo.

/* ftee - clone stdin to stdout and to a named pipe (c) racic@ WTFPL Licence */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include <signal.h> #include <unistd.h> int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; char buffer[BUFSIZ]; ssize_t bytes; signal(SIGPIPE, SIG_IGN); if(2!=argc) { printf("Usage:/n someprog 2>&1 | %s FIFO/n FIFO - path to a" " named pipe, required argument/n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("ftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("ftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("ftee: %s in not a fifo!/n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("ftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); while(1) { bytes = read(STDIN_FILENO, buffer, sizeof(buffer)); if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; bytes = write(STDOUT_FILENO, buffer, bytes); if(-1==bytes) perror("ftee: writing to stdout"); bytes = write(writefd, buffer, bytes); if(-1==bytes);//Ignoring the errors } close(writefd); return(0); }

Puedes compilarlo con este comando estándar:

$ gcc ftee.c -o ftee

Puede verificarlo rápidamente ejecutando, por ejemplo:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

También tenga en cuenta que este no es un multiplexor. Solo puede tener un proceso haciendo $ cat /tmp/mylog a la vez.


Parece que el operador de redirección bash <> ( 3.6.10 Abrir descriptores de archivo para lectura y escritura ) hace que la escritura en archivo / fifo se abra sin bloqueos. Esto debería funcionar:

$ mkfifo /tmp/mylog $ exec 4<>/tmp/mylog $ myprogram 2>&1 | tee >&4 $ cat /tmp/mylog # on demend

Solución dada por gniourf_gniourf en el canal IRC #bash.


Si puede instalar la pantalla en el dispositivo incorporado, puede ejecutar ''myprogram'' y separarlo, y volver a conectarlo en cualquier momento que desee ver el registro. Algo como:

$ screen -t sometitle myprogram Hit Ctrl+A, then d to detach it.

Cuando quiera ver el resultado, vuelva a conectarlo:

$ screen -DR sometitle Hit Ctrl-A, then d to detach it again.

De esta forma, no tendrá que preocuparse por la salida del programa usando espacio en disco.


Si su proceso escribe en cualquier archivo de registro y luego limpia el archivo y comienza de nuevo de vez en cuando, para que no sea demasiado grande, o usa logrotate.

tail --follow = name --retry my.log

Es todo lo que necesitas. Obtendrá la misma desviación que su terminal.

Nada no estándar es necesario.

No lo he intentado con pequeños archivos de registro, pero todos nuestros registros giran así y nunca he notado la pérdida de líneas.