This commit is contained in:
Axel Kittenberger 2010-11-12 18:52:43 +00:00
parent 14669a70dc
commit 9246cecb87
2 changed files with 222 additions and 61 deletions

View File

@ -58,5 +58,5 @@ slowbash = {
end, end,
} }
sync{slowbash, source="s", target="d/"} sync{default.rsync, source="s", target="d/"}

View File

@ -232,13 +232,17 @@ local Inlet, InletControl = (function()
-- lua runner controlled variables -- lua runner controlled variables
local sync local sync
----- -----
-- table to receive the delay of a event. -- table to receive the delay of an event.
local e2d = {} local e2d = {}
-- doesnt stop the garbage collect to remove either. -- doesnt stop the garbage collect to remove entries.
setmetatable(e2d, { __mode = 'kv' }) setmetatable(e2d, { __mode = 'kv' })
-- table to receive the delay list of an event list.
local el2dl = {}
-- doesnt stop the garbage collect to remove entries.
setmetatable(el2dl, { __mode = 'kv' })
----- -----
-- removes the trailing slash from a path -- removes the trailing slash from a path
local function cutSlash(path) local function cutSlash(path)
@ -258,7 +262,7 @@ local Inlet, InletControl = (function()
end end
----- -----
-- Interface for userscripts to get event fields.. -- Interface for user scripts to get event fields.
-- --
local eventFields = { local eventFields = {
----- -----
@ -278,13 +282,19 @@ local Inlet, InletControl = (function()
-- "Create", -- "Create",
-- "Delete", -- "Delete",
-- "Modify", -- "Modify",
-- "MoveFr", -- "Move",
-- "MoveTo"
-- --
etype = function(event) etype = function(event)
return e2d[event].etype return e2d[event].etype
end, end,
-----
-- Tells script this isnt a list.
--
isList = function()
return false
end,
----- -----
-- Returns 'Fr'/'To' for events of moves. -- Returns 'Fr'/'To' for events of moves.
move = function(event) move = function(event)
@ -394,7 +404,7 @@ local Inlet, InletControl = (function()
} }
----- -----
-- Retrievs event fields for the user. -- Retrievs event fields for the user script.
-- --
local eventMeta = { local eventMeta = {
__index = function(t, k) __index = function(t, k)
@ -411,8 +421,59 @@ local Inlet, InletControl = (function()
} }
----- -----
-- Turns a delay to a event. -- Interface for user scripts to get event fields.
-- Encapsulates a delay into an event for the user --
local eventListFuncs = {
-----
-- Returns the pathnames of all events.
--
getPathnames = function(elist, delimiter)
local dlist = el2dl[elist]
if not dlist then
error("cannot find delay list from event list.")
end
if not delimiter then
delimiter = '\n'
end
local pl = {}
local i = 1
for k, d in pairs(dlist) do
if type(k) == "number" then
pl[i] = d.path
i = i + 1
if d.path2 then
pl[i] = d.path2
i = i + 1
end
end
end
return table.concat(pl, delimiter) .. delimiter
end,
}
-----
-- Retrievs event list fields for the user script.
--
local eventListMeta = {
__index = function(t, k)
if k == "isList" then
return true
end
local f = eventListFuncs[k]
if not f then
error("event list does not have function '"..k.."'", 2)
end
return function()
return f(t)
end
end
}
-----
-- Encapsulates a delay into an event for the user script.
-- --
local function d2e(delay) local function d2e(delay)
if delay.etype ~= "Move" then if delay.etype ~= "Move" then
@ -444,6 +505,19 @@ local Inlet, InletControl = (function()
end end
end end
-----
-- Encapsulates a delay list into an event list for the user script.
--
local function dl2el(dlist)
if not dlist.elist then
local elist = {}
dlist.elist = elist
setmetatable(elist, eventListMeta)
el2dl[elist] = dlist
end
return dlist.elist
end
----- -----
-- Creates a blanketEvent that blocks everything -- Creates a blanketEvent that blocks everything
@ -473,6 +547,14 @@ local Inlet, InletControl = (function()
return d2e(sync:getNextDelay(lysncd.now())) return d2e(sync:getNextDelay(lysncd.now()))
end end
-----
-- Gets all events that are not blocked by active events.
--
local function getEvents()
local dlist = sync:getDelays()
return dl2el(dlist)
end
----- -----
-- Returns the configuration table specified by sync{} -- Returns the configuration table specified by sync{}
-- --
@ -490,10 +572,24 @@ local Inlet, InletControl = (function()
end end
----- -----
-- Return the inner config -- Returns the delay from a event.
-- not to be called from user -- not to be called from user script.
local function getInterior(event) local function getDelay(event)
return sync, e2d[event] return e2d[event]
end
-----
-- Returns the delay list from a event list.
-- not to be called from user script.
local function getDelayList(elist)
return el2dl[elist]
end
-----
-- Return the currentsync
-- not to be called from user script.
local function getSync()
return sync
end end
----- -----
@ -501,12 +597,16 @@ local Inlet, InletControl = (function()
-- this one is split, one for user one for runner. -- this one is split, one for user one for runner.
return { return {
getEvent = getEvent, getEvent = getEvent,
getEvents = getEvents,
getConfig = getConfig, getConfig = getConfig,
createBlanketEvent = createBlanketEvent, createBlanketEvent = createBlanketEvent,
}, { }, {
setSync = setSync, setSync = setSync,
getInterior = getInterior, -- TODO <- remove getSync = getSync,
getDelay = getDelay,
getDelayList = getDelayList,
d2e = d2e, d2e = d2e,
dl2el = dl2el,
} }
end)() end)()
@ -557,17 +657,31 @@ local Sync = (function()
-- not a child of this sync. -- not a child of this sync.
return return
end end
if delay.status then
-- collected an event
if delay.status ~= "active" then if delay.status ~= "active" then
error("internal fail, collecting a non-active process") error("internal fail, collecting a non-active process")
end end
InletControl.setSync(self) InletControl.setSync(self)
local rc = self.config.collect(InletControl.d2e(delay), exitcode) local rc = self.config.collect(InletControl.d2e(delay), exitcode)
-- TODO honor return codes of the collect -- TODO honor return codes of the collect?
removeDelay(self, delay) removeDelay(self, delay)
log("Delay","Finish of ",delay.etype," on ", log("Delay","Finish of ",delay.etype," on ",
self.source,delay.path," = ",exitcode) self.source,delay.path," = ",exitcode)
else
log("Delay", "collected a list")
InletControl.setSync(self)
local rc = self.config.collect(InletControl.dl2el(delay), exitcode)
-- TODO honor return codes of collect?
for k, d in pairs(delay) do
if type(k) == "number" then
removeDelay(self, d)
end
end
log("Delay","Finished list = ",exitcode)
end
self.processes[pid] = nil self.processes[pid] = nil
end end
@ -712,20 +826,32 @@ local Sync = (function()
return nil return nil
end end
----- -----
-- Gets the next event to be processed. -- Gets all delays that are not blocked by active delays.
-- --
local function getNextDelay(self, now) local function getDelays(self)
local dlist = {}
local blocks = {}
----
-- inheritly transfers all blocks from delay
--
local function getBlocks(delay)
blocks[delay] = true
for i, d in ipairs(delay.blocks) do
getBlocks(d)
end
end
for i, d in ipairs(self.delays) do for i, d in ipairs(self.delays) do
if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then if d.status == "active" then
-- reached point in stack where delays are in future getBlocks(d)
return nil elseif not blocks[d] then
end dlist[i] = d
if d.status == "wait" then
-- found a waiting delay
return d
end end
end end
return dlist
end end
----- -----
@ -754,6 +880,22 @@ local Sync = (function()
end end
end end
-----
-- Gets the next event to be processed.
--
local function getNextDelay(self, now)
for i, d in ipairs(self.delays) do
if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then
-- reached point in stack where delays are in future
return nil
end
if d.status == "wait" then
-- found a waiting delay
return d
end
end
end
------ ------
-- adds and returns a blanket delay thats blocks all -- adds and returns a blanket delay thats blocks all
@ -801,6 +943,7 @@ local Sync = (function()
delay = delay, delay = delay,
addBlanketDelay = addBlanketDelay, addBlanketDelay = addBlanketDelay,
getAlarm = getAlarm, getAlarm = getAlarm,
getDelays = getDelays,
getNextDelay = getNextDelay, getNextDelay = getNextDelay,
invokeActions = invokeActions, invokeActions = invokeActions,
removeDelay = removeDelay, removeDelay = removeDelay,
@ -1479,9 +1622,23 @@ function spawn(agent, binary, ...)
end end
local pid = lsyncd.exec(binary, ...) local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then if pid and pid > 0 then
local sync, delay = InletControl.getInterior(agent) local sync = InletControl.getSync()
local delay = InletControl.getDelay(agent)
if delay then
delay.status = "active" delay.status = "active"
sync.processes[pid] = delay sync.processes[pid] = delay
else
local dlist = InletControl.getDelayList(agent)
if not dlist then
error("spawning with an unknown agent", 2)
end
for k, d in pairs(dlist) do
if type(k) == "number" then
d.status = "active"
end
end
sync.processes[pid] = dlist
end
end end
end end
@ -1520,16 +1677,17 @@ end
local defaultRsync = { local defaultRsync = {
----- -----
-- Called for every sync/target pair on startup -- Called for every sync/target pair on startup
startup = function(source, config) --
log("Normal", "startup recursive rsync: ", source, " -> ", target) action = function(inlet)
return exec("/usr/bin/rsync", "-ltrs", source, target) local elist = inlet.getEvents()
local pathnames = elist.getPathnames()
log("Normal", "rsyncing list\n", pathnames)
return spawn(elist, "/tmp/input", "<", pathnames)
end, end,
default = function(inlet) -----
-- TODO -- Default delay 3 seconds
--return exec("/usr/bin/rsync", "--delete", "-ltds", delay = 3,
-- source.."/"..path, target.."/"..path)
end
} }
----- -----
@ -1613,18 +1771,22 @@ default = {
----- -----
-- Called when collecting a finished child process -- Called when collecting a finished child process
-- --
collect = function(event, exitcode) collect = function(agent, exitcode)
if event.etype == "Blanket" then if agent.isList then
if exitcode == 0 then log("Normal", "Finished a list = ",exitcode)
log("Normal", "Startup of '",event.source,"' finished.")
else else
log("Error", "Failure on startup of '",event.source,"'.") if agent.etype == "Blanket" then
if exitcode == 0 then
log("Normal", "Startup of '",agent.source,"' finished.")
else
log("Error", "Failure on startup of '",agent.source,"'.")
terminate(-1) -- ERRNO terminate(-1) -- ERRNO
end end
return return
end end
log("Normal", "Finished ",event.etype, log("Normal", "Finished ",agent.etype,
" on ",event.sourcePath," = ",exitcode) " on ",agent.sourcePath," = ",exitcode)
end
end, end,
----- -----
@ -1632,7 +1794,6 @@ default = {
-- --
init = function(inlet) init = function(inlet)
local config = inlet.getConfig() local config = inlet.getConfig()
-- creates a prior startup if configured -- creates a prior startup if configured
if type(config.onStartup) == "function" then if type(config.onStartup) == "function" then
local event = inlet.createBlanketEvent() local event = inlet.createBlanketEvent()