From dec35304dc337158f0d13cad6b7e47da29313442 Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Thu, 18 Aug 2011 13:29:18 +0000 Subject: [PATCH] added global maxProcesses limit, invoking delays has now a round-robin scheduler, changed syntax of init --- Makefile.am | 1 + lsyncd.lua | 257 +++++++++++++++++++++++++++++---------------- tests/schedule.lua | 76 ++++++++++++++ 3 files changed, 242 insertions(+), 92 deletions(-) create mode 100755 tests/schedule.lua diff --git a/Makefile.am b/Makefile.am index f49c3ff..5f89620 100644 --- a/Makefile.am +++ b/Makefile.am @@ -26,6 +26,7 @@ TESTS = \ tests/churn-direct.lua \ tests/exclude-rsync.lua \ tests/exclude-rsyncssh.lua \ + tests/schedule.lua \ tests/l4rsyncdata.lua dist_man1_MANS = doc/lsyncd.1 diff --git a/lsyncd.lua b/lsyncd.lua index e8e9aa1..549e946 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -49,6 +49,10 @@ local now = now -- local Monitors +----- +-- Global: total number of processess running +local processCount = 0 + --============================================================================ -- Lsyncd Prototypes --============================================================================ @@ -163,9 +167,9 @@ end)() -- -- Queue = (function() - ----- -- Creates a new queue. + -- local function new() return { first = 1, last = 0, size = 0}; end @@ -173,6 +177,7 @@ Queue = (function() ----- -- Pushes a value on the queue. -- Returns the last value + -- local function push(list, value) if not value then error("Queue pushing nil value", 2) @@ -217,6 +222,7 @@ Queue = (function() ----- -- Stateless queue iterator. + -- local function iter(list, pos) pos = pos + 1 while list[pos] == nil and pos <= list.last do @@ -241,16 +247,18 @@ Queue = (function() return pos, list[pos] end - ---- + ----- -- Iteraters through the queue -- returning all non-nil pos-value entries + -- local function qpairs(list) return iter, list, list.first - 1 end - ---- + ----- -- Iteraters backwards through the queue -- returning all non-nil pos-value entries + -- local function qpairsReverse(list) return iterReverse, list, list.last + 1 end @@ -262,7 +270,7 @@ Queue = (function() qpairsReverse = qpairsReverse} end)() ----- +----- -- Locks globals, -- no more globals can be created -- @@ -406,14 +414,14 @@ local Combiner = (function() -- combines two delays -- local function combine(d1, d2) - if d1.etype == "Blanket" then - -- everything is blocked by a blanket delay. + if d1.etype == "Init" or d1.etype == "Blanket" then + -- everything is blocked by init or blanket delays. if d2.path2 then log("Delay", d2.etype,":",d2.path,"->",d2.path2, "blocked by", - "Blanket event") + d1.etype," event") else log("Delay", d2.etype,":",d2.path, "blocked by", - "Blanket event") + d1.etype," event") end return "stack" end @@ -985,12 +993,6 @@ local InletFactory = (function() } end)() ------ --- Little dirty workaround to retrieve the Inlet from events in Inlet -getInlet = function() - return Inlet -end - ----- -- A set of exclude patterns @@ -1331,9 +1333,9 @@ local Sync = (function() end -- new delay local nd = Delay.new(etype, alarm, path, path2) - if nd.etype == "Blanket" then + if nd.etype == "Init" or nd.etype == "Blanket" then -- always stack blanket events on the last event - log("Delay", "Stacking blanket event.") + log("Delay", "Stacking ",nd.etype," event.") if self.delays.size > 0 then stack(self.delays[self.delays.last], nd) end @@ -1452,6 +1454,11 @@ local Sync = (function() return end for _, d in Queue.qpairs(self.delays) do + -- if reached the global limit return + if settings.maxProcesses and processCount >= settings.maxProcesses then + log("Alarm", "at global process limit.") + return + end if self.delays.size < self.config.maxDelays then -- time constrains are only concerned if not maxed -- the delay FIFO already. @@ -1462,7 +1469,11 @@ local Sync = (function() end if d.status == "wait" then -- found a waiting delay - self.config.action(self.inlet) + if d.etype ~= "Init" then + self.config.action(self.inlet) + else + self.config.init(InletFactory.d2e(self, d)) + end if self.processes:size() >= self.config.maxProcesses then -- no further processes return @@ -1492,8 +1503,8 @@ local Sync = (function() end ------ - -- adds and returns a blanket delay thats blocks all - -- (used in startup) + -- Adds and returns a blanket delay thats blocks all. + -- Used as custom marker. -- local function addBlanketDelay(self) local newd = Delay.new("Blanket", true, "") @@ -1501,6 +1512,16 @@ local Sync = (function() return newd end + ------ + -- Adds and returns a blanket delay thats blocks all. + -- Used as startup marker to call init asap. + -- + local function addInitDelay(self) + local newd = Delay.new("Init", true, "") + newd.dpos = Queue.push(self.delays, newd) + return newd + end + ----- -- Writes a status report about delays in this sync. -- @@ -1530,9 +1551,6 @@ local Sync = (function() f:write("\n") end - ----- - -- b - ----- -- Creates a new Sync -- @@ -1548,6 +1566,7 @@ local Sync = (function() -- functions addBlanketDelay = addBlanketDelay, addExclude = addExclude, + addInitDelay = addInitDelay, collect = collect, concerns = concerns, delay = delay, @@ -1604,13 +1623,40 @@ local Syncs = (function() -- the list of all syncs -- local list = Array.new() - + ----- - -- inheritly copies all non integer keys from - -- @cd copy destination - -- to - -- @cs copy source - -- all integer keys are treated as new copy sources + -- The round robin pointer. In case of global limited maxProcesses + -- gives every sync equal chances to spawn the next process. + -- + local round = 1 + + ----- + -- The cycle() sheduler goes into the next round of roundrobin. + local function nextRound() + round = round + 1; + if round > #list then + round = 1 + end + return round + end + + ----- + -- Returns the round + local function getRound() + return round + end + + ----- + -- Returns sync at listpos i + local function get(i) + return list[i]; + end + + ----- + -- Inheritly copies all non integer keys from + -- copy source (cs) to copy destination (cd). + -- + -- all entries with integer keys are treated as new sources to copy -- local function inherit(cd, cs) -- first copies from source all @@ -1629,7 +1675,7 @@ local Syncs = (function() end ----- - -- Adds a new directory to observe. + -- Adds a new sync (directory-tree to observe). -- local function add(config) -- Creates a new config table and inherit all keys/values @@ -1712,21 +1758,21 @@ local Syncs = (function() end ----- - -- allows to walk through all syncs + -- Allows a for-loop to walk through all syncs. -- local function iwalk() return ipairs(list) end ----- - -- returns the number of syncs + -- Returns the number of syncs. -- local size = function() return #list end ------ - -- tests if any sync is interested in path + -- Tests if any sync is interested in a path. -- local function concerns(path) for _, s in ipairs(list) do @@ -1738,10 +1784,15 @@ local Syncs = (function() end -- public interface - return {add = add, - concerns = concerns, - iwalk = iwalk, - size = size} + return { + add = add, + get = get, + getRound = getRound, + concerns = concerns, + iwalk = iwalk, + nextRound = nextRound, + size = size + } end)() @@ -2340,6 +2391,7 @@ end)() -- -- local StatusFile = (function() + ----- -- Timestamp when the status file has been written. local lastWritten = false @@ -2404,13 +2456,13 @@ local StatusFile = (function() end)() ------ --- Lets the userscript make its own alarms +-- Lets the userscript make its own alarms. -- local UserAlarms = (function() local alarms = {} ----- - -- Calls the user function at timestamp + -- Calls the user function at timestamp. -- local function alarm(timestamp, func, extra) local idx @@ -2431,7 +2483,7 @@ local UserAlarms = (function() end ---- - -- Retrieves the nearest alarm + -- Retrieves the nearest alarm. -- local function getAlarm() if #alarms == 0 then @@ -2442,7 +2494,7 @@ local UserAlarms = (function() end ----- - -- Calls user alarms + -- Calls user alarms. -- local function invoke(timestamp) while #alarms > 0 and alarms[1].timestamp <= timestamp do @@ -2499,6 +2551,11 @@ end -- zombie process was collected by core. -- function runner.collectProcess(pid, exitcode) + processCount = processCount - 1 + if processCount < 0 then + error("negative number of processes!") + end + for _, s in Syncs.iwalk() do if s:collect(pid, exitcode) then return @@ -2506,12 +2563,12 @@ function runner.collectProcess(pid, exitcode) end end ----- +----- -- Called from core everytime a masterloop cycle runs through. -- This happens in case of -- * an expired alarm. -- * a returned child process. --- * received inotify events. +-- * received filesystem events. -- * received a HUP or TERM signal. -- -- @param timestamp the current kernel time (in jiffies) @@ -2520,23 +2577,30 @@ function runner.cycle(timestamp) -- goes through all syncs and spawns more actions -- if possible if lsyncdStatus == "fade" then - local np = 0 - for _, s in Syncs.iwalk() do - np = np + s.processes:size() - end - if np > 0 then - log("Normal", "waiting for ",np," more child processes.") + if processCount > 0 then + log("Normal", "waiting for ",processCount," more child processes.") return true else return false end end if lsyncdStatus ~= "run" then - error("cycle called in not run?!") + error("runner.cycle() called while not running!") end - for _, s in Syncs.iwalk() do - s:invokeActions(timestamp) + --- only let Syncs invoke actions if not on global limit + if not settings.maxProcesses or processCount < settings.maxProcesses then + local start = Syncs.getRound() + local ir = start + repeat + local s = Syncs.get(ir) + s:invokeActions(timestamp) + ir = ir + 1 + if ir > Syncs.size() then + ir = 1 + end + until ir == start + Syncs.nextRound() end UserAlarms.invoke(timestamp) @@ -2572,7 +2636,7 @@ USAGE: OPTIONS: -delay SECS Overrides default delay times -help Shows this - -inisit Continues startup even if it cannot connect + -insist Continues startup even if it cannot connect -log all Logs everything (debug) -log scarce Logs errors only -log [Category] Turns on logging for a debug category @@ -2840,9 +2904,10 @@ function runner.initialize() error("sync "..s.config.name.. " has no known event monitor interface.") end - + -- if the sync has an init function, stacks an init delay + -- that will cause the init function to be called. if s.config.init then - s.config.init(s.inlet) + s:addInitDelay() end end end @@ -2874,15 +2939,21 @@ function runner.getAlarm() end -- checks all syncs for their earliest alarm - for _, s in Syncs.iwalk() do - checkAlarm(s:getAlarm()) + -- but only if the global process limit is not yet reached. + if not settings.maxProcesses or processCount < settings.maxProcesses then + for _, s in Syncs.iwalk() do + checkAlarm(s:getAlarm()) + end + else + log("Alarm", "at global process limit.") end + -- checks if a statusfile write has been delayed checkAlarm(StatusFile.getAlarm()) -- checks for an userAlarm checkAlarm(UserAlarms.getAlarm()) - log("Debug","getAlarm returns: ",alarm) + log("Alarm","runner.getAlarm returns: ",alarm) return alarm end @@ -2911,7 +2982,7 @@ function runner.collector(pid, exitcode) return 0 end ----- +----- -- Called by core when an overflow happened. -- function runner.overflow() @@ -2919,7 +2990,7 @@ function runner.overflow() lsyncdStatus = "fade" end ----- +----- -- Called by core on a hup signal. -- function runner.hup() @@ -2927,7 +2998,7 @@ function runner.hup() lsyncdStatus = "fade" end ----- +----- -- Called by core on a term signal. -- function runner.term() @@ -2991,6 +3062,10 @@ function spawn(agent, binary, ...) local pid = lsyncd.exec(binary, ...) if pid and pid > 0 then + processCount = processCount + 1 + if settings.maxProcesses and processCount > settings.maxProcesses then + error("Spawned too much processes!") + end local sync = InletFactory.getSync(agent) -- delay or list if dol.status then @@ -3097,7 +3172,7 @@ local default_rsync = { -- gets all events ready for syncing local elist = inlet.getEvents( function(event) - return event.etype ~= "Blanket" + return event.etype ~= "Init" and event.etype ~= "Blanket" end ) @@ -3177,15 +3252,9 @@ local default_rsync = { ----- -- Spawns the recursive startup sync -- - init = function(inlet) - local config = inlet.getConfig() - local event = inlet.createBlanketEvent() - event.isStartup = true -- marker for user scripts - - if string.sub(config.target, -1) ~= "/" then - config.target = config.target .. "/" - end - + init = function(event) + local config = event.config; + local inlet = event.inlet; local excludes = inlet.getExcludes(); if #excludes == 0 then log("Normal", "recursive startup rsync: ", config.source, @@ -3223,6 +3292,11 @@ local default_rsync = { end config.rsyncOpts = config.rsyncOps end + + -- appends a / to target if not present + if string.sub(config.target, -1) ~= "/" then + config.target = config.target .. "/" + end end, ----- @@ -3310,7 +3384,8 @@ local default_rsyncssh = { function(e) return e.etype ~= "Move" and e.etype ~= "Delete" and - e.etype ~= "Blanket" + e.etype ~= "Init" and + e.etype ~= "Blanket" end) local paths = elist.getPaths() @@ -3338,7 +3413,7 @@ local default_rsyncssh = { -- Called when collecting a finished child process -- collect = function(agent, exitcode) - if not agent.isList and agent.etype == "Blanket" then + if not agent.isList and agent.etype == "Init" then if exitcode == 0 then log("Normal", "Startup of '",agent.source,"' finished.") elseif rsync_exitcodes[exitcode] == "again" then @@ -3385,14 +3460,9 @@ local default_rsyncssh = { ----- -- Spawns the recursive startup sync -- - init = function(inlet) - local config = inlet.getConfig() - local event = inlet.createBlanketEvent() - event.isStartup = true -- marker for user scripts - if string.sub(config.targetdir, -1) ~= "/" then - config.targetdir = config.targetdir .. "/" - end - + init = function(event) + local config = event.config + local inlet = event.inlet local excludes = inlet.getExcludes(); if #excludes == 0 then log("Normal", "recursive startup rsync: ", config.source, @@ -3438,6 +3508,11 @@ local default_rsyncssh = { if not config.targetdir then error("default.rsyncssh needs 'targetdir' configured", 4) end + + -- appends a slash to the targetdir if missing + if string.sub(config.targetdir, -1) ~= "/" then + config.targetdir = config.targetdir .. "/" + end end, ----- @@ -3537,7 +3612,7 @@ local default_direct = { collect = function(agent, exitcode) local config = agent.config - if not agent.isList and agent.etype == "Blanket" then + if not agent.isList and agent.etype == "Init" then if exitcode == 0 then log("Normal", "Startup of '",agent.source,"' finished.") elseif rsync_exitcodes and @@ -3639,7 +3714,7 @@ default = { collect = function(agent, exitcode) local config = agent.config - if not agent.isList and agent.etype == "Blanket" then + if not agent.isList and agent.etype == "Init" then if exitcode == 0 then log("Normal", "Startup of '",agent.source,"' finished.") elseif config.exitcodes and @@ -3681,22 +3756,20 @@ default = { ----- -- called on (re)initalizing of Lsyncd. -- - init = function(inlet) - local config = inlet.getConfig() + init = function(event) + local config = event.config + local inlet = event.inlet -- user functions - -- calls a startup if given by user script. if type(config.onStartup) == "function" then - local event = inlet.createBlanketEvent() - event.isStartup = true -- marker for user scripts local startup = config.onStartup(event) - if event.status == "wait" then - -- user script did not spawn anything - -- thus the blanket event is deleted again. - inlet.discardEvent(event) - end -- TODO honor some return codes of startup like "warmstart". end + if event.status == "wait" then + -- user script did not spawn anything + -- thus the blanket event is deleted again. + inlet.discardEvent(event) + end end, ----- diff --git a/tests/schedule.lua b/tests/schedule.lua new file mode 100755 index 0000000..074468a --- /dev/null +++ b/tests/schedule.lua @@ -0,0 +1,76 @@ +#!/usr/bin/lua +require("posix") +dofile("tests/testlib.lua") + +cwriteln("****************************************************************") +cwriteln(" Testing Lsyncd scheduler ") +cwriteln("****************************************************************") + +local tdir, srcdir, trgdir = mktemps() +local logfile = tdir .. "log" +local cfgfile = tdir .. "config.lua" +local logs = {"-log", "all" } + +writefile(cfgfile, [[ +settings = { + logfile = "]]..logfile..[[", + log = all, + nodaemon = true, + maxProcesses = 1 +} + +-- continously touches a file +acircuit = { + delay = 0, + onStartup = "sleep 3 && touch ^source/a", + onCreate = "sleep 3 && touch ^source/a", +} + +-- continously touches b file +bcircuit = { + delay = 0, + onStartup = "sleep 3 && touch ^source/b", + onCreate = "sleep 3 && touch ^source/b", +} + +-- continously touches c file +ccircuit = { + delay = 0, + onStartup = "sleep 3 && touch ^source/c", + onCreate = "sleep 3 && touch ^source/c", +} + +sync {acircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["} +sync {bcircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["} +sync {ccircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["} +]]); + + +-- test if the filename exists, fails if this is different to expect +local function testfile(filename) + local stat, err = posix.stat(filename) + if not stat then + cwriteln("failure: ",filename," missing"); + os.exit(1); + end +end + +cwriteln("starting Lsyncd"); +local pid = spawn("./lsyncd", cfgfile, unpack(logs)); +cwriteln("waiting for Lsyncd to do a few cycles"); +posix.sleep(30) +cwriteln("look if every circle got a chance to run"); +testfile(srcdir.."a") +testfile(srcdir.."b") +testfile(srcdir.."c") +cwriteln("killing started Lsyncd"); +posix.kill(pid); +local _, exitmsg, lexitcode = posix.wait(lpid); +cwriteln("Exitcode of Lsyncd = ", exitmsg, " ", lexitcode); +posix.sleep(1); +if lexitcode == 0 then + cwriteln("OK"); +end +os.exit(lexitcode); + +-- TODO remove temp