This commit is contained in:
Axel Kittenberger 2010-11-03 11:37:25 +00:00
parent 5b7bffd303
commit bc8dfc9d95
3 changed files with 201 additions and 154 deletions

View File

@ -51,5 +51,5 @@ slowbash = {
} }
sync{source="s", target="d/", config=slowbash} sync{slowbash, source="s", target="d/"}

View File

@ -189,11 +189,12 @@ logstring0(enum loglevel level,
#define logstring(loglevel, message) logstring0(loglevel | CORE, message) #define logstring(loglevel, message) logstring0(loglevel | CORE, message)
static void static void
printlogf(lua_State *L, printlogf0(lua_State *L,
enum loglevel level, enum loglevel level,
const char *fmt, ...) const char *fmt, ...)
__attribute__((format(printf, 3, 4))); __attribute__((format(printf, 3, 4)));
#define printlogf(L, level, ...) \
printlogf0(L, level | CORE, __VA_ARGS__)
/** /**
* "secured" calloc. * "secured" calloc.
@ -763,6 +764,7 @@ l_wait_pids(lua_State *L)
lua_getglobal(L, collector); lua_getglobal(L, collector);
lua_pushinteger(L, wp); lua_pushinteger(L, wp);
lua_pushinteger(L, exitcode); lua_pushinteger(L, exitcode);
printlogf(L, DEBUG, "calling startup collector");
lua_call(L, 2, 1); lua_call(L, 2, 1);
newp = luaL_checkinteger(L, -1); newp = luaL_checkinteger(L, -1);
lua_pop(L, 1); lua_pop(L, 1);
@ -847,7 +849,7 @@ static const luaL_reg lsyncdlib[] = {
* Let the core print logmessage comfortably. * Let the core print logmessage comfortably.
*/ */
static void static void
printlogf(lua_State *L, printlogf0(lua_State *L,
enum loglevel level, enum loglevel level,
const char *fmt, ...) const char *fmt, ...)
{ {
@ -860,8 +862,8 @@ printlogf(lua_State *L,
va_start(ap, fmt); va_start(ap, fmt);
lua_pushvfstring(L, fmt, ap); lua_pushvfstring(L, fmt, ap);
va_end(ap); va_end(ap);
logstring(level, luaL_checkstring(L, -1)); logstring0(level, luaL_checkstring(L, -1));
lua_pop(L, -1); lua_pop(L, 1);
return; return;
} }
@ -898,6 +900,7 @@ void handle_event(lua_State *L, struct inotify_event *event) {
if (event && (IN_Q_OVERFLOW & event->mask)) { if (event && (IN_Q_OVERFLOW & event->mask)) {
/* and overflow happened, lets runner/user decide what to do. */ /* and overflow happened, lets runner/user decide what to do. */
lua_getglobal(L, "overflow"); lua_getglobal(L, "overflow");
printlogf(L, DEBUG, "calling overflow()");
lua_call(L, 0, 0); lua_call(L, 0, 0);
return; return;
} }
@ -984,6 +987,7 @@ void handle_event(lua_State *L, struct inotify_event *event) {
lua_pushstring(L, event->name); lua_pushstring(L, event->name);
lua_pushnil(L); lua_pushnil(L);
} }
printlogf(L, DEBUG, "calling lysncd_inotify_event()");
lua_call(L, 6, 0); lua_call(L, 6, 0);
/* if there is a buffered event executes it */ /* if there is a buffered event executes it */
@ -1009,6 +1013,7 @@ masterloop(lua_State *L)
/* query runner about soonest alarm */ /* query runner about soonest alarm */
lua_getglobal(L, "lsyncd_get_alarm"); lua_getglobal(L, "lsyncd_get_alarm");
printlogf(L, DEBUG, "calling lsycnd_get_alarm()");
lua_call(L, 0, 2); lua_call(L, 0, 2);
have_alarm = lua_toboolean(L, -2); have_alarm = lua_toboolean(L, -2);
alarm_time = (clock_t) luaL_checkinteger(L, -1); alarm_time = (clock_t) luaL_checkinteger(L, -1);
@ -1099,6 +1104,7 @@ masterloop(lua_State *L)
lua_getglobal(L, "lsyncd_collect_process"); lua_getglobal(L, "lsyncd_collect_process");
lua_pushinteger(L, pid); lua_pushinteger(L, pid);
lua_pushinteger(L, WEXITSTATUS(status)); lua_pushinteger(L, WEXITSTATUS(status));
printlogf(L, DEBUG, "calling lsyncd_collect_process().");
lua_call(L, 2, 0); lua_call(L, 2, 0);
} }
@ -1115,9 +1121,12 @@ masterloop(lua_State *L)
break; break;
} }
/* calls the lua runner to write the status. */ /* calls the lua runner to write the status. */
lua_getglobal(L, "lsyncd_call_error");
lua_getglobal(L, "lsyncd_status_report"); lua_getglobal(L, "lsyncd_status_report");
lua_pushinteger(L, fd); lua_pushinteger(L, fd);
lua_call(L, 1, 0); printlogf(L, DEBUG, "calling lysncd_status_report()");
lua_pcall(L, 1, 0, -3);
printlogf(L, DEBUG, "returnd()");
/* TODO */ /* TODO */
fsync(fd); fsync(fd);
@ -1128,6 +1137,7 @@ masterloop(lua_State *L)
/* lets the runner spawn new processes */ /* lets the runner spawn new processes */
lua_getglobal(L, "lsyncd_alarm"); lua_getglobal(L, "lsyncd_alarm");
lua_pushinteger(L, times(NULL)); lua_pushinteger(L, times(NULL));
printlogf(L, DEBUG, "calling lsyncd_alarm().");
lua_call(L, 1, 0); lua_call(L, 1, 0);
} }
} }

