From edab583e9206c733146e1a63b7c4d24dc35c65af Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Tue, 19 Oct 2010 20:14:55 +0000 Subject: [PATCH] --- lsyncd.c | 258 +++++++++++++++++++++++++++++++---------------------- lsyncd.lua | 24 ++--- 2 files changed, 157 insertions(+), 125 deletions(-) diff --git a/lsyncd.c b/lsyncd.c index a579178..b253279 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -47,10 +47,6 @@ #define time_after_eq(a,b) ((long)(a) - (long)(b) >= 0) #define time_before_eq(a,b) time_after_eq(b,a) -/** - * Number of inotifies to read max. at once from the kernel. - */ -#define INOTIFY_BUF_LEN (64 * (sizeof(struct inotify_event) + 16)) /** * The Lua part of lsyncd. @@ -117,6 +113,20 @@ s_malloc(size_t size) return r; } +/** + * "secured" realloc. + */ +void * +s_realloc(void *ptr, size_t size) +{ + void *r = realloc(ptr, size); + if (r == NULL) { + printf("Out of memory!\n"); + exit(-1); + } + return r; +} + /** * "secured" strdup. */ @@ -132,7 +142,6 @@ s_strdup(const char *src) } - /***************************************************************************** * Library calls for lsyncd.lua * @@ -351,6 +360,103 @@ l_terminate(lua_State *L) return 0; } +/** + * Waits after startup for a table of child processes. + * + * @param (Lua stack) a table of the children pids. + * @param (Lua stack) a function of a collector to be called + * when a child finishes. + */ +void +l_wait_pids(lua_State *L) +{ + /* the number of pids in table */ + int pidn; + /* the pid table */ + int *pids; + /* the number of children to be waited for */ + int remaining = 0; + int i; + /* global function to call on finished processes */ + const char * collector; + /* checks if Lua script returned a table */ + luaL_checktype(L, 1, LUA_TTABLE); + if (lua_type(L, 2) == LUA_TNIL) { + collector = NULL; + } else { + collector = luaL_checkstring(L, 2); + } + + /* determines size of the pid-table */ + pidn = lua_objlen (L, 1); + if (pidn == 0) { + /* nothing to do on zero pids */ + return; + } + /* reads the pid table from Lua stack */ + pids = s_calloc(pidn, sizeof(int)); + for(i = 0; i < pidn; i++) { + lua_rawgeti(L, 1, i + 1); + pids[i] = luaL_checkinteger(L, -1); + lua_pop(L, 1); + /* ignores zero pids */ + if (pids[i]) { + remaining++; + } + } + /* starts waiting for the children */ + while(remaining) { + /* argument for waitpid, and exitcode of child */ + int status, exitcode; + /* new process id in case of retry */ + int newp; + /* process id of terminated child process */ + int wp = waitpid(0, &status, 0); + + /* if nothing really finished ignore */ + if (wp == 0 || !WIFEXITED(status)) { + continue; + } + + exitcode = WEXITSTATUS(status); + /* checks if the pid is one waited for */ + for(i = 0; i < pidn; i++) { + if (pids[i] == wp) { + break; + } + } + if (i >= pidn) { + /* not a pid waited for */ + continue; + } + /* calls the lua collector to determine further actions */ + if (collector) { + lua_getglobal(L, collector); + lua_pushinteger(L, wp); + lua_pushinteger(L, exitcode); + lua_call(L, 2, 1); + newp = luaL_checkinteger(L, -1); + lua_pop(L, 1); + } else { + newp = 0; + } + + /* replace the new pid in the pidtable, + or zero it on no new pid */ + for(i = 0; i < pidn; i++) { + if (pids[i] == wp) { + pids[i] = newp; + if (newp == 0) { + remaining--; + } + /* does not break, in case there are duplicate pids (whyever) */ + } + } + } + free(pids); +} + + static const luaL_reg lsyncdlib[] = { {"add_watch", l_add_watch }, {"now", l_now }, @@ -359,6 +465,7 @@ static const luaL_reg lsyncdlib[] = { {"stackdump", l_stackdump }, {"sub_dirs", l_sub_dirs }, {"terminate", l_terminate }, + {"wait_pids", l_wait_pids }, {NULL, NULL} }; @@ -403,94 +510,13 @@ get_settings(lua_State *L) lua_pop(L, 1); } + /** - * Waits after startup for all children. - * - * @param (Lua stack) a table of the children pids. + * Buffer for MOVE_FROM events. + * Lsyncd buffers MOVE_FROM events to check if */ -void -wait_startup(lua_State *L) -{ - /* the number of pids in table */ - int pidn; - /* the pid table */ - int *pids; - /* the number of children to be waited for */ - int remaining = 0; - int i; - /* checks if Lua script returned a table */ - if (lua_type(L, 1) == LUA_TNIL) { - printf("Lua function startup did not return a pidtable!\n"); - exit(-1); // ERRNO - } - /* determines size of the pid-table */ - pidn = lua_objlen (L, -1); - if (pidn == 0) { - /* nothing to do on zero pids */ - return; - } - /* reads the pid table from Lua stack */ - pids = s_calloc(pidn, sizeof(int)); - for(i = 0; i < pidn; i++) { - lua_rawgeti(L, -1, i + 1); - pids[i] = luaL_checkinteger(L, -1); - lua_pop(L, 1); - /* ignores zero pids */ - if (pids[i]) { - remaining++; - } - } - /* since contents are copied into pids[] pop the lua table */ - lua_pop(L, 1); - - /* starts waiting for the children */ - while(remaining) { - /* argument for waitpid, and exitcode of child */ - int status, exitcode; - /* new process id in case of retry */ - int newp; - /* process id of terminated child process */ - int wp = waitpid(0, &status, 0); - - /* if nothing really finished ignore */ - if (wp == 0 || !WIFEXITED(status)) { - continue; - } - - exitcode = WEXITSTATUS(status); - /* checks if the pid is one waited for */ - for(i = 0; i < pidn; i++) { - if (pids[i] == wp) { - break; - } - } - if (i >= pidn) { - /* not a pid waited for */ - continue; - } - /* calls the lua script to determine what to do on child failure */ - lua_getglobal(L, "startup_returned"); - lua_pushinteger(L, wp); - lua_pushinteger(L, exitcode); - lua_call(L, 2, 1); - newp = luaL_checkinteger(L, -1); - lua_pop(L, 1); - - /* replace the new pid in the pidtable, - or zero it on no new pid */ - for(i = 0; i < pidn; i++) { - if (pids[i] == wp) { - pids[i] = newp; - if (newp == 0) { - remaining--; - } - /* does not break, in case there are duplicate pids (whyever) */ - } - } - } - free(pids); -} - +struct inotify_event * move_event_buf = NULL; +size_t move_event_buf_size = 0; /** * Handles an inotify event. @@ -526,7 +552,15 @@ void handle_event(lua_State *L, struct inotify_event *event) { printf("OPEN id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name); } if (IN_MOVED_FROM & event->mask) { + /* special case, buffers this event, and wait if next event is a matching + * MOVED_TO of this was an unary move out of the watched tree. */ + if (move_event_buf_size < sizeof(struct inotify_event) + event->len) { + move_event_buf_size = sizeof(struct inotify_event) + event->len; + move_event_buf = s_realloc(move_event_buf, move_event_buf_size); + } + memcpy(move_event_buf, event, sizeof(struct inotify_event) + event->len); printf("MOVED_FROM id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name); + } if (IN_MOVED_TO & event->mask) { printf("MOVED_TO id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name); @@ -557,8 +591,9 @@ void handle_event(lua_State *L, struct inotify_event *event) { void masterloop(lua_State *L) { + size_t readbuf_size = 2048; + char *readbuf = s_malloc(readbuf_size); while(!reset) { - char readbuf[INOTIFY_BUF_LEN]; int alarm_state; clock_t now = times(NULL); clock_t alarm_time; @@ -575,15 +610,15 @@ masterloop(lua_State *L) if (alarm_state < 0) { - /* there is a delay that wants to be handled already */ - /* thus do not read from inotify_fd and jump directly to its handling */ + /* there is a delay that wants to be handled already + * thus do not read from inotify_fd and jump directly to its handling */ printf("core: immediately handling delayed entries\n"); do_read = 0; } else if (alarm_state > 0) { - /* use select() to determine what happens next */ - /* + a new event on inotify */ - /* + an alarm on timeout */ - /* + the return of a child process */ + /* use select() to determine what happens next + * + a new event on inotify + * + an alarm on timeout + * + the return of a child process */ fd_set readfds; struct timeval tv; @@ -595,8 +630,8 @@ masterloop(lua_State *L) tv.tv_sec = (alarm_time - now) / clocks_per_sec; tv.tv_usec = (alarm_time - now) * 1000000 / clocks_per_sec % 1000000; - /* if select returns a positive number there is data on inotify */ - /* on zero the timemout occured. */ + /* if select returns a positive number there is data on inotify * + * on zero the timemout occured. */ FD_ZERO(&readfds); FD_SET(inotify_fd, &readfds); do_read = select(inotify_fd + 1, &readfds, NULL, NULL, &tv); @@ -616,7 +651,19 @@ masterloop(lua_State *L) do { int i = 0; if (do_read) { - len = read (inotify_fd, readbuf, INOTIFY_BUF_LEN); + do { + len = read (inotify_fd, readbuf, readbuf_size); + if (len < 0 && errno == EINVAL) { + /* kernel > 2.6.21 indicates that way that way that + * the buffer was too small to fit a filename. + * double its size and try again. When using a lower + * kernel and a filename > 2KB appears lsyncd + * will fail. (but does a 2KB filename really happen?)*/ + readbuf_size *= 2; + readbuf = s_realloc(readbuf, readbuf_size); + continue; + } + } while(0); } else { len = 0; } @@ -725,15 +772,8 @@ main(int argc, char *argv[]) /* load core settings into core */ get_settings(L); - /* startup */ - /* lua code will perform startup calls like recursive rsync */ + /* let lua code will perform startup calls like recursive rsync */ lua_getglobal(L, "startup"); - lua_call(L, 0, 1); - /* wait for children spawned at startup */ - wait_startup(L); - - /* enter normal operation */ - lua_getglobal(L, "normalop"); lua_call(L, 0, 0); masterloop(L); diff --git a/lsyncd.lua b/lsyncd.lua index dec700e..e2a4e1d 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -149,10 +149,10 @@ on_move_self = default_event ----- -- Called by core after initialization. -- --- Returns a table of integers (pid of children) the core will --- wait for before entering normal operation. +-- Default function will start an simultanous action for every +-- source -> destination pair. And waits for these processes to finish -- --- User can override this function by specifing his/her own +-- The user can override this function by specifing his/her own -- "startup". (and yet may still call default startup) -- function default_startup() @@ -164,7 +164,8 @@ function default_startup() pid = lsyncd.exec("/usr/bin/rsync", "-ltrs", o.source, o.targetpath) table.insert(pids, pid) end - return pids + lsyncd.wait_pids(pids, "startup_collector") + print("--- Entering normal operation with " .. #watches .. " monitored directories ---") end startup = default_startup @@ -174,27 +175,18 @@ startup = default_startup -- -- Parameters are pid and exitcode of child process -- --- Can returns either a new pid if another child process +-- Can return either a new pid if one other child process -- has been spawned as replacement (e.g. retry) or 0 if -- finished/ok. -- -function default_startup_returned(pid, exitcode) +function default_startup_collector(pid, exitcode) if exitcode ~= 0 then print("Startup process", pid, " failed") lsyncd.terminate(-1) -- ERRNO end return 0 end -startup_returned = default_startup_returned - ------ --- Called by core after startup phase when finished waiting for --- children spawned at startup. --- -function default_normalop() - print("--- Entering normal operation with " .. #watches .. " monitored directories ---") -end -normalop = default_normalop +startup_collector = default_startup_collector ---- -- other functions the user might want to use