From bc8dfc9d9594f9987f03fe57e594c117f3c401b5 Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Wed, 3 Nov 2010 11:37:25 +0000 Subject: [PATCH] --- lsyncd-conf.lua | 2 +- lsyncd.c | 24 ++-- lsyncd.lua | 329 +++++++++++++++++++++++++++--------------------- 3 files changed, 201 insertions(+), 154 deletions(-) diff --git a/lsyncd-conf.lua b/lsyncd-conf.lua index ca0614b..fe1a560 100644 --- a/lsyncd-conf.lua +++ b/lsyncd-conf.lua @@ -51,5 +51,5 @@ slowbash = { } -sync{source="s", target="d/", config=slowbash} +sync{slowbash, source="s", target="d/"} diff --git a/lsyncd.c b/lsyncd.c index db51393..b4341bc 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -189,11 +189,12 @@ logstring0(enum loglevel level, #define logstring(loglevel, message) logstring0(loglevel | CORE, message) static void -printlogf(lua_State *L, +printlogf0(lua_State *L, enum loglevel level, const char *fmt, ...) __attribute__((format(printf, 3, 4))); - +#define printlogf(L, level, ...) \ + printlogf0(L, level | CORE, __VA_ARGS__) /** * "secured" calloc. @@ -205,7 +206,7 @@ s_calloc(size_t nmemb, size_t size) if (r == NULL) { logstring(ERROR, "Out of memory!"); exit(-1); // ERRNO - } + } return r; } @@ -763,6 +764,7 @@ l_wait_pids(lua_State *L) lua_getglobal(L, collector); lua_pushinteger(L, wp); lua_pushinteger(L, exitcode); + printlogf(L, DEBUG, "calling startup collector"); lua_call(L, 2, 1); newp = luaL_checkinteger(L, -1); lua_pop(L, 1); @@ -847,7 +849,7 @@ static const luaL_reg lsyncdlib[] = { * Let the core print logmessage comfortably. */ static void -printlogf(lua_State *L, +printlogf0(lua_State *L, enum loglevel level, const char *fmt, ...) { @@ -860,8 +862,8 @@ printlogf(lua_State *L, va_start(ap, fmt); lua_pushvfstring(L, fmt, ap); va_end(ap); - logstring(level, luaL_checkstring(L, -1)); - lua_pop(L, -1); + logstring0(level, luaL_checkstring(L, -1)); + lua_pop(L, 1); return; } @@ -898,6 +900,7 @@ void handle_event(lua_State *L, struct inotify_event *event) { if (event && (IN_Q_OVERFLOW & event->mask)) { /* and overflow happened, lets runner/user decide what to do. */ lua_getglobal(L, "overflow"); + printlogf(L, DEBUG, "calling overflow()"); lua_call(L, 0, 0); return; } @@ -984,6 +987,7 @@ void handle_event(lua_State *L, struct inotify_event *event) { lua_pushstring(L, event->name); lua_pushnil(L); } + printlogf(L, DEBUG, "calling lysncd_inotify_event()"); lua_call(L, 6, 0); /* if there is a buffered event executes it */ @@ -1009,6 +1013,7 @@ masterloop(lua_State *L) /* query runner about soonest alarm */ lua_getglobal(L, "lsyncd_get_alarm"); + printlogf(L, DEBUG, "calling lsycnd_get_alarm()"); lua_call(L, 0, 2); have_alarm = lua_toboolean(L, -2); alarm_time = (clock_t) luaL_checkinteger(L, -1); @@ -1099,6 +1104,7 @@ masterloop(lua_State *L) lua_getglobal(L, "lsyncd_collect_process"); lua_pushinteger(L, pid); lua_pushinteger(L, WEXITSTATUS(status)); + printlogf(L, DEBUG, "calling lsyncd_collect_process()."); lua_call(L, 2, 0); } @@ -1115,9 +1121,12 @@ masterloop(lua_State *L) break; } /* calls the lua runner to write the status. */ + lua_getglobal(L, "lsyncd_call_error"); lua_getglobal(L, "lsyncd_status_report"); lua_pushinteger(L, fd); - lua_call(L, 1, 0); + printlogf(L, DEBUG, "calling lysncd_status_report()"); + lua_pcall(L, 1, 0, -3); + printlogf(L, DEBUG, "returnd()"); /* TODO */ fsync(fd); @@ -1128,6 +1137,7 @@ masterloop(lua_State *L) /* lets the runner spawn new processes */ lua_getglobal(L, "lsyncd_alarm"); lua_pushinteger(L, times(NULL)); + printlogf(L, DEBUG, "calling lsyncd_alarm()."); lua_call(L, 1, 0); } } diff --git a/lsyncd.lua b/lsyncd.lua index 35be70e..f359321 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -78,9 +78,11 @@ local CountArray = (function() -- Metatable local mt = {} + ----- -- key to native table local k_nt = {} + ----- -- on accessing a nil index. mt.__index = function(t, k) if type(k) ~= "number" then @@ -89,6 +91,7 @@ local CountArray = (function() return t[k_nt][k] end + ----- -- on assigning a new index. mt.__newindex = function(t, k, v) if type(k) ~= "number" then @@ -97,9 +100,9 @@ local CountArray = (function() -- value before local vb = t[k_nt][k] if v and not vb then - t.size = t.size + 1 + t._size = t._size + 1 elseif not v and vb then - t.size = t.size - 1 + t._size = t._size - 1 end t[k_nt][k] = v end @@ -108,11 +111,18 @@ local CountArray = (function() local function iwalk(self) return ipairs(self[k_nt]) end - + + ------ + -- returns the count + local function size(self) + return self._size + end + + ---- -- creates a new count array local function new() -- k_nt is native table, private for this object. - local o = {size = 0, iwalk = iwalk, [k_nt] = {} } + local o = {_size = 0, iwalk = iwalk, size = size, [k_nt] = {} } setmetatable(o, mt) return o end @@ -199,7 +209,7 @@ local Delay = (function() end)() ----- --- TODO +-- Holds information about one observed directory inclusively subdirs. -- local Origin = (function() ---- @@ -224,7 +234,7 @@ end)() ----- -- Puts an action on the delay stack. -- -function Origin.delay(origin, ename, time, pathname, pathname2) +function Origin:delay(origin, ename, time, pathname, pathname2) log("Debug", "delay ", ename, " ", pathname) local o = origin local delays = o.delays @@ -295,27 +305,36 @@ local Origins = (function() local list = Array.new() ----- - -- inheritly copies all from a configs 'config' key + -- inheritly copies all non integer keys from -- @cd copy destination + -- to -- @cs copy source + -- all integer keys are treated as new copy sources + -- local function inherit(cd, cs) - if cs.config then - -- recurse into source - inherit(cd, cs.config) + -- first recurses into all integer keyed tables + for i, v in ipairs(cs) do + if type(v) == "table" then + inherit(cd, v) + end end + -- does nothing further if source == destination (first step) + if (cd == cs) then + return + end + -- then copies from the source all non integer keyed values for k, v in pairs(cs) do - if k ~= "config" and not cd[k] then + if type(k) ~= "number" and not cd[k] then cd[k] = v end end end ----- - -- adds a configuration - local add = function(config) - if config.config then - inherit(config, config.config) - end + -- Adds a new directory to observe. + -- + local function add(config) + inherit(config, config) -- raises an error if 'name' isnt in opts local function require_opt(name) @@ -358,7 +377,6 @@ local Origins = (function() optional("action") optional("max_processes") optional("collapse_table") - local o = Origin.new(config.source, config.target, config) table.insert(list, o) end @@ -378,79 +396,151 @@ local Origins = (function() end)() ----- --- inotifies +-- Interface to inotify, watches recursively subdirs and +-- sends events. -- --- contains all inotify watches. +-- All inotify specific implementation should be enclosed here. +-- So lsyncd can work with other notifications mechanisms just +-- by changing this. -- --- structure: --- a list indexed by watch descriptor --- [wd] --- of a numeric list of all origins watching this dir. --- [#] --- of inotify { --- .origin .. link to origin --- .path .. relative path of dir --- } --- } --- -local inotifies = CountArray.new() -local proto_inotify = {origin=true, path=true} +local Inotifies = (function() + ----- + -- A list indexed by inotifies watch descriptor. + -- Contains a list of all origins observing this directory + -- (directly or by recurse) + local wdlist = CountArray.new() + + ----- + -- Adds watches for a directory including all subdirectories. + -- + -- @param path relative path of this dir to origin + -- @param origin link to the observer to be notified. + -- Note: Inotifies should handle this opaquely + -- @param recurse true if recursing into subdirs + -- + local function add(path, origin, recurse) + -- register watch and receive watch descriptor + local wd = lsyncd.add_watch(path); + if wd < 0 then + -- failed adding the watch + log("Error", "Failure adding watch ", path, " -> ignored ") + return + end + + local ilist = wdlist[wd] + if not ilist then + ilist = Array.new() + wdlist[wd] = ilist + end + local inotify = { path = path, origin = origin, recurse = recurse} + table.insert(ilist, inotify) + + -- registers and adds watches for all subdirectories + if recurse then + local subdirs = lsyncd.sub_dirs(path) + for _, dirname in ipairs(subdirs) do + add(path..dirname.."/", origin, true) + end + end + end + + ----- + -- Called when an event has occured. + -- + -- @param ename "Attrib", "Mofify", "Create", "Delete", "Move") + -- @param wd watch descriptor (matches lsyncd.add_watch()) + -- @param isdir true if filename is a directory + -- @param time time of event + -- @param filename string filename without path + -- @param filename2 + -- + function event(ename, wd, isdir, time, filename, filename2) + local ftype; + if isdir then + ftype = "directory" + else + ftype = "file" + end + if filename2 then + log("Debug", "got event ", ename, " of ", ftype, " ", filename, + " to ", filename2) + else + log("Debug", "got event ", ename, " of ", ftype, " ", filename) + end + + local ilist = wdlist[wd] + -- looks up the watch descriptor id + if not ilist then + -- this is normal in case of deleted subdirs + log("Normal", "event belongs to unknown/deleted watch descriptor.") + return + end + + -- works through all observers interested in this directory + for _, inotify in ipairs(ilist) do + local pathname = inotify.path .. filename + local pathname2 + if filename2 then + pathname2 = inotify.path..filename2 + end + Origin.delay(inotify.origin, ename, time, pathname, pathname2) + -- add subdirs for new directories + if inotify.recurse and isdir then + if ename == "Create" then + add(pathname.."/", inotify.origin, true) + -- TODO remove / + end + end + end + end + + ----- + -- Writes a status report about inotifies to a filedescriptor + -- + local function status_report(fd) + local w = lsyncd.writefd + w(fd, "Watching ", wdlist:size(), " directories\n") + for wd, v in wdlist:iwalk() do + w(fd, " ", wd, ": ") + for _, v in ipairs(v) do +print(v.origin) +print(v.path) +print(v.origin.source) + w(fd, "(", v.origin.source, "/", (v.path) or ")") + end + w(fd, "\n") + end + end + + ----- + -- Returns the number of directories watched in total. + local function size() + return wdlist:size() + end + + -- public interface + return { add = add, size = size, status_report = status_report } +end)() + +--============================================================================ +-- lsyncd runner plugs. These functions will be called from core. +--============================================================================ ----- --- A list of names of the event types the core sends. --- (Also makes sure the strings are not collected) +-- Called from core whenever a lua failed. -- -local valid_events = { - Attrib = true, - Modify = true, - Create = true, - Delete = true, - Move = true, - MoveFrom = true, - MoveTo = true, -} - ---============================================================================ --- The lsyncd runner ---============================================================================ - - ----- --- Adds watches for a directory including all subdirectories. --- --- @param origin link to origins[] entry --- @param path relative path of this dir to origin --- @param parent link to parent directory in watches[] --- -local function inotify_watch_dir(origin, path) - local op = origin.source .. path - -- register watch and receive watch descriptor - local wd = lsyncd.add_watch(op); - if wd < 0 then - -- failed adding the watch - log("Error", "Failure adding watch ", op, " -> ignored ") - return - end - - local ilist = inotifies[wd] - if not ilist then - ilist = Array.new() - inotifies[wd] = ilist - end - local inotify = { origin = origin, path = path } - set_prototype(inotify, proto_inotify) - table.insert(ilist, inotify) - - -- on a warmstart add a Create for the directory - if not origin.config.startup then - -- TODO BROKEN - origin:delay("Create", sync, nil, nil, nil) - end - - -- registers and adds watches for all subdirectories - local subdirs = lsyncd.sub_dirs(op) - for _, dirname in ipairs(subdirs) do - inotify_watch_dir(origin, path..dirname.."/") +function lsyncd_call_error(message) + log("Error", "IN LUA: ", message) + -- prints backtrace + local level = 2 + while true do + local info = debug.getinfo(level, "Sl") + if not info then + terminate(-1) -- ERRNO + end + log("Error", "Backtrace ", level - 1, " :", + info.short_src, ":", info.currentline) + level = level + 1 end end @@ -533,14 +623,7 @@ end function lsyncd_status_report(fd) local w = lsyncd.writefd w(fd, "Lsyncd status report at ", os.date(), "\n\n") - w(fd, "Watching ", inotifies.size, " directories\n") - for wd, v in inotifies:iwalk() do - w(fd, " ", wd, ": ") - for _, inotify in ipairs(v) do - w(fd, "(", inotify.origin.source, "/", (inotify.path) or ")") - end - w(fd, "\n") - end + Inotifies.status_report(fd) end ---- @@ -553,7 +636,7 @@ function lsyncd_alarm(now) -- goes through all targets and spawns more actions -- if possible for _, o in Origins.iwalk() do - if o.processes.size < o.config.max_processes then + if o.processes:size() < o.config.max_processes then local delays = o.delays local d = delays[1] if d and lsyncd.before_eq(d.alarm, now) then @@ -651,10 +734,10 @@ function lsyncd_initialize(args) have_startup = true end -- adds the dir watch inclusively all subdirs - inotify_watch_dir(o, "") + Inotifies.add(o.source, "", true) end - -- from this point on use logging facilities as configured. + -- from now on use logging as configured instead of stdout/err. lsyncd.configure("running"); if have_startup then @@ -669,10 +752,10 @@ function lsyncd_initialize(args) end lsyncd.wait_pids(pids, "startup_collector") log("Normal", "- Entering normal operation with ", - inotifies.size, " monitored directories -") + Inotifies.size(), " monitored directories -") else log("Normal", "- Warmstart into normal operation with ", - inotifies.size, " monitored directories -") + Inotifies.size(), " monitored directories -") end end @@ -688,7 +771,7 @@ function lsyncd_get_alarm() local alarm = 0 for _, o in Origins.iwalk() do if o.delays[1] and - o.processes.size < o.config.max_processes then + o.processes:size() < o.config.max_processes then if have_alarm then alarm = lsyncd.earlier(alarm, o.delays[1].alarm) else @@ -700,53 +783,7 @@ function lsyncd_get_alarm() return have_alarm, alarm end ------ --- Called by core on inotify event --- --- @param ename "Attrib", "Mofify", "Create", "Delete", "Move") --- @param wd watch descriptor (matches lsyncd.add_watch()) --- @param time time of event --- @param filename string filename without path --- @param filename2 --- -function lsyncd_inotify_event(ename, wd, isdir, time, filename, filename2) - local ftype; - if isdir then - ftype = "directory" - else - ftype = "file" - end - if filename2 then - log("Debug", "got event ", ename, " of ", ftype, " ", filename, - " to ", filename2) - else - log("Debug", "got event ", ename, " of ", ftype, " ", filename) - end - - -- looks up the watch descriptor id - local ilist = inotifies[wd] - if not ilist then - log("Normal", "event belongs to unknown or deleted watch descriptor.") - return - end - - -- works through all possible source->target pairs - for _, inotify in ipairs(ilist) do - local pathname2 - if filename2 then - pathname2 = inotify.path..filename2 - end - Origin.delay(inotify.origin, ename, time, - inotify.path..filename, pathname2) - -- add subdirs for new directories - if isdir then - if ename == "Create" then - inotify_watch_dir(inotify.origin, - inotify.path .. filename .. "/") - end - end - end -end +lsyncd_inotify_event = Inotifies.event ----- -- Collector for every child process that finished in startup phase