added global maxProcesses limit, invoking delays has now a round-robin scheduler, changed syntax of init

This commit is contained in:
Axel Kittenberger 2011-08-18 13:29:18 +00:00
parent c21620d3e6
commit dec35304dc
3 changed files with 242 additions and 92 deletions

View File

@ -26,6 +26,7 @@ TESTS = \
tests/churn-direct.lua \
tests/exclude-rsync.lua \
tests/exclude-rsyncssh.lua \
tests/schedule.lua \
tests/l4rsyncdata.lua
dist_man1_MANS = doc/lsyncd.1

View File

@ -49,6 +49,10 @@ local now = now
--
local Monitors
-----
-- Global: total number of processess running
local processCount = 0
--============================================================================
-- Lsyncd Prototypes
--============================================================================
@ -163,9 +167,9 @@ end)()
--
--
Queue = (function()
-----
-- Creates a new queue.
--
local function new()
return { first = 1, last = 0, size = 0};
end
@ -173,6 +177,7 @@ Queue = (function()
-----
-- Pushes a value on the queue.
-- Returns the last value
--
local function push(list, value)
if not value then
error("Queue pushing nil value", 2)
@ -217,6 +222,7 @@ Queue = (function()
-----
-- Stateless queue iterator.
--
local function iter(list, pos)
pos = pos + 1
while list[pos] == nil and pos <= list.last do
@ -241,16 +247,18 @@ Queue = (function()
return pos, list[pos]
end
----
-----
-- Iteraters through the queue
-- returning all non-nil pos-value entries
--
local function qpairs(list)
return iter, list, list.first - 1
end
----
-----
-- Iteraters backwards through the queue
-- returning all non-nil pos-value entries
--
local function qpairsReverse(list)
return iterReverse, list, list.last + 1
end
@ -262,7 +270,7 @@ Queue = (function()
qpairsReverse = qpairsReverse}
end)()
----
-----
-- Locks globals,
-- no more globals can be created
--
@ -406,14 +414,14 @@ local Combiner = (function()
-- combines two delays
--
local function combine(d1, d2)
if d1.etype == "Blanket" then
-- everything is blocked by a blanket delay.
if d1.etype == "Init" or d1.etype == "Blanket" then
-- everything is blocked by init or blanket delays.
if d2.path2 then
log("Delay", d2.etype,":",d2.path,"->",d2.path2, "blocked by",
"Blanket event")
d1.etype," event")
else
log("Delay", d2.etype,":",d2.path, "blocked by",
"Blanket event")
d1.etype," event")
end
return "stack"
end
@ -985,12 +993,6 @@ local InletFactory = (function()
}
end)()
-----
-- Little dirty workaround to retrieve the Inlet from events in Inlet
getInlet = function()
return Inlet
end
-----
-- A set of exclude patterns
@ -1331,9 +1333,9 @@ local Sync = (function()
end
-- new delay
local nd = Delay.new(etype, alarm, path, path2)
if nd.etype == "Blanket" then
if nd.etype == "Init" or nd.etype == "Blanket" then
-- always stack blanket events on the last event
log("Delay", "Stacking blanket event.")
log("Delay", "Stacking ",nd.etype," event.")
if self.delays.size > 0 then
stack(self.delays[self.delays.last], nd)
end
@ -1452,6 +1454,11 @@ local Sync = (function()
return
end
for _, d in Queue.qpairs(self.delays) do
-- if reached the global limit return
if settings.maxProcesses and processCount >= settings.maxProcesses then
log("Alarm", "at global process limit.")
return
end
if self.delays.size < self.config.maxDelays then
-- time constrains are only concerned if not maxed
-- the delay FIFO already.
@ -1462,7 +1469,11 @@ local Sync = (function()
end
if d.status == "wait" then
-- found a waiting delay
self.config.action(self.inlet)
if d.etype ~= "Init" then
self.config.action(self.inlet)
else
self.config.init(InletFactory.d2e(self, d))
end
if self.processes:size() >= self.config.maxProcesses then
-- no further processes
return
@ -1492,8 +1503,8 @@ local Sync = (function()
end
------
-- adds and returns a blanket delay thats blocks all
-- (used in startup)
-- Adds and returns a blanket delay thats blocks all.
-- Used as custom marker.
--
local function addBlanketDelay(self)
local newd = Delay.new("Blanket", true, "")
@ -1501,6 +1512,16 @@ local Sync = (function()
return newd
end
------
-- Adds and returns a blanket delay thats blocks all.
-- Used as startup marker to call init asap.
--
local function addInitDelay(self)
local newd = Delay.new("Init", true, "")
newd.dpos = Queue.push(self.delays, newd)
return newd
end
-----
-- Writes a status report about delays in this sync.
--
@ -1530,9 +1551,6 @@ local Sync = (function()
f:write("\n")
end
-----
-- b
-----
-- Creates a new Sync
--
@ -1548,6 +1566,7 @@ local Sync = (function()
-- functions
addBlanketDelay = addBlanketDelay,
addExclude = addExclude,
addInitDelay = addInitDelay,
collect = collect,
concerns = concerns,
delay = delay,
@ -1604,13 +1623,40 @@ local Syncs = (function()
-- the list of all syncs
--
local list = Array.new()
-----
-- inheritly copies all non integer keys from
-- @cd copy destination
-- to
-- @cs copy source
-- all integer keys are treated as new copy sources
-- The round robin pointer. In case of global limited maxProcesses
-- gives every sync equal chances to spawn the next process.
--
local round = 1
-----
-- The cycle() sheduler goes into the next round of roundrobin.
local function nextRound()
round = round + 1;
if round > #list then
round = 1
end
return round
end
-----
-- Returns the round
local function getRound()
return round
end
-----
-- Returns sync at listpos i
local function get(i)
return list[i];
end
-----
-- Inheritly copies all non integer keys from
-- copy source (cs) to copy destination (cd).
--
-- all entries with integer keys are treated as new sources to copy
--
local function inherit(cd, cs)
-- first copies from source all
@ -1629,7 +1675,7 @@ local Syncs = (function()
end
-----
-- Adds a new directory to observe.
-- Adds a new sync (directory-tree to observe).
--
local function add(config)
-- Creates a new config table and inherit all keys/values
@ -1712,21 +1758,21 @@ local Syncs = (function()
end
-----
-- allows to walk through all syncs
-- Allows a for-loop to walk through all syncs.
--
local function iwalk()
return ipairs(list)
end
-----
-- returns the number of syncs
-- Returns the number of syncs.
--
local size = function()
return #list
end
------
-- tests if any sync is interested in path
-- Tests if any sync is interested in a path.
--
local function concerns(path)
for _, s in ipairs(list) do
@ -1738,10 +1784,15 @@ local Syncs = (function()
end
-- public interface
return {add = add,
concerns = concerns,
iwalk = iwalk,
size = size}
return {
add = add,
get = get,
getRound = getRound,
concerns = concerns,
iwalk = iwalk,
nextRound = nextRound,
size = size
}
end)()
@ -2340,6 +2391,7 @@ end)()
--
--
local StatusFile = (function()
-----
-- Timestamp when the status file has been written.
local lastWritten = false
@ -2404,13 +2456,13 @@ local StatusFile = (function()
end)()
------
-- Lets the userscript make its own alarms
-- Lets the userscript make its own alarms.
--
local UserAlarms = (function()
local alarms = {}
-----
-- Calls the user function at timestamp
-- Calls the user function at timestamp.
--
local function alarm(timestamp, func, extra)
local idx
@ -2431,7 +2483,7 @@ local UserAlarms = (function()
end
----
-- Retrieves the nearest alarm
-- Retrieves the nearest alarm.
--
local function getAlarm()
if #alarms == 0 then
@ -2442,7 +2494,7 @@ local UserAlarms = (function()
end
-----
-- Calls user alarms
-- Calls user alarms.
--
local function invoke(timestamp)
while #alarms > 0 and alarms[1].timestamp <= timestamp do
@ -2499,6 +2551,11 @@ end
-- zombie process was collected by core.
--
function runner.collectProcess(pid, exitcode)
processCount = processCount - 1
if processCount < 0 then
error("negative number of processes!")
end
for _, s in Syncs.iwalk() do
if s:collect(pid, exitcode) then
return
@ -2506,12 +2563,12 @@ function runner.collectProcess(pid, exitcode)
end
end
----
-----
-- Called from core everytime a masterloop cycle runs through.
-- This happens in case of
-- * an expired alarm.
-- * a returned child process.
-- * received inotify events.
-- * received filesystem events.
-- * received a HUP or TERM signal.
--
-- @param timestamp the current kernel time (in jiffies)
@ -2520,23 +2577,30 @@ function runner.cycle(timestamp)
-- goes through all syncs and spawns more actions
-- if possible
if lsyncdStatus == "fade" then
local np = 0
for _, s in Syncs.iwalk() do
np = np + s.processes:size()
end
if np > 0 then
log("Normal", "waiting for ",np," more child processes.")
if processCount > 0 then
log("Normal", "waiting for ",processCount," more child processes.")
return true
else
return false
end
end
if lsyncdStatus ~= "run" then
error("cycle called in not run?!")
error("runner.cycle() called while not running!")
end
for _, s in Syncs.iwalk() do
s:invokeActions(timestamp)
--- only let Syncs invoke actions if not on global limit
if not settings.maxProcesses or processCount < settings.maxProcesses then
local start = Syncs.getRound()
local ir = start
repeat
local s = Syncs.get(ir)
s:invokeActions(timestamp)
ir = ir + 1
if ir > Syncs.size() then
ir = 1
end
until ir == start
Syncs.nextRound()
end
UserAlarms.invoke(timestamp)
@ -2572,7 +2636,7 @@ USAGE:
OPTIONS:
-delay SECS Overrides default delay times
-help Shows this
-inisit Continues startup even if it cannot connect
-insist Continues startup even if it cannot connect
-log all Logs everything (debug)
-log scarce Logs errors only
-log [Category] Turns on logging for a debug category
@ -2840,9 +2904,10 @@ function runner.initialize()
error("sync "..s.config.name..
" has no known event monitor interface.")
end
-- if the sync has an init function, stacks an init delay
-- that will cause the init function to be called.
if s.config.init then
s.config.init(s.inlet)
s:addInitDelay()
end
end
end
@ -2874,15 +2939,21 @@ function runner.getAlarm()
end
-- checks all syncs for their earliest alarm
for _, s in Syncs.iwalk() do
checkAlarm(s:getAlarm())
-- but only if the global process limit is not yet reached.
if not settings.maxProcesses or processCount < settings.maxProcesses then
for _, s in Syncs.iwalk() do
checkAlarm(s:getAlarm())
end
else
log("Alarm", "at global process limit.")
end
-- checks if a statusfile write has been delayed
checkAlarm(StatusFile.getAlarm())
-- checks for an userAlarm
checkAlarm(UserAlarms.getAlarm())
log("Debug","getAlarm returns: ",alarm)
log("Alarm","runner.getAlarm returns: ",alarm)
return alarm
end
@ -2911,7 +2982,7 @@ function runner.collector(pid, exitcode)
return 0
end
----
-----
-- Called by core when an overflow happened.
--
function runner.overflow()
@ -2919,7 +2990,7 @@ function runner.overflow()
lsyncdStatus = "fade"
end
----
-----
-- Called by core on a hup signal.
--
function runner.hup()
@ -2927,7 +2998,7 @@ function runner.hup()
lsyncdStatus = "fade"
end
----
-----
-- Called by core on a term signal.
--
function runner.term()
@ -2991,6 +3062,10 @@ function spawn(agent, binary, ...)
local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then
processCount = processCount + 1
if settings.maxProcesses and processCount > settings.maxProcesses then
error("Spawned too much processes!")
end
local sync = InletFactory.getSync(agent)
-- delay or list
if dol.status then
@ -3097,7 +3172,7 @@ local default_rsync = {
-- gets all events ready for syncing
local elist = inlet.getEvents(
function(event)
return event.etype ~= "Blanket"
return event.etype ~= "Init" and event.etype ~= "Blanket"
end
)
@ -3177,15 +3252,9 @@ local default_rsync = {
-----
-- Spawns the recursive startup sync
--
init = function(inlet)
local config = inlet.getConfig()
local event = inlet.createBlanketEvent()
event.isStartup = true -- marker for user scripts
if string.sub(config.target, -1) ~= "/" then
config.target = config.target .. "/"
end
init = function(event)
local config = event.config;
local inlet = event.inlet;
local excludes = inlet.getExcludes();
if #excludes == 0 then
log("Normal", "recursive startup rsync: ", config.source,
@ -3223,6 +3292,11 @@ local default_rsync = {
end
config.rsyncOpts = config.rsyncOps
end
-- appends a / to target if not present
if string.sub(config.target, -1) ~= "/" then
config.target = config.target .. "/"
end
end,
-----
@ -3310,7 +3384,8 @@ local default_rsyncssh = {
function(e)
return e.etype ~= "Move" and
e.etype ~= "Delete" and
e.etype ~= "Blanket"
e.etype ~= "Init" and
e.etype ~= "Blanket"
end)
local paths = elist.getPaths()
@ -3338,7 +3413,7 @@ local default_rsyncssh = {
-- Called when collecting a finished child process
--
collect = function(agent, exitcode)
if not agent.isList and agent.etype == "Blanket" then
if not agent.isList and agent.etype == "Init" then
if exitcode == 0 then
log("Normal", "Startup of '",agent.source,"' finished.")
elseif rsync_exitcodes[exitcode] == "again" then
@ -3385,14 +3460,9 @@ local default_rsyncssh = {
-----
-- Spawns the recursive startup sync
--
init = function(inlet)
local config = inlet.getConfig()
local event = inlet.createBlanketEvent()
event.isStartup = true -- marker for user scripts
if string.sub(config.targetdir, -1) ~= "/" then
config.targetdir = config.targetdir .. "/"
end
init = function(event)
local config = event.config
local inlet = event.inlet
local excludes = inlet.getExcludes();
if #excludes == 0 then
log("Normal", "recursive startup rsync: ", config.source,
@ -3438,6 +3508,11 @@ local default_rsyncssh = {
if not config.targetdir then
error("default.rsyncssh needs 'targetdir' configured", 4)
end
-- appends a slash to the targetdir if missing
if string.sub(config.targetdir, -1) ~= "/" then
config.targetdir = config.targetdir .. "/"
end
end,
-----
@ -3537,7 +3612,7 @@ local default_direct = {
collect = function(agent, exitcode)
local config = agent.config
if not agent.isList and agent.etype == "Blanket" then
if not agent.isList and agent.etype == "Init" then
if exitcode == 0 then
log("Normal", "Startup of '",agent.source,"' finished.")
elseif rsync_exitcodes and
@ -3639,7 +3714,7 @@ default = {
collect = function(agent, exitcode)
local config = agent.config
if not agent.isList and agent.etype == "Blanket" then
if not agent.isList and agent.etype == "Init" then
if exitcode == 0 then
log("Normal", "Startup of '",agent.source,"' finished.")
elseif config.exitcodes and
@ -3681,22 +3756,20 @@ default = {
-----
-- called on (re)initalizing of Lsyncd.
--
init = function(inlet)
local config = inlet.getConfig()
init = function(event)
local config = event.config
local inlet = event.inlet
-- user functions
-- calls a startup if given by user script.
if type(config.onStartup) == "function" then
local event = inlet.createBlanketEvent()
event.isStartup = true -- marker for user scripts
local startup = config.onStartup(event)
if event.status == "wait" then
-- user script did not spawn anything
-- thus the blanket event is deleted again.
inlet.discardEvent(event)
end
-- TODO honor some return codes of startup like "warmstart".
end
if event.status == "wait" then
-- user script did not spawn anything
-- thus the blanket event is deleted again.
inlet.discardEvent(event)
end
end,
-----

76
tests/schedule.lua Executable file
View File

@ -0,0 +1,76 @@
#!/usr/bin/lua
require("posix")
dofile("tests/testlib.lua")
cwriteln("****************************************************************")
cwriteln(" Testing Lsyncd scheduler ")
cwriteln("****************************************************************")
local tdir, srcdir, trgdir = mktemps()
local logfile = tdir .. "log"
local cfgfile = tdir .. "config.lua"
local logs = {"-log", "all" }
writefile(cfgfile, [[
settings = {
logfile = "]]..logfile..[[",
log = all,
nodaemon = true,
maxProcesses = 1
}
-- continously touches a file
acircuit = {
delay = 0,
onStartup = "sleep 3 && touch ^source/a",
onCreate = "sleep 3 && touch ^source/a",
}
-- continously touches b file
bcircuit = {
delay = 0,
onStartup = "sleep 3 && touch ^source/b",
onCreate = "sleep 3 && touch ^source/b",
}
-- continously touches c file
ccircuit = {
delay = 0,
onStartup = "sleep 3 && touch ^source/c",
onCreate = "sleep 3 && touch ^source/c",
}
sync {acircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["}
sync {bcircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["}
sync {ccircuit, source ="]]..srcdir..[[", target = "]]..trgdir..[["}
]]);
-- test if the filename exists, fails if this is different to expect
local function testfile(filename)
local stat, err = posix.stat(filename)
if not stat then
cwriteln("failure: ",filename," missing");
os.exit(1);
end
end
cwriteln("starting Lsyncd");
local pid = spawn("./lsyncd", cfgfile, unpack(logs));
cwriteln("waiting for Lsyncd to do a few cycles");
posix.sleep(30)
cwriteln("look if every circle got a chance to run");
testfile(srcdir.."a")
testfile(srcdir.."b")
testfile(srcdir.."c")
cwriteln("killing started Lsyncd");
posix.kill(pid);
local _, exitmsg, lexitcode = posix.wait(lpid);
cwriteln("Exitcode of Lsyncd = ", exitmsg, " ", lexitcode);
posix.sleep(1);
if lexitcode == 0 then
cwriteln("OK");
end
os.exit(lexitcode);
-- TODO remove temp