diff --git a/lsyncd-conf.lua b/lsyncd-conf.lua index 96f397f..817ce5d 100644 --- a/lsyncd-conf.lua +++ b/lsyncd-conf.lua @@ -26,6 +26,10 @@ slowbash = { event.source, event.target) end, + onModify = function(event) + spawn(event, "/home/axel/lsyncd2/in", "<", "tuhutu\n", "2") + end, + onCreate = function(event) local s = event.sourcePathname local t = event.targetPathname @@ -33,12 +37,12 @@ slowbash = { spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t) end, - onModify = function(event) - local s = event.sourcePathname - local t = event.targetPathname - log("Normal", "Spawning Modify ",s," -> ",t) - spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t) - end, +-- onModify = function(event) +-- local s = event.sourcePathname +-- local t = event.targetPathname +-- log("Normal", "Spawning Modify ",s," -> ",t) +-- spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t) +-- end, onDelete = function(event) local t = event.targetPathname diff --git a/lsyncd.c b/lsyncd.c index 2070d4b..aab09bf 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -393,6 +393,79 @@ s_strdup(const char *src) return s; } +/** + * Sets the close-on-exit flag for an fd + */ +static void +close_exec_fd(int fd) +{ + int flags; + flags = fcntl(fd, F_GETFD); + if (flags == -1) { + logstring("Error", "cannot get descriptor flags!"); + exit(-1); // ERRNO + } + flags |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, flags) == -1) { + logstring("Error", "cannot set descripptor flags!"); + exit(-1); // ERRNO + } +} + +/** + * Sets the non-blocking flag for an fd + */ +static void +non_block_fd(int fd) +{ + int flags; + flags = fcntl(fd, F_GETFL); + if (flags == -1) { + logstring("Error", "cannot get status flags!"); + exit(-1); // ERRNO + } + flags |= O_NONBLOCK;; + if (fcntl(fd, F_SETFL, flags) == -1) { + logstring("Error", "cannot set status flags!"); + exit(-1); // ERRNO + } +} + + +/** + * A child process gets text piped longer than on + * write() can manage. + */ +struct pipemsg { + /* pipe file descriptor */ + int fd; + + /* message to send */ + char *text; + + /* length of text */ + int tlen; + + /* position in message */ + int pos; +}; + +/** + * All pipes currently active. + */ +static struct pipemsg *pipes = NULL; + +/** + * amount of pipes allocated. + */ +size_t pipes_size = 0; + +/** + * number of pipes used. + */ +size_t pipes_len = 0; + + /***************************************************************************** * Library calls for lsyncd.lua * @@ -606,37 +679,31 @@ l_exec(lua_State *L) if (argc >= 2 && !strcmp(luaL_checkstring(L, 2), "<")) { /* pipes something into stdin */ - int flags; pipe_text = luaL_checkstring(L, 3); -printf("pipe_text = %s\n", pipe_text); /* creates the pipe */ if (pipe(pipefd) == -1) { logstring("Error", "cannot create a pipe!"); - terminate(-1); // ERRNO - } - - /* sets the write end of the pipe to close on exec */ - if (flags = fcntl(fd, F_GETFD) == -1) { - logstring("Error", "cannot get pipe flage!"); - terminate(-1); // ERRNO - } - flags |= FD_CLOEXEC; - if (fcntl(fd, F_SETFD, flags) == -1) { - logstring("Error", "cannot set pipe flage!"); - terminate(-1); // ERRNO + exit(-1); // ERRNO } + /* always close the write end for child processes */ + close_exec_fd(pipefd[1]); + /* set the write end on non-blocking */ + non_block_fd(pipefd[1]); argc -= 2; li += 2; } - argv = s_calloc(argc + 2, sizeof(char *)); - argv[0] = binary; - for(i = 1; i <= argc; i++) { - argv[i] = luaL_checkstring(L, i + li); + { + /* prepares the arguments */ + int i; + argv = s_calloc(argc + 2, sizeof(char *)); + argv[0] = binary; + for(i = 1; i <= argc; i++) { + argv[i] = luaL_checkstring(L, i + li); + } + argv[i] = NULL; } - argv[i] = NULL; - pid = fork(); if (pid == 0) { @@ -665,9 +732,33 @@ printf("pipe_text = %s\n", pipe_text); } if (pipe_text) { + int tlen = strlen(pipe_text); + int len; /* first closes read-end of pipe, this is for child process only */ close(pipefd[0]); - + /* start filling the pipe */ + len = write(pipefd[1], pipe_text, tlen); + if (len < 0) { + logstring("Normal", "broken pipe."); + close(pipefd[0]); + } + if (len == tlen) { + /* usual and best case, the pipe accepted all input -> close */ + close(pipefd[1]); + logstring("Exec", "one-sweeped pipe"); + } else { + logstring("Exec", "adding delayed pipe"); + int p = pipes_len; + pipes_len++; + if (pipes_len > pipes_size) { + pipes_size = pipes_len; + pipes = s_realloc(pipes, pipes_size*sizeof(struct pipemsg)); + } + pipes[p].fd = pipefd[1]; + pipes[p].tlen = tlen; + pipes[p].text = s_strdup(pipe_text); + } + close(pipefd[0]); } free(argv); lua_pushnumber(L, pid); @@ -1075,7 +1166,7 @@ masterloop(lua_State *L) bool have_alarm; clock_t now = times(NULL); clock_t alarm_time; - int do_read; + bool do_read = false; ssize_t len; /* queries runner about soonest alarm */ @@ -1096,16 +1187,12 @@ masterloop(lua_State *L) /* there is a delay that wants to be handled already thus do not * read from inotify_fd and jump directly to its handling */ logstring("Masterloop", "immediately handling delays."); - do_read = 0; } else { /* use select() to determine what happens next * + a new event on inotify * + an alarm on timeout * + the return of a child process */ - sigset_t sigset; - fd_set readfds; struct timespec tv; - sigemptyset(&sigset); if (have_alarm) { double d = ((double)(alarm_time - now)) / clocks_per_sec; @@ -1118,18 +1205,39 @@ masterloop(lua_State *L) } /* if select returns a positive number there is data on inotify * on zero the timemout occured. */ - FD_ZERO(&readfds); - FD_SET(inotify_fd, &readfds); - do_read = pselect(inotify_fd + 1, &readfds, NULL, NULL, - have_alarm ? &tv : NULL, &sigset); + { + fd_set rfds; + fd_set wfds; + sigset_t sigset; + sigemptyset(&sigset); + int nfds = inotify_fd; + int pi; - logstring("Masterloop", do_read > 0 ? - "theres data on inotify." : - "core: select() timeout or signal."); + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_SET(inotify_fd, &rfds); + for(pi = 0; pi < pipes_len; pi++) { + int pfd = pipes[pi].fd; + nfds = pfd > nfds ? pfd : nfds; + FD_SET(pfd, &wfds); + } + + /* reuse pi for result */ + pi = pselect(nfds + 1, &rfds, NULL, NULL, + have_alarm ? &tv : NULL, &sigset); + if (pi >= 0) { + do_read = FD_ISSET(inotify_fd, &rfds); + } + if (do_read) { + logstring("Masterloop", do_read > 0 ? + "theres data on inotify." : + "core: select() timeout or signal."); + } + } } /* reads possible events from inotify stream */ - while(do_read > 0) { + while(do_read) { int i = 0; do { len = read (inotify_fd, readbuf, readbuf_size); @@ -1158,8 +1266,8 @@ masterloop(lua_State *L) FD_ZERO(&readfds); FD_SET(inotify_fd, &readfds); do_read = pselect(inotify_fd + 1, &readfds, - NULL, NULL, &tv, NULL); - if (do_read > 0) { + NULL, NULL, &tv, NULL) > 0; + if (do_read) { logstring("Masterloop", "there is more data on inotify."); } } @@ -1170,6 +1278,33 @@ masterloop(lua_State *L) handle_event(L, NULL); } + { + /* writes into pipes if any */ + int pi; + for(pi = 0; pi < pipes_len; pi++) { + struct pipemsg *pm = pipes + pi; + int len = write(pm->fd, pm->text + pm->pos, pm->tlen - pm->pos); + bool do_close = false; + pm->pos += len; + if (len < 0) { + logstring("Normal", "broken pipe."); + do_close = true; + } else if (pm->pos >= pm->tlen) { + logstring("Debug", "finished pipe."); + do_close = true; + } + if (do_close) { + close(pm->fd); + free(pm->text); + pipes_len--; + memmove(pipes + pi, pipes + pi + 1, + (pipes_len - pi) * sizeof(struct pipemsg)); + pi--; + continue; + } + } + } + /* collects zombified child processes */ while(1) { int status; @@ -1429,14 +1564,14 @@ main(int argc, char *argv[]) } /* opens inotify */ - inotify_fd = inotify_init1(IN_CLOEXEC); + inotify_fd = inotify_init(); if (inotify_fd == -1) { printlogf(L, "Error", "Cannot create inotify instance! (%d:%s)", errno, strerror(errno)); return -1; // ERRNO } - + close_exec_fd(inotify_fd); { /* adds signal handlers * diff --git a/lsyncd.lua b/lsyncd.lua index 5295e7c..d02754b 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -1474,8 +1474,8 @@ end -- @param ... arguments -- function spawn(agent, binary, ...) - if agent == nil then - error("spawning with a nil agent", 2) + if agent == nil or type(agent) ~= "table" then + error("spawning with an invalid agent", 2) end local pid = lsyncd.exec(binary, ...) if pid and pid > 0 then