From 69d5b186a351a21936a3c38cb80fc4911943f50a Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Mon, 8 Nov 2010 12:14:10 +0000 Subject: [PATCH] --- lsyncd-conf.lua | 6 +- lsyncd.c | 7 +- lsyncd.lua | 573 +++++++++++++++++++++++++++--------------------- 3 files changed, 331 insertions(+), 255 deletions(-) diff --git a/lsyncd-conf.lua b/lsyncd-conf.lua index 9f2c5ec..fadb151 100644 --- a/lsyncd-conf.lua +++ b/lsyncd-conf.lua @@ -23,16 +23,16 @@ slowbash = { -- collect gets called when spawned process finished local function collect(exitcode) if exitcode == 0 then - log("Normal", "Startup of '"..c.source.."' finished.") + log("Normal", "Startup of '",c.source,"' finished.") else - log("Error", "Failure on startup of '"...source.."'.") + log("Error", "Failure on startup of '",c.source,"'.") terminate(-1) -- ERRNO end end spawnShell(inlet.blanketEvent(), collect, [[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]], - config.source, config.target) + c.source, c.target) end, onCreate = function(event) diff --git a/lsyncd.c b/lsyncd.c index f3fe72e..380cca3 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -987,8 +987,11 @@ masterloop(lua_State *L) if (lua_pcall(L, 0, 1, -2)) { exit(-1); // ERRNO } - have_alarm = lua_toboolean(L, -1); - if (have_alarm) { + + if (lua_type(L, -1) == LUA_TBOOLEAN) { + have_alarm = lua_toboolean(L, -1); + } else { + have_alarm = true; alarm_time = (clock_t) luaL_checkinteger(L, -1); } lua_pop(L, 2); diff --git a/lsyncd.lua b/lsyncd.lua index d0f55c6..c5e9a4e 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -37,7 +37,7 @@ log = lsyncd.log terminate = lsyncd.terminate --============================================================================ --- Coding checks, ensure termination on some easy to do coding errors. +-- Lsyncd Prototypes --============================================================================ ----- @@ -154,7 +154,7 @@ local function lockGlobals() end mt.__newindex = function(t, k, v) if (k~="_" and string.sub(k, 1, 2) ~= "__") then - error("Lsyncd does not allow GLOBALS to be created on the fly." .. + error("Lsyncd does not allow GLOBALS to be created on the fly. " .. "Declare '" ..k.."' local or declare global on load.", 2) else rawset(t, k, v) @@ -181,7 +181,7 @@ local Delay = (function() etype = etype, alarm = alarm, path = path, - status = "delay", + status = "wait", } return o end @@ -189,21 +189,252 @@ local Delay = (function() return {new = new} end)() +----- +-- User interface to grap events +-- +-- InletControl is the Luas runner part to control the interface +-- hidden from the user. +-- +local Inlet, InletControl = (function() + -- lua runner controlled variables + local sync + + -- key variable for delays hidden from user + local delayKey = {} + + ----- + -- removes the trailing slash from a path + local function cutSlash(path) + if string.byte(path, -1) == 47 then + return string.sub(path, 1, -2) + else + return path + end + end + + ----- + -- Interface for the user to get fields. + local eventFields = { + config = function(event) + return event[delayKey].sync.config + end, + + ----- + -- Returns the type of the event. + -- Can be: + -- "Attrib" + -- "Create" + -- "Delete" + -- "Modify" + -- "Move" + etype = function(event) + return event[delayKey].etype + end, + + ----- + -- Returns true if event relates to a directory. + isdir = function(event) + return string.byte(event[delayKey].path, -1) == 47 + end, + + ----- + -- Returns the name of the file/dir. + -- Includes a trailing slash for dirs. + name = function() + return string.match(event[delayKey].path, "[^/]+/?$") + end, + + ----- + -- Returns the name of the file/dir. + -- Excludes a trailing slash for dirs. + basename = function() + return string.match(event[delayKey].path, "([^/]+)/?$") + end, + + ----- + -- Returns the file/dir relative to watch root + -- Includes a trailing slash for dirs. + path = function(event) + return event[delayKey].path + end, + + ----- + -- Returns the file/dir relativ to watch root + -- Excludes a trailing slash for dirs. + pathname = function(event) + return cutSlash(event[delayKey].path) + end, + + ------ + -- Returns the absolute path of the watch root. + -- All symlinks will have been resolved. + source = function(event) + return sync.source + end, + + ------ + -- Returns the absolute path of the file/dir. + -- Includes a trailing slash for dirs. + sourcePath = function(event) + return sync.source .. event[delayKey].path + end, + + ------ + -- Returns the absolute path of the file/dir. + -- Excludes a trailing slash for dirs. + sourcePathname = function(event) + return sync.source .. cutSlash(event[delayKey].path) + end, + + ------ + -- Returns the target. + -- Just for user comfort, for most case + -- (Actually except of here, the lsyncd.runner itself + -- does not care event about the existance of "target", + -- this is completly up to the action scripts.) + target = function(event) + return sync.config.target + end, + + ------ + -- Returns the relative dir/file appended to the target. + -- Includes a trailing slash for dirs. + targetPath = function(event) + return sync.config.target .. event[delayKey].path + end, + + ------ + -- Returns the relative dir/file appended to the target. + -- Excludes a trailing slash for dirs. + targetPathname = function(event) + return sync.config.target .. cutSlash(event[delayKey].path) + end, + } + + ----- + -- Calls event functions for the user. + local eventMeta = { + __index = function(t, k) + local f = eventFields[k] + if not f then + error("event does not have field '"..k.."'", 2) + end + return f(t) + end + } + + ----- + -- Encapsulates a delay to an event for the user + -- + -- TODO this hidden key technique can be circumvented with + -- pairs(), use a weak table as referencer instead. + -- + local function toEvent(delay) + if not delay.event then + delay.event = {} + setmetatable(delay.event, eventMeta) + delay.event[delayKey] = delay + end + return delay.event + end + + + ----- + -- Creates a blanketEvent that blocks everything + -- and is blocked by everything. + -- + local function blanketEvent() + return toEvent(sync:addBlanketDelay()) + end + + ----- + -- Gets the next event from queue. + -- + local function getEvent() + return toEvent(sync:getNextDelay(lysncd.now())) + end + + ----- + -- Returns the configuration table specified by sync{} + -- + local function getConfig() + -- TODO give a readonly handler only. + return sync.config + end + + ----- + -- Interface for lsyncd runner to control what + -- the inlet will present the user. + -- + local function set(setSync) + sync = setSync + end + + ----- + -- Return the inner config + -- not to be called from user + local function getInterior(event) + return sync, event[delayKey] + end + + ----- + -- public interface. + -- this one is split, one for user one for runner. + return { + getEvent = getEvent, + getConfig = getConfig, + blanketEvent = blanketEvent + }, { + set = set, + getInterior = getInterior + } +end)() + ----- -- Holds information about one observed directory inclusively subdirs. -- local Sync = (function() + ----- -- Syncs that have no name specified get an incremental default name -- local nextDefaultName = 1 + + ----- + -- Collects a child process + -- + local function collect(self, pid, exitcode) + local delay = self.processes[pid] + if not delay then + -- not a child of this sync. + return + end + if delay.status ~= "active" then + error("internal fail, collecting a non-active process") + end + -- TODO call user collector + local found + for i, d in ipairs(self.delays) do + if d == delay then + found = true + table.remove(self.delays, i) + break + end + end + if not found then + error("Did not find a delay!") + end + log("Debug", "collected ",pid, ": ",delay.etype," of ", + self.source, delay.path," = ",exitcode) + self.processes[pid] = nil + self.delayname[delay.path] = nil + end ----- -- Puts an action on the delay stack. -- local function delay(self, etype, time, path, path2) - log("Function", "delay(", self, ", ", etype, ", ", path, ")") - local delays = self.delays + log("Function", "delay(", self.config.name,", ",etype,", ",path,"...)") local delayname = self.delayname if etype == "Move" and not self.config.move then @@ -253,15 +484,15 @@ local Sync = (function() return end end - table.insert(delays, newd) + table.insert(self.delays, newd) else delayname[path] = newd - table.insert(delays, newd) + table.insert(self.delays, newd) end end ----- - -- Return the nearest alarm for this Sync. + -- Returns the nearest alarm for this Sync. -- local function getAlarm(self) -- first checks if more processses could be spawned @@ -275,30 +506,67 @@ local Sync = (function() end ----- - -- Creates new actions + -- Gets the next event to be processed. -- - local function invokeActions(self) - if self.processes:size() >= self.config.maxProcesses then - return - end - - local delays = self.delays - local d = delays[1] - if d and lsyncd.clockbeforeq(d.alarm, now) then - InletControl.set(sync, delay) - sync.config.action(Inlet) - - -- TODO do not remove - table.remove(delays, 1) - s.delayname[d.path] = nil + local function getNextDelay(self, now) + for i, d in ipairs(self.delays) do + if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then + -- reached point in stack where delays are in future + return nil + end + if d.status == "wait" then + -- found a waiting delay + return d + end end end - ------ - -- adds a blanketEvent thats blocks all (startup) + ----- + -- Creates new actions -- - local function addBlanketEvent() - + local function invokeActions(self, now) + log("Function", "invokeActions('",self.config.name,"',",now,")") + if self.processes:size() >= self.config.maxProcesses then + log("Debug", "no new processes") + -- no new processes + return + end + for _, d in ipairs(self.delays) do + log("Debug", "iter") + if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then + -- reached point in stack where delays are in future + log("Debug", "waits in future.") + return + end + if d.status == "wait" then + -- found a waiting delay + log("Debug", "invoke") + InletControl.set(self) + log("Debug", "invoke2") + self.config.action(Inlet) + log("Debug", "invoke3") + if self.processes:size() >= self.config.maxProcesses then + -- no further processes + log("Debug", "no further processes") + return + end + log("Debug", "finInvoke") + end + log("Debug", "next") + end + log("Debug", "finInvoke") + end + + + ------ + -- adds and returns a blanket delay thats blocks all + -- (used in startup) + -- + local function addBlanketDelay(self) + local newd = Delay.new("Blanket", "/", true) + self.delayname["/"] = newd -- TODO remove delayname + table.insert(self.delays, newd) + return newd end ----- @@ -314,21 +582,26 @@ local Sync = (function() processes = CountArray.new(), -- functions - delay = delay, - getAlarm = getAlarm, - invokeActions = invokeActions, + collect = collect, + delay = delay, + addBlanketDelay = addBlanketDelay, + getAlarm = getAlarm, + getNextDelay = getNextDelay, + invokeActions = invokeActions, } -- provides a default name if needed if not config.name then config.name = "Sync" .. nextDefaultName end - -- increment default nevertheless to cause less confusion + -- increments default nevertheless to cause less confusion + -- so name will be the n-th call to sync{} nextDefaultName = nextDefaultName + 1 return s end ----- -- public interface + -- return {new = new} end)() @@ -611,212 +884,13 @@ end -- zombie process was collected by core. -- function lsyncd_collect_process(pid, exitcode) - local delay = nil - local sync = nil for _, s in Syncs.iwalk() do - delay = s.processes[pid] - if delay then - sync = s - break + if s:collect(pid, exitcode) then + return end end - if not delay then - return - end - log("Debug", "collected ",pid, ": ",delay.etype," of ", - sync.source, delay.path," = ",exitcode) - sync.processes[pid] = nil end ------ --- User interface to grap events --- --- InletControl is the Luas runner part to control the interface --- hidden from the user. --- -local Inlet, InletControl = (function() - -- lua runner controlled variables - local sync = true - local delay = true - - -- event to be passed to the user - local event = {} - - ----- - -- removes the trailing slash from a path - local function cutSlash(path) - if string.byte(path, -1) == 47 then - return string.sub(path, 1, -2) - else - return path - end - end - - ----- - -- Creates a blanketEvent that blocks everything - -- and is blocked by everything. - -- - local function blanketEvent() - return sync.addBlanketEvent() - end - - ----- - -- Interface for the user to get fields. - local eventFields = { - config = function() - return sync.config - end, - - ----- - -- Returns the type of the event. - -- Can be: - -- "Attrib" - -- "Create" - -- "Delete" - -- "Modify" - -- "Move" - etype = function() - return delay.etype - end, - - ----- - -- Returns true if event relates to a directory. - isdir = function() - return string.byte(delay.path, -1) == 47 - end, - - ----- - -- Returns the name of the file/dir. - -- Includes a trailing slash for dirs. - name = function() - return string.match(delay.path, "[^/]+/?$") - end, - - ----- - -- Returns the name of the file/dir. - -- Excludes a trailing slash for dirs. - basename = function() - return string.match(delay.path, "([^/]+)/?$") - end, - - ----- - -- Returns the file/dir relative to watch root - -- Includes a trailing slash for dirs. - path = function() - return delay.path - end, - - ----- - -- Returns the file/dir relativ to watch root - -- Excludes a trailing slash for dirs. - pathname = function() - return cutSlash(delay.path) - end, - - ------ - -- Returns the absolute path of the watch root. - -- All symlinks will have been resolved. - source = function() - return sync.source - end, - - ------ - -- Returns the absolute path of the file/dir. - -- Includes a trailing slash for dirs. - sourcePath = function() - return sync.source .. delay.path - end, - - ------ - -- Returns the absolute path of the file/dir. - -- Excludes a trailing slash for dirs. - sourcePathname = function() - return sync.source .. cutSlash(delay.path) - end, - - ------ - -- Returns the target. - -- Just for user comfort, for most case - -- (Actually except of here, the lsyncd.runner itself - -- does not care event about the existance of "target", - -- this is completly up to the action scripts.) - target = function() - return sync.config.target - end, - - ------ - -- Returns the relative dir/file appended to the target. - -- Includes a trailing slash for dirs. - targetPath = function() - return sync.config.target .. delay.path - end, - - ------ - -- Returns the relative dir/file appended to the target. - -- Excludes a trailing slash for dirs. - targetPathname = function() - return sync.config.target .. cutSlash(delay.path) - end, - } - - ----- - -- Calls event functions for the user. - local eventMeta = { - __index = function(t, k) - local f = eventFields[k] - if not f then - error("event does not have field '"..k.."'", 2) - end - return f() - end - } - setmetatable(event, eventMeta) - - ----- - -- Interface for lsyncd runner to control what - -- the inlet will present the user. - -- - local function set(setSync, setDelay) - sync = setSync - delay = setDelay - end - - ----- - -- Gets the next event from queue. - -- - local function getEvent() - -- TODO actually aquire here - return event - end - - ----- - -- Returns the configuration table specified by sync{} - -- - local function getConfig() - -- TODO give a readonly handler only. - return sync.config - end - - ----- - -- Return the inner config - -- not to be called from user - local function getInterior(event) - return sync, delay - end - - ----- - -- public interface. - -- this one is split, one for user one for runner. - return { - getEvent = getEvent, - getConfig = getConfig, - blanketEvent = blanketEvent - }, { - set = set, - getInterior = getInterior - } -end)() - ---- -- Writes a status report file at most every [statusintervall] seconds. @@ -892,14 +966,16 @@ end)() -- @param now the current kernel time (in jiffies) -- function lsyncd_cycle(now) - -- goes through all targets and spawns more actions + -- goes through all syncs and spawns more actions -- if possible + for _, s in Syncs.iwalk() do + s:invokeActions(now) + end + if settings.statusfile then StatusFile.write(now) end - for _, s in Syncs.iwalk() do - s:invokeActions() - end + log("Debug", "fin lsyncd_cycle") end @@ -1011,7 +1087,7 @@ function lsyncd_initialize() for _, s in Syncs.iwalk() do Inotifies.add(s.source, "", true, s) if s.config.init then - InletControl(s, nil) + InletControl.set(s) s.config.init(Inlet) end end @@ -1021,6 +1097,7 @@ end -- Called by core to query soonest alarm. -- -- @return false ... no alarm, core can in untimed sleep, or +-- true ... immediate action -- times ... the alarm time (only read if number is 1) -- function lsyncd_get_alarm() @@ -1030,7 +1107,8 @@ function lsyncd_get_alarm() -- checks if current nearest alarm or a is earlier -- local function checkAlarm(a) - if not a then + if alarm == true or not a then + -- already immediate or no new alarm return end if not alarm then @@ -1107,15 +1185,12 @@ overflow = default_overflow -- @param binary binary to call -- @param ... arguments -- -function spawn(agent, collect, binary, ...) +function spawn(agent, collector, binary, ...) local pid = lsyncd.exec(binary, ...) if pid and pid > 0 then - if agent == "full" then - - end local sync, delay = InletControl.getInterior(agent) delay.status = "active" - delay.collect = collect + delay.collector = collector sync.processes[pid] = delay end end @@ -1123,8 +1198,8 @@ end ----- -- Spawns a child process using bash. -- -function spawnShell(agent, collect, command, ...) - return spawn(agent, collect, "/bin/sh", "-c", command, "/bin/sh", ...) +function spawnShell(agent, collector, command, ...) + return spawn(agent, collector, "/bin/sh", "-c", command, "/bin/sh", ...) end --============================================================================ @@ -1159,14 +1234,12 @@ default = { -- TODO desc -- action = function(inlet) - local event = inlet.get_event() - local config = inlet.get_config() + local event = inlet.getEvent() + local config = inlet.getConfig() local func = config["on".. event.etype] if func then -- TODO Moves? - return func(event) - else - return -1 + func(event) end end,