This commit is contained in:
Axel Kittenberger 2010-10-19 20:14:55 +00:00
parent 7fd0ddadab
commit edab583e92
2 changed files with 157 additions and 125 deletions

258
lsyncd.c
View File

@ -47,10 +47,6 @@
#define time_after_eq(a,b) ((long)(a) - (long)(b) >= 0)
#define time_before_eq(a,b) time_after_eq(b,a)
/**
* Number of inotifies to read max. at once from the kernel.
*/
#define INOTIFY_BUF_LEN (64 * (sizeof(struct inotify_event) + 16))
/**
* The Lua part of lsyncd.
@ -117,6 +113,20 @@ s_malloc(size_t size)
return r;
}
/**
* "secured" realloc.
*/
void *
s_realloc(void *ptr, size_t size)
{
void *r = realloc(ptr, size);
if (r == NULL) {
printf("Out of memory!\n");
exit(-1);
}
return r;
}
/**
* "secured" strdup.
*/
@ -132,7 +142,6 @@ s_strdup(const char *src)
}
/*****************************************************************************
* Library calls for lsyncd.lua
*
@ -351,6 +360,103 @@ l_terminate(lua_State *L)
return 0;
}
/**
* Waits after startup for a table of child processes.
*
* @param (Lua stack) a table of the children pids.
* @param (Lua stack) a function of a collector to be called
* when a child finishes.
*/
void
l_wait_pids(lua_State *L)
{
/* the number of pids in table */
int pidn;
/* the pid table */
int *pids;
/* the number of children to be waited for */
int remaining = 0;
int i;
/* global function to call on finished processes */
const char * collector;
/* checks if Lua script returned a table */
luaL_checktype(L, 1, LUA_TTABLE);
if (lua_type(L, 2) == LUA_TNIL) {
collector = NULL;
} else {
collector = luaL_checkstring(L, 2);
}
/* determines size of the pid-table */
pidn = lua_objlen (L, 1);
if (pidn == 0) {
/* nothing to do on zero pids */
return;
}
/* reads the pid table from Lua stack */
pids = s_calloc(pidn, sizeof(int));
for(i = 0; i < pidn; i++) {
lua_rawgeti(L, 1, i + 1);
pids[i] = luaL_checkinteger(L, -1);
lua_pop(L, 1);
/* ignores zero pids */
if (pids[i]) {
remaining++;
}
}
/* starts waiting for the children */
while(remaining) {
/* argument for waitpid, and exitcode of child */
int status, exitcode;
/* new process id in case of retry */
int newp;
/* process id of terminated child process */
int wp = waitpid(0, &status, 0);
/* if nothing really finished ignore */
if (wp == 0 || !WIFEXITED(status)) {
continue;
}
exitcode = WEXITSTATUS(status);
/* checks if the pid is one waited for */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
break;
}
}
if (i >= pidn) {
/* not a pid waited for */
continue;
}
/* calls the lua collector to determine further actions */
if (collector) {
lua_getglobal(L, collector);
lua_pushinteger(L, wp);
lua_pushinteger(L, exitcode);
lua_call(L, 2, 1);
newp = luaL_checkinteger(L, -1);
lua_pop(L, 1);
} else {
newp = 0;
}
/* replace the new pid in the pidtable,
or zero it on no new pid */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
pids[i] = newp;
if (newp == 0) {
remaining--;
}
/* does not break, in case there are duplicate pids (whyever) */
}
}
}
free(pids);
}
static const luaL_reg lsyncdlib[] = {
{"add_watch", l_add_watch },
{"now", l_now },
@ -359,6 +465,7 @@ static const luaL_reg lsyncdlib[] = {
{"stackdump", l_stackdump },
{"sub_dirs", l_sub_dirs },
{"terminate", l_terminate },
{"wait_pids", l_wait_pids },
{NULL, NULL}
};
@ -403,94 +510,13 @@ get_settings(lua_State *L)
lua_pop(L, 1);
}
/**
* Waits after startup for all children.
*
* @param (Lua stack) a table of the children pids.
* Buffer for MOVE_FROM events.
* Lsyncd buffers MOVE_FROM events to check if
*/
void
wait_startup(lua_State *L)
{
/* the number of pids in table */
int pidn;
/* the pid table */
int *pids;
/* the number of children to be waited for */
int remaining = 0;
int i;
/* checks if Lua script returned a table */
if (lua_type(L, 1) == LUA_TNIL) {
printf("Lua function startup did not return a pidtable!\n");
exit(-1); // ERRNO
}
/* determines size of the pid-table */
pidn = lua_objlen (L, -1);
if (pidn == 0) {
/* nothing to do on zero pids */
return;
}
/* reads the pid table from Lua stack */
pids = s_calloc(pidn, sizeof(int));
for(i = 0; i < pidn; i++) {
lua_rawgeti(L, -1, i + 1);
pids[i] = luaL_checkinteger(L, -1);
lua_pop(L, 1);
/* ignores zero pids */
if (pids[i]) {
remaining++;
}
}
/* since contents are copied into pids[] pop the lua table */
lua_pop(L, 1);
/* starts waiting for the children */
while(remaining) {
/* argument for waitpid, and exitcode of child */
int status, exitcode;
/* new process id in case of retry */
int newp;
/* process id of terminated child process */
int wp = waitpid(0, &status, 0);
/* if nothing really finished ignore */
if (wp == 0 || !WIFEXITED(status)) {
continue;
}
exitcode = WEXITSTATUS(status);
/* checks if the pid is one waited for */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
break;
}
}
if (i >= pidn) {
/* not a pid waited for */
continue;
}
/* calls the lua script to determine what to do on child failure */
lua_getglobal(L, "startup_returned");
lua_pushinteger(L, wp);
lua_pushinteger(L, exitcode);
lua_call(L, 2, 1);
newp = luaL_checkinteger(L, -1);
lua_pop(L, 1);
/* replace the new pid in the pidtable,
or zero it on no new pid */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
pids[i] = newp;
if (newp == 0) {
remaining--;
}
/* does not break, in case there are duplicate pids (whyever) */
}
}
}
free(pids);
}
struct inotify_event * move_event_buf = NULL;
size_t move_event_buf_size = 0;
/**
* Handles an inotify event.
@ -526,7 +552,15 @@ void handle_event(lua_State *L, struct inotify_event *event) {
printf("OPEN id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name);
}
if (IN_MOVED_FROM & event->mask) {
/* special case, buffers this event, and wait if next event is a matching
* MOVED_TO of this was an unary move out of the watched tree. */
if (move_event_buf_size < sizeof(struct inotify_event) + event->len) {
move_event_buf_size = sizeof(struct inotify_event) + event->len;
move_event_buf = s_realloc(move_event_buf, move_event_buf_size);
}
memcpy(move_event_buf, event, sizeof(struct inotify_event) + event->len);
printf("MOVED_FROM id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name);
}
if (IN_MOVED_TO & event->mask) {
printf("MOVED_TO id=%d mask=%d cookie=%d name=%s\n", event->wd, event->mask, event->cookie, event->name);
@ -557,8 +591,9 @@ void handle_event(lua_State *L, struct inotify_event *event) {
void
masterloop(lua_State *L)
{
size_t readbuf_size = 2048;
char *readbuf = s_malloc(readbuf_size);
while(!reset) {
char readbuf[INOTIFY_BUF_LEN];
int alarm_state;
clock_t now = times(NULL);
clock_t alarm_time;
@ -575,15 +610,15 @@ masterloop(lua_State *L)
if (alarm_state < 0) {
/* there is a delay that wants to be handled already */
/* thus do not read from inotify_fd and jump directly to its handling */
/* there is a delay that wants to be handled already
* thus do not read from inotify_fd and jump directly to its handling */
printf("core: immediately handling delayed entries\n");
do_read = 0;
} else if (alarm_state > 0) {
/* use select() to determine what happens next */
/* + a new event on inotify */
/* + an alarm on timeout */
/* + the return of a child process */
/* use select() to determine what happens next
* + a new event on inotify
* + an alarm on timeout
* + the return of a child process */
fd_set readfds;
struct timeval tv;
@ -595,8 +630,8 @@ masterloop(lua_State *L)
tv.tv_sec = (alarm_time - now) / clocks_per_sec;
tv.tv_usec = (alarm_time - now) * 1000000 / clocks_per_sec % 1000000;
/* if select returns a positive number there is data on inotify */
/* on zero the timemout occured. */
/* if select returns a positive number there is data on inotify *
* on zero the timemout occured. */
FD_ZERO(&readfds);
FD_SET(inotify_fd, &readfds);
do_read = select(inotify_fd + 1, &readfds, NULL, NULL, &tv);
@ -616,7 +651,19 @@ masterloop(lua_State *L)
do {
int i = 0;
if (do_read) {
len = read (inotify_fd, readbuf, INOTIFY_BUF_LEN);
do {
len = read (inotify_fd, readbuf, readbuf_size);
if (len < 0 && errno == EINVAL) {
/* kernel > 2.6.21 indicates that way that way that
* the buffer was too small to fit a filename.
* double its size and try again. When using a lower
* kernel and a filename > 2KB appears lsyncd
* will fail. (but does a 2KB filename really happen?)*/
readbuf_size *= 2;
readbuf = s_realloc(readbuf, readbuf_size);
continue;
}
} while(0);
} else {
len = 0;
}
@ -725,15 +772,8 @@ main(int argc, char *argv[])
/* load core settings into core */
get_settings(L);
/* startup */
/* lua code will perform startup calls like recursive rsync */
/* let lua code will perform startup calls like recursive rsync */
lua_getglobal(L, "startup");
lua_call(L, 0, 1);
/* wait for children spawned at startup */
wait_startup(L);
/* enter normal operation */
lua_getglobal(L, "normalop");
lua_call(L, 0, 0);
masterloop(L);

View File

@ -149,10 +149,10 @@ on_move_self = default_event
-----
-- Called by core after initialization.
--
-- Returns a table of integers (pid of children) the core will
-- wait for before entering normal operation.
-- Default function will start an simultanous action for every
-- source -> destination pair. And waits for these processes to finish
--
-- User can override this function by specifing his/her own
-- The user can override this function by specifing his/her own
-- "startup". (and yet may still call default startup)
--
function default_startup()
@ -164,7 +164,8 @@ function default_startup()
pid = lsyncd.exec("/usr/bin/rsync", "-ltrs", o.source, o.targetpath)
table.insert(pids, pid)
end
return pids
lsyncd.wait_pids(pids, "startup_collector")
print("--- Entering normal operation with " .. #watches .. " monitored directories ---")
end
startup = default_startup
@ -174,27 +175,18 @@ startup = default_startup
--
-- Parameters are pid and exitcode of child process
--
-- Can returns either a new pid if another child process
-- Can return either a new pid if one other child process
-- has been spawned as replacement (e.g. retry) or 0 if
-- finished/ok.
--
function default_startup_returned(pid, exitcode)
function default_startup_collector(pid, exitcode)
if exitcode ~= 0 then
print("Startup process", pid, " failed")
lsyncd.terminate(-1) -- ERRNO
end
return 0
end
startup_returned = default_startup_returned
-----
-- Called by core after startup phase when finished waiting for
-- children spawned at startup.
--
function default_normalop()
print("--- Entering normal operation with " .. #watches .. " monitored directories ---")
end
normalop = default_normalop
startup_collector = default_startup_collector
----
-- other functions the user might want to use