Every Sync has now its own inlet.

This commit is contained in:
Axel Kittenberger 2010-11-30 22:56:34 +00:00
parent 8aec4ffb34
commit 5bc66eb1f9
1 changed files with 191 additions and 243 deletions

View File

@ -408,27 +408,18 @@ local Combiner = (function()
end)()
-----
-- User interface to grap events
-- Creates inlets for syncs, the user interface for events.
--
-- InletControl is the runners part to control the interface
-- hidden from the user.
--
local getInlet
local Inlet, InletControl = (function()
-- lua runner controlled variables
local sync
-----
local InletFactory = (function()
-- table to receive the delay of an event.
-- or the delay list of an event list.
local e2d = {}
-- doesnt stop the garbage collect to remove entries.
-- table to receive the sync of an event or event list
local e2s = {}
-- dont stop the garbage collect to remove entries.
setmetatable(e2d, { __mode = 'kv' })
setmetatable(e2s, { __mode = 'kv' })
-- table to receive the delay list of an event list.
local el2dl = {}
-- doesnt stop the garbage collect to remove entries.
setmetatable(el2dl, { __mode = 'kv' })
-----
-- removes the trailing slash from a path
local function cutSlash(path)
@ -438,7 +429,7 @@ local Inlet, InletControl = (function()
return path
end
end
local function getPath(event)
if event.move ~= "To" then
return e2d[event].path
@ -458,21 +449,18 @@ local Inlet, InletControl = (function()
-- TODO give user a readonly version.
--
config = function(event)
return sync.config
return e2s[event].config
end,
inlet = function(event)
return getInlet()
return e2s[event].inlet
end,
-----
-- Returns the type of the event.
-- Can be:
-- "Attrib",
-- "Create",
-- "Delete",
-- "Modify",
-- "Move",
-- "Attrib", "Create", "Delete",
-- "Modify" or "Move",
--
etype = function(event)
return e2d[event].etype
@ -485,17 +473,6 @@ local Inlet, InletControl = (function()
return false
end,
-----
-- Returns 'Fr'/'To' for events of moves.
move = function(event)
local d = e2d[event]
if d.move then
return d.move
else
return ""
end
end,
-----
-- Status
status = function(event)
@ -516,7 +493,7 @@ local Inlet, InletControl = (function()
name = function(event)
return string.match(getPath(event), "[^/]+/?$")
end,
-----
-- Returns the name of the file/dir.
-- Excludes a trailing slash for dirs.
@ -532,7 +509,7 @@ local Inlet, InletControl = (function()
path = function(event)
return getPath(event)
end,
-----
-- Returns the directory of the file/dir relative to watch root
-- Always includes a trailing slash.
@ -548,13 +525,13 @@ local Inlet, InletControl = (function()
pathname = function(event)
return cutSlash(getPath(event))
end,
------
-- Returns the absolute path of the watch root.
-- All symlinks will have been resolved.
--
source = function(event)
return sync.source
return e2s[event].source
end,
------
@ -562,26 +539,26 @@ local Inlet, InletControl = (function()
-- Includes a trailing slash for dirs.
--
sourcePath = function(event)
return sync.source .. getPath(event)
return e2s[event].source .. getPath(event)
end,
------
-- Returns the absolute dir of the file/dir.
-- Includes a trailing slash.
--
sourcePathdir = function(event)
return sync.source ..
return e2s[event].source ..
(string.match(getPath(event), "^(.*/)[^/]+/?") or "")
end,
------
-- Returns the absolute path of the file/dir.
-- Excludes a trailing slash for dirs.
--
sourcePathname = function(event)
return sync.source .. cutSlash(getPath(event))
return e2s[event].source .. cutSlash(getPath(event))
end,
------
-- Returns the target.
-- Just for user comfort, for most case
@ -590,101 +567,57 @@ local Inlet, InletControl = (function()
-- this is completly up to the action scripts.)
--
target = function(event)
return sync.config.target
return e2s[event].config.target
end,
------
-- Returns the relative dir/file appended to the target.
-- Includes a trailing slash for dirs.
--
targetPath = function(event)
return sync.config.target .. getPath(event)
return e2s[event].config.target .. getPath(event)
end,
------
-- Returns the dir of the dir/file appended to the target.
-- Includes a trailing slash.
--
targetPathdir = function(event)
return sync.config.target ..
return e2s[event].config.target ..
(string.match(getPath(event), "^(.*/)[^/]+/?") or "")
end,
------
-- Returns the relative dir/file appended to the target.
-- Excludes a trailing slash for dirs.
--
targetPathname = function(event)
return sync.config.target .. cutSlash(getPath(event))
return e2s[event].config.target ..
cutSlash(getPath(event))
end,
}
-----
-- Retrievs event fields for the user script.
--
local eventMeta = {
__index = function(t, k)
local f = eventFields[k]
__index = function(event, field)
local f = eventFields[field]
if not f then
if k == 'move' then
if field == 'move' then
-- possibly undefined
return nil
end
error("event does not have field '"..k.."'", 2)
error("event does not have field '"..field.."'", 2)
end
return f(t)
return f(event)
end
}
------
-- registers an alarm.
alarm = function(timestamp, func, extra)
return sync:addAlarm(timestamp, func, extra)
end
-----
-- adds an exclude.
--
local function addExclude(pattern)
sync:addExclude(pattern)
end
-----
-- removes an exclude.
--
local function rmExclude(pattern)
sync:rmExclude(pattern)
end
-----
-- Interface for user scripts to get event fields.
--
local eventListFuncs = {
-----
-- Returns a list of file/dirnames of all events in list.
--
--getNames = function(elist)
-- local dlist = el2dl[elist]
-- if not dlist then
-- error("cannot find delay list from event list.")
-- end
-- local pl = {}
-- local i = 1
-- for k, d in pairs(dlist) do
-- if type(k) == "number" then
-- pl[i] = string.match(d.path, "[^/]+/?$")
-- i = i + 1
-- if d.path2 then
-- pl[i] = string.match(d.path2, "[^/]+/?$")
-- i = i + 1
-- end
-- end
-- end
-- return pl
--end,
-----
-- Returns a list of paths of all events in list.
--
@ -693,7 +626,7 @@ local Inlet, InletControl = (function()
-- returns one or two strings to add.
--
getPaths = function(elist, mutator)
local dlist = el2dl[elist]
local dlist = e2d[elist]
if not dlist then
error("cannot find delay list from event list.")
end
@ -713,67 +646,53 @@ local Inlet, InletControl = (function()
end
end
return result
end,
-----
-- Returns a list of absolutes local paths in list.
--
--getSourcePaths = function(elist)
-- local dlist = el2dl[elist]
-- if not dlist then
-- error("cannot find delay list from event list.")
-- end
-- local pl = {}
-- local i = 1
-- for k, d in pairs(dlist) do
-- if type(k) == "number" then
-- pl[i] = sync.source .. d.path
-- i = i + 1
-- if d.path2 then
-- pl[i] = sync.source .. d.path2
-- i = i + 1
-- end
-- end
-- end
-- return pl
--end,
end
}
-----
-- Retrievs event list fields for the user script.
--
local eventListMeta = {
__index = function(t, k)
if k == "isList" then
__index = function(elist, func)
if func == "isList" then
return true
end
if k == "config" then
return sync.config
if func == "config" then
return e2s[elist].config
end
local f = eventListFuncs[k]
local f = eventListFuncs[func]
if not f then
error("event list does not have function '"..k.."'", 2)
error("event list does not have function '"..func.."'", 2)
end
return function(...)
return f(t, ...)
return f(elist, ...)
end
end
}
----
-- list of all inlets and their syncs
--
local inlets = {}
-- dont stop the garbage collect to remove entries.
setmetatable(inlets, { __mode = 'kv' })
-----
-- Encapsulates a delay into an event for the user script.
--
local function d2e(delay)
local function d2e(sync, delay)
if delay.etype ~= "Move" then
if not delay.event then
local event = {}
delay.event = event
setmetatable(event, eventMeta)
e2d[event] = delay
e2s[event] = sync
end
return delay.event
else
@ -786,10 +705,12 @@ local Inlet, InletControl = (function()
setmetatable(event, eventMeta)
setmetatable(event2, eventMeta)
e2d[delay.event] = delay
e2d[delay.event] = delay
e2d[delay.event2] = delay
e2s[delay.event] = sync
e2s[delay.event2] = sync
-- move events have a field 'event'
-- move events have a field 'move'
event.move = "Fr"
event2.move = "To"
end
@ -800,112 +721,135 @@ local Inlet, InletControl = (function()
-----
-- Encapsulates a delay list into an event list for the user script.
--
local function dl2el(dlist)
local function dl2el(sync, dlist)
if not dlist.elist then
local elist = {}
dlist.elist = elist
setmetatable(elist, eventListMeta)
el2dl[elist] = dlist
e2d[elist] = dlist
e2s[elist] = sync
end
return dlist.elist
end
-----
-- TODO
--
local inletFuncs = {
-----
-- adds an exclude.
--
addExclude = function(sync, pattern)
sync:addExclude(pattern)
end,
-----
-- Creates a blanketEvent that blocks everything
-- and is blocked by everything.
--
local function createBlanketEvent()
return d2e(sync:addBlanketDelay())
end
-----
-- Discards a waiting event.
--
local function discardEvent(event)
local delay = e2d[event]
if delay.status ~= "wait" then
log("Error", "Ignored try to cancel a non-waiting event of type ",
event.etype)
return
end
sync:removeDelay(delay)
end
-----
-- Gets the next not blocked event from queue.
--
local function getEvent()
return d2e(sync:getNextDelay(now()))
end
-----
-- removes an exclude.
--
rmExclude = function(sync, pattern)
sync:rmExclude(pattern)
end,
-----
-- Gets all events that are not blocked by active events.
--
-- @param if not nil a function to test each delay
--
local function getEvents(test)
local dlist = sync:getDelays(test)
return dl2el(dlist)
end
-----
-- Creates a blanketEvent that blocks everything
-- and is blocked by everything.
--
createBlanketEvent = function(sync)
return d2e(sync, sync:addBlanketDelay())
end,
-----
-- Discards a waiting event.
--
discardEvent = function(sync, event)
local delay = e2d[event]
if delay.status ~= "wait" then
log("Error",
"Ignored cancel of a non-waiting event of type ",
event.etype)
return
end
sync:removeDelay(delay)
end,
-----
-- Gets the next not blocked event from queue.
--
getEvent = function(sync)
return d2e(sync, sync:getNextDelay(now()))
end,
-----
-- Gets all events that are not blocked by active events.
--
-- @param if not nil a function to test each delay
--
getEvents = function(sync, test)
local dlist = sync:getDelays(test)
return dl2el(sync, dlist)
end,
-----
-- Returns the configuration table specified by sync{}
--
getConfig = function(sync)
-- TODO gives a readonly handler only.
return sync.config
end,
}
-----
-- Returns the configuration table specified by sync{}
-- Forwards access to inlet functions.
--
local function getConfig()
-- TODO give a readonly handler only.
return sync.config
end
local inletMeta = {
__index = function(inlet, f)
local f = inletFuncs[f]
if not f then
error("inlet does not have function '"..f.."'", 2)
end
return function(...)
return f(inlets[inlet], ...)
end
end,
}
-----
-- Interface for lsyncd runner to control what
-- the inlet will present the user.
--
local function setSync(setSync)
sync = setSync
-- Creates a new inlet for Sync
local function newInlet(sync)
-- lua runner controlled variables
local inlet = {}
-- sets use access methods
setmetatable(inlet, inletMeta)
inlets[inlet] = sync
return inlet
end
-----
-- Returns the delay from a event.
-- not to be called from user script.
local function getDelay(event)
--
local function getDelayOrList(event)
return e2d[event]
end
-----
-- Returns the delay list from a event list.
-- not to be called from user script.
local function getDelayList(elist)
return el2dl[elist]
end
-----
-- Return the currentsync
-- not to be called from user script.
local function getSync()
return sync
-- Returns the sync from an event or list
--
local function getSync(agent)
return e2s[agent]
end
-----
-- public interface.
-- this one is split, one for user one for runner.
return {
addExclude = addExclude,
alarm = alarm,
createBlanketEvent = createBlanketEvent,
discardEvent = discardEvent,
getEvent = getEvent,
getEvents = getEvents,
getConfig = getConfig,
rmExclude = rmExclude,
}, {
d2e = d2e,
dl2el = dl2el,
getDelay = getDelay,
getDelayList = getDelayList,
getSync = getSync,
setSync = setSync,
}
getDelayOrList = getDelayOrList,
d2e = d2e,
dl2el = dl2el,
getSync = getSync,
newInlet = newInlet,
}
end)()
-----
@ -1115,12 +1059,13 @@ local Sync = (function()
end
if delay.status then
-- collected an event
log("Delay", "collected an event")
if delay.status ~= "active" then
error("collecting a non-active process")
end
InletControl.setSync(self)
local rc = self.config.collect(InletControl.d2e(delay), exitcode)
local rc = self.config.collect(
InletFactory.d2e(self, delay),
exitcode)
if rc == "die" then
log("Error", "Critical exitcode.");
terminate(-1) --ERRNO
@ -1142,8 +1087,9 @@ local Sync = (function()
end
else
log("Delay", "collected a list")
InletControl.setSync(self)
local rc = self.config.collect(InletControl.dl2el(delay), exitcode)
local rc = self.config.collect(
InletFactory.dl2el(self, delay),
exitcode)
if rc == "die" then
log("Error", "Critical exitcode.");
terminate(-1) --ERRNO
@ -1163,7 +1109,6 @@ local Sync = (function()
d.status = "wait"
end
end
end
for k, d in pairs(delay) do
if type(k) == "number" then
@ -1355,7 +1300,7 @@ local Sync = (function()
for i, d in ipairs(self.delays) do
if d.status == "active" or
(test and not test(InletControl.d2e(d)))
(test and not test(InletFactory.d2e(self, d)))
then
getBlocks(d)
elseif not blocks[d] then
@ -1389,8 +1334,7 @@ local Sync = (function()
end
if d.status == "wait" then
-- found a waiting delay
InletControl.setSync(self)
self.config.action(Inlet)
self.config.action(self.inlet)
if self.processes:size() >= self.config.maxProcesses then
-- no further processes
return
@ -1400,9 +1344,8 @@ local Sync = (function()
-- invokes user alarms
while #self.alarms > 0 and self.alarms[1].timestamp <= timestamp do
InletControl.setSync(self)
if type(self.config.alarm) == "function" then
self.config.alarm(Inlet,
self.config.alarm(self.inlet,
self.alarms[1].timestamp, self.alarms[1].extra)
else
log("Error", "registered alarm, but there is no alarm function")
@ -1498,6 +1441,8 @@ local Sync = (function()
rmExclude = rmExclude,
statusReport = statusReport,
}
s.inlet = InletFactory.newInlet(s)
-- provides a default name if needed
if not config.name then
config.name = "Sync" .. nextDefaultName
@ -1642,6 +1587,7 @@ local Syncs = (function()
--- creates the new sync
local s = Sync.new(config)
table.insert(list, s)
return s
end
-----
@ -2639,8 +2585,7 @@ function runner.initialize()
end
if s.config.init then
InletControl.setSync(s)
s.config.init(Inlet)
s.config.init(s.inlet)
end
end
end
@ -2738,12 +2683,13 @@ end
-----
-- Main utility to create new observations.
-- @returns an Inlet to that sync.
--
function sync(opts)
if lsyncdStatus ~= "init" then
error("Sync can only be created on initialization.", 2)
end
Syncs.add(opts)
return Syncs.add(opts).inlet
end
@ -2769,22 +2715,24 @@ function spawn(agent, binary, ...)
end
local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then
local sync = InletControl.getSync()
local delay = InletControl.getDelay(agent)
if delay then
delay.status = "active"
sync.processes[pid] = delay
local sync = InletFactory.getSync(agent)
-- delay or list
local dol = InletFactory.getDelayOrList(agent)
if not dol then
error("spawning with an unknown agent", 2)
end
if dol.status then
-- is a delay
dol.status = "active"
sync.processes[pid] = dol
else
local dlist = InletControl.getDelayList(agent)
if not dlist then
error("spawning with an unknown agent", 2)
end
for k, d in pairs(dlist) do
-- is a list
for k, d in pairs(dol) do
if type(k) == "number" then
d.status = "active"
end
end
sync.processes[pid] = dlist
sync.processes[pid] = dol
end
end
end
@ -2917,7 +2865,7 @@ local default_rsync = {
log("Normal",
"Calling rsync with filter-list of new/modified files/dirs\n",
filterS)
local config = inlet.getConfig()
local config = inlet.getConfig()
spawn(elist, "/usr/bin/rsync",
"<", filter0,
config.rsyncOps,