View File

@ -78,9 +78,11 @@ local CountArray = (function()
-- Metatable -- Metatable
local mt = {} local mt = {}
-----
-- key to native table -- key to native table
local k_nt = {} local k_nt = {}
-----
-- on accessing a nil index. -- on accessing a nil index.
mt.__index = function(t, k) mt.__index = function(t, k)
if type(k) ~= "number" then if type(k) ~= "number" then
@ -89,6 +91,7 @@ local CountArray = (function()
return t[k_nt][k] return t[k_nt][k]
end end
-----
-- on assigning a new index. -- on assigning a new index.
mt.__newindex = function(t, k, v) mt.__newindex = function(t, k, v)
if type(k) ~= "number" then if type(k) ~= "number" then
@ -97,9 +100,9 @@ local CountArray = (function()
-- value before -- value before
local vb = t[k_nt][k] local vb = t[k_nt][k]
if v and not vb then if v and not vb then
t.size = t.size + 1 t._size = t._size + 1
elseif not v and vb then elseif not v and vb then
t.size = t.size - 1 t._size = t._size - 1
end end
t[k_nt][k] = v t[k_nt][k] = v
end end
@ -109,10 +112,17 @@ local CountArray = (function()
return ipairs(self[k_nt]) return ipairs(self[k_nt])
end end
------
-- returns the count
local function size(self)
return self._size
end
----
-- creates a new count array -- creates a new count array
local function new() local function new()
-- k_nt is native table, private for this object. -- k_nt is native table, private for this object.
local o = {size = 0, iwalk = iwalk, [k_nt] = {} } local o = {_size = 0, iwalk = iwalk, size = size, [k_nt] = {} }
setmetatable(o, mt) setmetatable(o, mt)
return o return o
end end
@ -199,7 +209,7 @@ local Delay = (function()
end)() end)()
----- -----
-- TODO -- Holds information about one observed directory inclusively subdirs.
-- --
local Origin = (function() local Origin = (function()
---- ----
@ -224,7 +234,7 @@ end)()
----- -----
-- Puts an action on the delay stack. -- Puts an action on the delay stack.
-- --
function Origin.delay(origin, ename, time, pathname, pathname2) function Origin:delay(origin, ename, time, pathname, pathname2)
log("Debug", "delay ", ename, " ", pathname) log("Debug", "delay ", ename, " ", pathname)
local o = origin local o = origin
local delays = o.delays local delays = o.delays
@ -295,27 +305,36 @@ local Origins = (function()
local list = Array.new() local list = Array.new()
----- -----
-- inheritly copies all from a configs 'config' key -- inheritly copies all non integer keys from
-- @cd copy destination -- @cd copy destination
-- to
-- @cs copy source -- @cs copy source
-- all integer keys are treated as new copy sources
--
local function inherit(cd, cs) local function inherit(cd, cs)
if cs.config then -- first recurses into all integer keyed tables
-- recurse into source for i, v in ipairs(cs) do
inherit(cd, cs.config) if type(v) == "table" then
inherit(cd, v)
end end
end
-- does nothing further if source == destination (first step)
if (cd == cs) then
return
end
-- then copies from the source all non integer keyed values
for k, v in pairs(cs) do for k, v in pairs(cs) do
if k ~= "config" and not cd[k] then if type(k) ~= "number" and not cd[k] then
cd[k] = v cd[k] = v
end end
end end
end end
----- -----
-- adds a configuration -- Adds a new directory to observe.
local add = function(config) --
if config.config then local function add(config)
inherit(config, config.config) inherit(config, config)
end
-- raises an error if 'name' isnt in opts -- raises an error if 'name' isnt in opts
local function require_opt(name) local function require_opt(name)
@ -358,7 +377,6 @@ local Origins = (function()
optional("action") optional("action")
optional("max_processes") optional("max_processes")
optional("collapse_table") optional("collapse_table")
local o = Origin.new(config.source, config.target, config) local o = Origin.new(config.source, config.target, config)
table.insert(list, o) table.insert(list, o)
end end
@ -378,79 +396,151 @@ local Origins = (function()
end)() end)()
----- -----
-- inotifies -- Interface to inotify, watches recursively subdirs and
-- sends events.
-- --
-- contains all inotify watches. -- All inotify specific implementation should be enclosed here.
-- So lsyncd can work with other notifications mechanisms just
-- by changing this.
-- --
-- structure: local Inotifies = (function()
-- a list indexed by watch descriptor -----
-- [wd] -- A list indexed by inotifies watch descriptor.
-- of a numeric list of all origins watching this dir. -- Contains a list of all origins observing this directory
-- [#] -- (directly or by recurse)
-- of inotify { local wdlist = CountArray.new()
-- .origin .. link to origin
-- .path .. relative path of dir
-- }
-- }
--
local inotifies = CountArray.new()
local proto_inotify = {origin=true, path=true}
----- -----
-- A list of names of the event types the core sends.
-- (Also makes sure the strings are not collected)
--
local valid_events = {
Attrib = true,
Modify = true,
Create = true,
Delete = true,
Move = true,
MoveFrom = true,
MoveTo = true,
}
--============================================================================
-- The lsyncd runner
--============================================================================
----
-- Adds watches for a directory including all subdirectories. -- Adds watches for a directory including all subdirectories.
-- --
-- @param origin link to origins[] entry
-- @param path relative path of this dir to origin -- @param path relative path of this dir to origin
-- @param parent link to parent directory in watches[] -- @param origin link to the observer to be notified.
-- Note: Inotifies should handle this opaquely
-- @param recurse true if recursing into subdirs
-- --
local function inotify_watch_dir(origin, path) local function add(path, origin, recurse)
local op = origin.source .. path
-- register watch and receive watch descriptor -- register watch and receive watch descriptor
local wd = lsyncd.add_watch(op); local wd = lsyncd.add_watch(path);
if wd < 0 then if wd < 0 then
-- failed adding the watch -- failed adding the watch
log("Error", "Failure adding watch ", op, " -> ignored ") log("Error", "Failure adding watch ", path, " -> ignored ")
return return
end end
local ilist = inotifies[wd] local ilist = wdlist[wd]
if not ilist then if not ilist then
ilist = Array.new() ilist = Array.new()
inotifies[wd] = ilist wdlist[wd] = ilist
end end
local inotify = { origin = origin, path = path } local inotify = { path = path, origin = origin, recurse = recurse}
set_prototype(inotify, proto_inotify)
table.insert(ilist, inotify) table.insert(ilist, inotify)
-- on a warmstart add a Create for the directory -- registers and adds watches for all subdirectories
if not origin.config.startup then if recurse then
-- TODO BROKEN local subdirs = lsyncd.sub_dirs(path)
origin:delay("Create", sync, nil, nil, nil) for _, dirname in ipairs(subdirs) do
add(path..dirname.."/", origin, true)
end
end
end end
-- registers and adds watches for all subdirectories -----
local subdirs = lsyncd.sub_dirs(op) -- Called when an event has occured.
for _, dirname in ipairs(subdirs) do --
inotify_watch_dir(origin, path..dirname.."/") -- @param ename "Attrib", "Mofify", "Create", "Delete", "Move")
-- @param wd watch descriptor (matches lsyncd.add_watch())
-- @param isdir true if filename is a directory
-- @param time time of event
-- @param filename string filename without path
-- @param filename2
--
function event(ename, wd, isdir, time, filename, filename2)
local ftype;
if isdir then
ftype = "directory"
else
ftype = "file"
end
if filename2 then
log("Debug", "got event ", ename, " of ", ftype, " ", filename,
" to ", filename2)
else
log("Debug", "got event ", ename, " of ", ftype, " ", filename)
end
local ilist = wdlist[wd]
-- looks up the watch descriptor id
if not ilist then
-- this is normal in case of deleted subdirs
log("Normal", "event belongs to unknown/deleted watch descriptor.")
return
end
-- works through all observers interested in this directory
for _, inotify in ipairs(ilist) do
local pathname = inotify.path .. filename
local pathname2
if filename2 then
pathname2 = inotify.path..filename2
end
Origin.delay(inotify.origin, ename, time, pathname, pathname2)
-- add subdirs for new directories
if inotify.recurse and isdir then
if ename == "Create" then
add(pathname.."/", inotify.origin, true)
-- TODO remove /
end
end
end
end
-----
-- Writes a status report about inotifies to a filedescriptor
--
local function status_report(fd)
local w = lsyncd.writefd
w(fd, "Watching ", wdlist:size(), " directories\n")
for wd, v in wdlist:iwalk() do
w(fd, " ", wd, ": ")
for _, v in ipairs(v) do
print(v.origin)
print(v.path)
print(v.origin.source)
w(fd, "(", v.origin.source, "/", (v.path) or ")")
end
w(fd, "\n")
end
end
-----
-- Returns the number of directories watched in total.
local function size()
return wdlist:size()
end
-- public interface
return { add = add, size = size, status_report = status_report }
end)()
--============================================================================
-- lsyncd runner plugs. These functions will be called from core.
--============================================================================
-----
-- Called from core whenever a lua failed.
--
function lsyncd_call_error(message)
log("Error", "IN LUA: ", message)
-- prints backtrace
local level = 2
while true do
local info = debug.getinfo(level, "Sl")
if not info then
terminate(-1) -- ERRNO
end
log("Error", "Backtrace ", level - 1, " :",
info.short_src, ":", info.currentline)
level = level + 1
end end
end end
@ -533,14 +623,7 @@ end
function lsyncd_status_report(fd) function lsyncd_status_report(fd)
local w = lsyncd.writefd local w = lsyncd.writefd
w(fd, "Lsyncd status report at ", os.date(), "\n\n") w(fd, "Lsyncd status report at ", os.date(), "\n\n")
w(fd, "Watching ", inotifies.size, " directories\n") Inotifies.status_report(fd)
for wd, v in inotifies:iwalk() do
w(fd, " ", wd, ": ")
for _, inotify in ipairs(v) do
w(fd, "(", inotify.origin.source, "/", (inotify.path) or ")")
end
w(fd, "\n")
end
end end
---- ----
@ -553,7 +636,7 @@ function lsyncd_alarm(now)
-- goes through all targets and spawns more actions -- goes through all targets and spawns more actions
-- if possible -- if possible
for _, o in Origins.iwalk() do for _, o in Origins.iwalk() do
if o.processes.size < o.config.max_processes then if o.processes:size() < o.config.max_processes then
local delays = o.delays local delays = o.delays
local d = delays[1] local d = delays[1]
if d and lsyncd.before_eq(d.alarm, now) then if d and lsyncd.before_eq(d.alarm, now) then
@ -651,10 +734,10 @@ function lsyncd_initialize(args)
have_startup = true have_startup = true
end end
-- adds the dir watch inclusively all subdirs -- adds the dir watch inclusively all subdirs
inotify_watch_dir(o, "") Inotifies.add(o.source, "", true)
end end
-- from this point on use logging facilities as configured. -- from now on use logging as configured instead of stdout/err.
lsyncd.configure("running"); lsyncd.configure("running");
if have_startup then if have_startup then
@ -669,10 +752,10 @@ function lsyncd_initialize(args)
end end
lsyncd.wait_pids(pids, "startup_collector") lsyncd.wait_pids(pids, "startup_collector")
log("Normal", "- Entering normal operation with ", log("Normal", "- Entering normal operation with ",
inotifies.size, " monitored directories -") Inotifies.size(), " monitored directories -")
else else
log("Normal", "- Warmstart into normal operation with ", log("Normal", "- Warmstart into normal operation with ",
inotifies.size, " monitored directories -") Inotifies.size(), " monitored directories -")
end end
end end
@ -688,7 +771,7 @@ function lsyncd_get_alarm()
local alarm = 0 local alarm = 0
for _, o in Origins.iwalk() do for _, o in Origins.iwalk() do
if o.delays[1] and if o.delays[1] and
o.processes.size < o.config.max_processes then o.processes:size() < o.config.max_processes then
if have_alarm then if have_alarm then
alarm = lsyncd.earlier(alarm, o.delays[1].alarm) alarm = lsyncd.earlier(alarm, o.delays[1].alarm)
else else
@ -700,53 +783,7 @@ function lsyncd_get_alarm()
return have_alarm, alarm return have_alarm, alarm
end end
----- lsyncd_inotify_event = Inotifies.event
-- Called by core on inotify event
--
-- @param ename "Attrib", "Mofify", "Create", "Delete", "Move")
-- @param wd watch descriptor (matches lsyncd.add_watch())
-- @param time time of event
-- @param filename string filename without path
-- @param filename2
--
function lsyncd_inotify_event(ename, wd, isdir, time, filename, filename2)
local ftype;
if isdir then
ftype = "directory"
else
ftype = "file"
end
if filename2 then
log("Debug", "got event ", ename, " of ", ftype, " ", filename,
" to ", filename2)
else
log("Debug", "got event ", ename, " of ", ftype, " ", filename)
end
-- looks up the watch descriptor id
local ilist = inotifies[wd]
if not ilist then
log("Normal", "event belongs to unknown or deleted watch descriptor.")
return
end
-- works through all possible source->target pairs
for _, inotify in ipairs(ilist) do
local pathname2
if filename2 then
pathname2 = inotify.path..filename2
end
Origin.delay(inotify.origin, ename, time,
inotify.path..filename, pathname2)
-- add subdirs for new directories
if isdir then
if ename == "Create" then
inotify_watch_dir(inotify.origin,
inotify.path .. filename .. "/")
end
end
end
end
----- -----
-- Collector for every child process that finished in startup phase -- Collector for every child process that finished in startup phase