This commit is contained in:
Axel Kittenberger 2010-11-08 12:14:10 +00:00
parent 981f6e5e42
commit 69d5b186a3
3 changed files with 331 additions and 255 deletions

View File

@ -23,16 +23,16 @@ slowbash = {
-- collect gets called when spawned process finished -- collect gets called when spawned process finished
local function collect(exitcode) local function collect(exitcode)
if exitcode == 0 then if exitcode == 0 then
log("Normal", "Startup of '"..c.source.."' finished.") log("Normal", "Startup of '",c.source,"' finished.")
else else
log("Error", "Failure on startup of '"...source.."'.") log("Error", "Failure on startup of '",c.source,"'.")
terminate(-1) -- ERRNO terminate(-1) -- ERRNO
end end
end end
spawnShell(inlet.blanketEvent(), collect, spawnShell(inlet.blanketEvent(), collect,
[[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]], [[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]],
config.source, config.target) c.source, c.target)
end, end,
onCreate = function(event) onCreate = function(event)

View File

@ -987,8 +987,11 @@ masterloop(lua_State *L)
if (lua_pcall(L, 0, 1, -2)) { if (lua_pcall(L, 0, 1, -2)) {
exit(-1); // ERRNO exit(-1); // ERRNO
} }
if (lua_type(L, -1) == LUA_TBOOLEAN) {
have_alarm = lua_toboolean(L, -1); have_alarm = lua_toboolean(L, -1);
if (have_alarm) { } else {
have_alarm = true;
alarm_time = (clock_t) luaL_checkinteger(L, -1); alarm_time = (clock_t) luaL_checkinteger(L, -1);
} }
lua_pop(L, 2); lua_pop(L, 2);

View File

@ -37,7 +37,7 @@ log = lsyncd.log
terminate = lsyncd.terminate terminate = lsyncd.terminate
--============================================================================ --============================================================================
-- Coding checks, ensure termination on some easy to do coding errors. -- Lsyncd Prototypes
--============================================================================ --============================================================================
----- -----
@ -181,7 +181,7 @@ local Delay = (function()
etype = etype, etype = etype,
alarm = alarm, alarm = alarm,
path = path, path = path,
status = "delay", status = "wait",
} }
return o return o
end end
@ -189,21 +189,252 @@ local Delay = (function()
return {new = new} return {new = new}
end)() end)()
-----
-- User interface to grap events
--
-- InletControl is the Luas runner part to control the interface
-- hidden from the user.
--
local Inlet, InletControl = (function()
-- lua runner controlled variables
local sync
-- key variable for delays hidden from user
local delayKey = {}
-----
-- removes the trailing slash from a path
local function cutSlash(path)
if string.byte(path, -1) == 47 then
return string.sub(path, 1, -2)
else
return path
end
end
-----
-- Interface for the user to get fields.
local eventFields = {
config = function(event)
return event[delayKey].sync.config
end,
-----
-- Returns the type of the event.
-- Can be:
-- "Attrib"
-- "Create"
-- "Delete"
-- "Modify"
-- "Move"
etype = function(event)
return event[delayKey].etype
end,
-----
-- Returns true if event relates to a directory.
isdir = function(event)
return string.byte(event[delayKey].path, -1) == 47
end,
-----
-- Returns the name of the file/dir.
-- Includes a trailing slash for dirs.
name = function()
return string.match(event[delayKey].path, "[^/]+/?$")
end,
-----
-- Returns the name of the file/dir.
-- Excludes a trailing slash for dirs.
basename = function()
return string.match(event[delayKey].path, "([^/]+)/?$")
end,
-----
-- Returns the file/dir relative to watch root
-- Includes a trailing slash for dirs.
path = function(event)
return event[delayKey].path
end,
-----
-- Returns the file/dir relativ to watch root
-- Excludes a trailing slash for dirs.
pathname = function(event)
return cutSlash(event[delayKey].path)
end,
------
-- Returns the absolute path of the watch root.
-- All symlinks will have been resolved.
source = function(event)
return sync.source
end,
------
-- Returns the absolute path of the file/dir.
-- Includes a trailing slash for dirs.
sourcePath = function(event)
return sync.source .. event[delayKey].path
end,
------
-- Returns the absolute path of the file/dir.
-- Excludes a trailing slash for dirs.
sourcePathname = function(event)
return sync.source .. cutSlash(event[delayKey].path)
end,
------
-- Returns the target.
-- Just for user comfort, for most case
-- (Actually except of here, the lsyncd.runner itself
-- does not care event about the existance of "target",
-- this is completly up to the action scripts.)
target = function(event)
return sync.config.target
end,
------
-- Returns the relative dir/file appended to the target.
-- Includes a trailing slash for dirs.
targetPath = function(event)
return sync.config.target .. event[delayKey].path
end,
------
-- Returns the relative dir/file appended to the target.
-- Excludes a trailing slash for dirs.
targetPathname = function(event)
return sync.config.target .. cutSlash(event[delayKey].path)
end,
}
-----
-- Calls event functions for the user.
local eventMeta = {
__index = function(t, k)
local f = eventFields[k]
if not f then
error("event does not have field '"..k.."'", 2)
end
return f(t)
end
}
-----
-- Encapsulates a delay to an event for the user
--
-- TODO this hidden key technique can be circumvented with
-- pairs(), use a weak table as referencer instead.
--
local function toEvent(delay)
if not delay.event then
delay.event = {}
setmetatable(delay.event, eventMeta)
delay.event[delayKey] = delay
end
return delay.event
end
-----
-- Creates a blanketEvent that blocks everything
-- and is blocked by everything.
--
local function blanketEvent()
return toEvent(sync:addBlanketDelay())
end
-----
-- Gets the next event from queue.
--
local function getEvent()
return toEvent(sync:getNextDelay(lysncd.now()))
end
-----
-- Returns the configuration table specified by sync{}
--
local function getConfig()
-- TODO give a readonly handler only.
return sync.config
end
-----
-- Interface for lsyncd runner to control what
-- the inlet will present the user.
--
local function set(setSync)
sync = setSync
end
-----
-- Return the inner config
-- not to be called from user
local function getInterior(event)
return sync, event[delayKey]
end
-----
-- public interface.
-- this one is split, one for user one for runner.
return {
getEvent = getEvent,
getConfig = getConfig,
blanketEvent = blanketEvent
}, {
set = set,
getInterior = getInterior
}
end)()
----- -----
-- Holds information about one observed directory inclusively subdirs. -- Holds information about one observed directory inclusively subdirs.
-- --
local Sync = (function() local Sync = (function()
----- -----
-- Syncs that have no name specified get an incremental default name -- Syncs that have no name specified get an incremental default name
-- --
local nextDefaultName = 1 local nextDefaultName = 1
-----
-- Collects a child process
--
local function collect(self, pid, exitcode)
local delay = self.processes[pid]
if not delay then
-- not a child of this sync.
return
end
if delay.status ~= "active" then
error("internal fail, collecting a non-active process")
end
-- TODO call user collector
local found
for i, d in ipairs(self.delays) do
if d == delay then
found = true
table.remove(self.delays, i)
break
end
end
if not found then
error("Did not find a delay!")
end
log("Debug", "collected ",pid, ": ",delay.etype," of ",
self.source, delay.path," = ",exitcode)
self.processes[pid] = nil
self.delayname[delay.path] = nil
end
----- -----
-- Puts an action on the delay stack. -- Puts an action on the delay stack.
-- --
local function delay(self, etype, time, path, path2) local function delay(self, etype, time, path, path2)
log("Function", "delay(", self, ", ", etype, ", ", path, ")") log("Function", "delay(", self.config.name,", ",etype,", ",path,"...)")
local delays = self.delays
local delayname = self.delayname local delayname = self.delayname
if etype == "Move" and not self.config.move then if etype == "Move" and not self.config.move then
@ -253,15 +484,15 @@ local Sync = (function()
return return
end end
end end
table.insert(delays, newd) table.insert(self.delays, newd)
else else
delayname[path] = newd delayname[path] = newd
table.insert(delays, newd) table.insert(self.delays, newd)
end end
end end
----- -----
-- Return the nearest alarm for this Sync. -- Returns the nearest alarm for this Sync.
-- --
local function getAlarm(self) local function getAlarm(self)
-- first checks if more processses could be spawned -- first checks if more processses could be spawned
@ -274,31 +505,68 @@ local Sync = (function()
end end
end end
-----
-- Gets the next event to be processed.
--
local function getNextDelay(self, now)
for i, d in ipairs(self.delays) do
if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then
-- reached point in stack where delays are in future
return nil
end
if d.status == "wait" then
-- found a waiting delay
return d
end
end
end
----- -----
-- Creates new actions -- Creates new actions
-- --
local function invokeActions(self) local function invokeActions(self, now)
log("Function", "invokeActions('",self.config.name,"',",now,")")
if self.processes:size() >= self.config.maxProcesses then if self.processes:size() >= self.config.maxProcesses then
log("Debug", "no new processes")
-- no new processes
return return
end end
for _, d in ipairs(self.delays) do
local delays = self.delays log("Debug", "iter")
local d = delays[1] if d.alarm ~= true and lsyncd.clockbefore(now, d.alarm) then
if d and lsyncd.clockbeforeq(d.alarm, now) then -- reached point in stack where delays are in future
InletControl.set(sync, delay) log("Debug", "waits in future.")
sync.config.action(Inlet) return
-- TODO do not remove
table.remove(delays, 1)
s.delayname[d.path] = nil
end end
if d.status == "wait" then
-- found a waiting delay
log("Debug", "invoke")
InletControl.set(self)
log("Debug", "invoke2")
self.config.action(Inlet)
log("Debug", "invoke3")
if self.processes:size() >= self.config.maxProcesses then
-- no further processes
log("Debug", "no further processes")
return
end end
log("Debug", "finInvoke")
end
log("Debug", "next")
end
log("Debug", "finInvoke")
end
------ ------
-- adds a blanketEvent thats blocks all (startup) -- adds and returns a blanket delay thats blocks all
-- (used in startup)
-- --
local function addBlanketEvent() local function addBlanketDelay(self)
local newd = Delay.new("Blanket", "/", true)
self.delayname["/"] = newd -- TODO remove delayname
table.insert(self.delays, newd)
return newd
end end
----- -----
@ -314,21 +582,26 @@ local Sync = (function()
processes = CountArray.new(), processes = CountArray.new(),
-- functions -- functions
collect = collect,
delay = delay, delay = delay,
addBlanketDelay = addBlanketDelay,
getAlarm = getAlarm, getAlarm = getAlarm,
getNextDelay = getNextDelay,
invokeActions = invokeActions, invokeActions = invokeActions,
} }
-- provides a default name if needed -- provides a default name if needed
if not config.name then if not config.name then
config.name = "Sync" .. nextDefaultName config.name = "Sync" .. nextDefaultName
end end
-- increment default nevertheless to cause less confusion -- increments default nevertheless to cause less confusion
-- so name will be the n-th call to sync{}
nextDefaultName = nextDefaultName + 1 nextDefaultName = nextDefaultName + 1
return s return s
end end
----- -----
-- public interface -- public interface
--
return {new = new} return {new = new}
end)() end)()
@ -611,212 +884,13 @@ end
-- zombie process was collected by core. -- zombie process was collected by core.
-- --
function lsyncd_collect_process(pid, exitcode) function lsyncd_collect_process(pid, exitcode)
local delay = nil
local sync = nil
for _, s in Syncs.iwalk() do for _, s in Syncs.iwalk() do
delay = s.processes[pid] if s:collect(pid, exitcode) then
if delay then
sync = s
break
end
end
if not delay then
return return
end end
log("Debug", "collected ",pid, ": ",delay.etype," of ",
sync.source, delay.path," = ",exitcode)
sync.processes[pid] = nil
end
-----
-- User interface to grap events
--
-- InletControl is the Luas runner part to control the interface
-- hidden from the user.
--
local Inlet, InletControl = (function()
-- lua runner controlled variables
local sync = true
local delay = true
-- event to be passed to the user
local event = {}
-----
-- removes the trailing slash from a path
local function cutSlash(path)
if string.byte(path, -1) == 47 then
return string.sub(path, 1, -2)
else
return path
end end
end end
-----
-- Creates a blanketEvent that blocks everything
-- and is blocked by everything.
--
local function blanketEvent()
return sync.addBlanketEvent()
end
-----
-- Interface for the user to get fields.
local eventFields = {
config = function()
return sync.config
end,
-----
-- Returns the type of the event.
-- Can be:
-- "Attrib"
-- "Create"
-- "Delete"
-- "Modify"
-- "Move"
etype = function()
return delay.etype
end,
-----
-- Returns true if event relates to a directory.
isdir = function()
return string.byte(delay.path, -1) == 47
end,
-----
-- Returns the name of the file/dir.
-- Includes a trailing slash for dirs.
name = function()
return string.match(delay.path, "[^/]+/?$")
end,
-----
-- Returns the name of the file/dir.
-- Excludes a trailing slash for dirs.
basename = function()
return string.match(delay.path, "([^/]+)/?$")
end,
-----
-- Returns the file/dir relative to watch root
-- Includes a trailing slash for dirs.
path = function()
return delay.path
end,
-----
-- Returns the file/dir relativ to watch root
-- Excludes a trailing slash for dirs.
pathname = function()
return cutSlash(delay.path)
end,
------
-- Returns the absolute path of the watch root.
-- All symlinks will have been resolved.
source = function()
return sync.source
end,
------
-- Returns the absolute path of the file/dir.
-- Includes a trailing slash for dirs.
sourcePath = function()
return sync.source .. delay.path
end,
------
-- Returns the absolute path of the file/dir.
-- Excludes a trailing slash for dirs.
sourcePathname = function()
return sync.source .. cutSlash(delay.path)
end,
------
-- Returns the target.
-- Just for user comfort, for most case
-- (Actually except of here, the lsyncd.runner itself
-- does not care event about the existance of "target",
-- this is completly up to the action scripts.)
target = function()
return sync.config.target
end,
------
-- Returns the relative dir/file appended to the target.
-- Includes a trailing slash for dirs.
targetPath = function()
return sync.config.target .. delay.path
end,
------
-- Returns the relative dir/file appended to the target.
-- Excludes a trailing slash for dirs.
targetPathname = function()
return sync.config.target .. cutSlash(delay.path)
end,
}
-----
-- Calls event functions for the user.
local eventMeta = {
__index = function(t, k)
local f = eventFields[k]
if not f then
error("event does not have field '"..k.."'", 2)
end
return f()
end
}
setmetatable(event, eventMeta)
-----
-- Interface for lsyncd runner to control what
-- the inlet will present the user.
--
local function set(setSync, setDelay)
sync = setSync
delay = setDelay
end
-----
-- Gets the next event from queue.
--
local function getEvent()
-- TODO actually aquire here
return event
end
-----
-- Returns the configuration table specified by sync{}
--
local function getConfig()
-- TODO give a readonly handler only.
return sync.config
end
-----
-- Return the inner config
-- not to be called from user
local function getInterior(event)
return sync, delay
end
-----
-- public interface.
-- this one is split, one for user one for runner.
return {
getEvent = getEvent,
getConfig = getConfig,
blanketEvent = blanketEvent
}, {
set = set,
getInterior = getInterior
}
end)()
---- ----
-- Writes a status report file at most every [statusintervall] seconds. -- Writes a status report file at most every [statusintervall] seconds.
@ -892,14 +966,16 @@ end)()
-- @param now the current kernel time (in jiffies) -- @param now the current kernel time (in jiffies)
-- --
function lsyncd_cycle(now) function lsyncd_cycle(now)
-- goes through all targets and spawns more actions -- goes through all syncs and spawns more actions
-- if possible -- if possible
for _, s in Syncs.iwalk() do
s:invokeActions(now)
end
if settings.statusfile then if settings.statusfile then
StatusFile.write(now) StatusFile.write(now)
end end
for _, s in Syncs.iwalk() do log("Debug", "fin lsyncd_cycle")
s:invokeActions()
end
end end
@ -1011,7 +1087,7 @@ function lsyncd_initialize()
for _, s in Syncs.iwalk() do for _, s in Syncs.iwalk() do
Inotifies.add(s.source, "", true, s) Inotifies.add(s.source, "", true, s)
if s.config.init then if s.config.init then
InletControl(s, nil) InletControl.set(s)
s.config.init(Inlet) s.config.init(Inlet)
end end
end end
@ -1021,6 +1097,7 @@ end
-- Called by core to query soonest alarm. -- Called by core to query soonest alarm.
-- --
-- @return false ... no alarm, core can in untimed sleep, or -- @return false ... no alarm, core can in untimed sleep, or
-- true ... immediate action
-- times ... the alarm time (only read if number is 1) -- times ... the alarm time (only read if number is 1)
-- --
function lsyncd_get_alarm() function lsyncd_get_alarm()
@ -1030,7 +1107,8 @@ function lsyncd_get_alarm()
-- checks if current nearest alarm or a is earlier -- checks if current nearest alarm or a is earlier
-- --
local function checkAlarm(a) local function checkAlarm(a)
if not a then if alarm == true or not a then
-- already immediate or no new alarm
return return
end end
if not alarm then if not alarm then
@ -1107,15 +1185,12 @@ overflow = default_overflow
-- @param binary binary to call -- @param binary binary to call
-- @param ... arguments -- @param ... arguments
-- --
function spawn(agent, collect, binary, ...) function spawn(agent, collector, binary, ...)
local pid = lsyncd.exec(binary, ...) local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then if pid and pid > 0 then
if agent == "full" then
end
local sync, delay = InletControl.getInterior(agent) local sync, delay = InletControl.getInterior(agent)
delay.status = "active" delay.status = "active"
delay.collect = collect delay.collector = collector
sync.processes[pid] = delay sync.processes[pid] = delay
end end
end end
@ -1123,8 +1198,8 @@ end
----- -----
-- Spawns a child process using bash. -- Spawns a child process using bash.
-- --
function spawnShell(agent, collect, command, ...) function spawnShell(agent, collector, command, ...)
return spawn(agent, collect, "/bin/sh", "-c", command, "/bin/sh", ...) return spawn(agent, collector, "/bin/sh", "-c", command, "/bin/sh", ...)
end end
--============================================================================ --============================================================================
@ -1159,14 +1234,12 @@ default = {
-- TODO desc -- TODO desc
-- --
action = function(inlet) action = function(inlet)
local event = inlet.get_event() local event = inlet.getEvent()
local config = inlet.get_config() local config = inlet.getConfig()
local func = config["on".. event.etype] local func = config["on".. event.etype]
if func then if func then
-- TODO Moves? -- TODO Moves?
return func(event) func(event)
else
return -1
end end
end, end,