Так,
tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3
почти работает (как уже упоминалось в первоначальных комментариях об этой более ранней версии моего ответа, хотя вам может понадобиться «для f in fifo *; cat </dev/null> $ f & done» заранее, чтобы гарантировать, что все FIFO открыты для записи потому что coreutils tail открывает их O_RDONLY без O_NONBLOCK).
К сожалению, есть ошибка в том, что tail
осторожен с окончаниями строк / записей только с входами из каналов на stdin, но не с входами из именованных каналов / FIFO в аргументах. Когда-нибудь кто-нибудь может починить coreutils хвост.
Тем временем, чтобы получить настоящую очередь для нескольких потребителей / одного производителя с учетом окончания строки, вы можете использовать простую программу на 100 строк из C, которую я называю tailpipes.c
:
#include <stdio.h>
#include <stdlib.h>
#include <string.h> //TODO: Find&document build environments lacking memrchr
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#define errstr strerror(errno)
char const * const Use = "%s: %s\n\nUsage:\n\n"
" %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\\n)] [-s SEC(.01)] PATH1 PATH2..\n\n"
"Read delimited records (lines by default) from all input paths, writing only\n"
"complete records to stdout and changing to a stop-at-EOF mode upon receiving\n"
"SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n"
"PID does not exist (if PID is given). Since by default fifos are opened RW,\n"
"signal/PID termination is needed to not loop forever, but said FIFOs may be\n"
"closed & reopened by other processes as often as is convenient. For one-shot\n"
"writing style, ending input reads at the first EOF, use \"-oRO\". Also, DLM\n"
"adjusts the record delimiter byte from the default newline, and SEC adjusts\n"
"max select sleep time. Any improperly terminated final records are sent to\n"
"stderr at the end of execution (with a label and bracketing).\n";
int writer_done;
void sig(int signum) { writer_done = 1; }
int main(int N, char *V[]) {
signed char ch;
char *buf[N-1], delim = '\n', *V0 = V[0], *eol;
int len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0,
oFlags = O_RDWR;
pid_t pid = 0;
ssize_t nR, nW;
struct timespec tmOut = { 0, 10000000 }; //10 ms select time out
fd_set fdRdMaster, fdRd;
//If we get signaled before here, this program dies and data may be lost.
//If possible use -p PID option w/pre-extant PID of appropriate lifetime.
signal(SIGHUP, sig); //Install sig() for SIGHUP
memset((void *)fds, 0, sizeof fds);
memset((void *)len, 0, sizeof len);
FD_ZERO(&fdRdMaster);
fdRd = fdRdMaster;
while ((ch = getopt(N, V, "d:p:s:o:")) != -1)
switch (ch) { //For \0 do '' as a sep CLI arg
double tO;
case 'd': delim = optarg ? *optarg : '\n'; break;
case 'p': pid = optarg ? atoi(optarg) : 0; break;
case 's': tO = optarg ? atof(optarg) : .01;
tmOut.tv_sec = (long)tO;
tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec);
break;
case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ?
O_RDONLY | O_NONBLOCK : O_RDWR;
break;
default: return fprintf(stderr, Use, V0, "bad option", V0), 1;
}
V += optind; N -= optind; //Shift off option args
if (N < 1)
return fprintf(stderr, Use, V0, "too few arguments", V0), 2;
setvbuf(stdout, NULL, _IONBF, 65536); //Full pipe on Linux
for (i = 0; i < N; i++) //Check for any available V[]
if ((fds[i] = open(V[i], oFlags)) != -1) {
struct stat st;
fstat(fds[i], &st);
if (!S_ISFIFO(st.st_mode))
return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3;
nF++;
FD_SET(fds[i], &fdRdMaster); //Add fd to master copy for pselect
buf[i] = malloc(nBf[i] = 4096);
if (fds[i] > fdMx)
fdMx = fds[i];
} else if (errno == EINTR) { //We may get signaled to finish up..
i--; continue; //..before we even this far.
} else
return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3;
fdMx++;
fdRd = fdRdMaster;
while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) {
if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist
writer_done = 1;
if (nS == 0 && writer_done) //No input & no writers
break;
else if (nS == -1) { //Some select error:
if (errno != EINTR && errno == EAGAIN) //..fatal or retry
return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4;
continue;
}
for (i = 0; nS > 0 && i < N; i++) { //For all fds..
if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data
continue;
if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) {
if (errno != EAGAIN && errno != EINTR)
fprintf(stderr, "%s: read: %s\n", V0, errstr);
continue;
} else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) {
FD_CLR(fds[i], &fdRdMaster);
nF--;
free(buf[i]);
}
len[i] += nR; //Update Re: read data
if ((eol = memrchr(buf[i], delim, len[i]))) {
nW = eol - buf[i] + 1; //Only to last delim
if (fwrite(buf[i], nW, 1, stdout) == 1) {
memmove(buf[i], buf[i] + nW, len[i] - nW);
len[i] -= nW; //Residual buffer shift
} else
return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n",
V0, len[i], errstr), 5;
} else if (len[i] == nBf[i]) { //NoDelim&FullBuf=>GROW
void *tmp;
if (nBf[i] >= 1 << 30)
return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6;
nBf[i] *= 2;
if (!(tmp = realloc(buf[i], nBf[i])))
return fprintf(stderr,"%s: out of memory\n", V0), 7;
buf[i] = tmp;
}
}
fdRd = fdRdMaster;
}
for (i = 0; i < N; i++) //Ensure any residual data is..
if (len[i] > 0) { //..labeled,bracketed,=>stderr.
fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]);
fwrite(buf[i], len[i], 1, stderr);
fputs("}\n", stderr);
}
return 0;
}
Установка - это вырезать и вставить & cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes
. Протестировано на Linux и FreeBSD. Я получаю около 2500e6 байт / сек, но память может быть быстрее, чем блок 500e6 байт / сек.
Алгоритм примерно такой, как предлагается, но более общий. O_NONBLOCK требуется только с O_RDONLY и с некоторыми опциями для простоты использования, такими как открытие FIFO O_RDWR по умолчанию, чтобы авторы могли закрывать и открывать много раз и использовать -p PID-отслеживание для протокола без гонки. Вы можете передать -oRO, чтобы использовать EOF, если хотите. tailpipes
также обрабатывает неполные строки при завершении программы, отправляя их с метками и заключая в квадратные скобки в stderr в случае простой постобработки, которую можно выполнить, чтобы сделать записи целыми или если их журналы будут полезны для отладки.
Пример использования. GNU xargs
может быть частью для одного потребителя, с несколькими производителями / разветвителями в параллельном конвейере map- Reduce-Ish с tailpipes
работающими в качестве разветвителя для записи границ записи, и все без дискового пространства, используемого для временных файлов :
export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX)
FIFOs=`n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done`
mkfifo $FIFOs
sleep 2147483647 & p=$! #Cannot know xargs pid is good for long
( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM
kill $p ) & #Inform tailpipes writers are done
tailpipes -p$p $FIFOs | CONSUMING-PIPELINE
rm -rf $MYTEMP
wait #Wait for xargs subshell to finish
В приведенном выше описании важно, чтобы A) n
переходил от 0
к соответствующей верхней границе, поскольку это схема, которую xargs
использует для MYSLOT
, и B) MYPROGRAM
направляет свои выходные данные во вновь назначенный файл с ключом $MYSLOT
такой как $MYTEMP/$MYSLOT
, например, exec > $MYTEMP/$MYSLOT
если MYPROGRAM
является сценарием оболочки. Оболочка оболочки / программы может быть исключена во многих случаях, если xargs
использует гипотетический --process-slot-out
для настройки своих дочерних stdouts.