This commit is contained in:
Axel Kittenberger 2010-11-07 09:53:39 +00:00
parent e8f0a9f57a
commit 981f6e5e42
3 changed files with 81 additions and 177 deletions

View File

@ -16,14 +16,23 @@ prefix = "sleep 1 && "
slowbash = {
delay = 5,
onStartup = function(config)
-- called on startup
local source = config.source
local target = config.target
log("Normal", "cp -r from ", source, " -> ", target)
spawnShell("all", {[0]="ok", others="fail"},
init = function(inlet)
local c = inlet.getConfig()
log("Normal", "cp -r from ", c.source, " -> ", c.target)
-- collect gets called when spawned process finished
local function collect(exitcode)
if exitcode == 0 then
log("Normal", "Startup of '"..c.source.."' finished.")
else
log("Error", "Failure on startup of '"...source.."'.")
terminate(-1) -- ERRNO
end
end
spawnShell(inlet.blanketEvent(), collect,
[[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]],
source, target)
config.source, config.target)
end,
onCreate = function(event)

103
lsyncd.c
View File

@ -779,108 +779,6 @@ l_terminate(lua_State *L)
return 0;
}
/**
* Suspends execution until a table of child processes returned.
*
* @param (Lua stack) a table of the process ids.
* @param (Lua stack) a function of a collector to be called
* when a process finishes.
*/
int
l_waitpids(lua_State *L)
{
/* the number of pids in table */
int pidn;
/* the pid table */
int *pids;
/* the number of processes to be waited for */
int remaining = 0;
int i;
/* global function to call on finished processes */
const char * collector;
/* checks if Lua script returned a table */
luaL_checktype(L, 1, LUA_TTABLE);
if (lua_type(L, 2) == LUA_TNIL) {
collector = NULL;
} else {
collector = luaL_checkstring(L, 2);
}
/* determines size of the pid-table */
pidn = lua_objlen (L, 1);
if (pidn == 0) {
/* nothing to do on zero pids */
return 0;
}
/* reads the pid table from Lua stack */
pids = s_calloc(pidn, sizeof(int));
for(i = 0; i < pidn; i++) {
lua_rawgeti(L, 1, i + 1);
pids[i] = luaL_checkinteger(L, -1);
lua_pop(L, 1);
/* ignores zero pids */
if (pids[i]) {
remaining++;
}
}
/* starts waiting for the processes */
while(remaining) {
/* argument for waitpid, and exitcode of process */
int status, exitcode;
/* new process id in case of retry */
int newp;
/* process id of terminated child process */
int wp = waitpid(0, &status, 0);
/* if nothing really finished ignore */
if (wp == 0 || !WIFEXITED(status)) {
continue;
}
exitcode = WEXITSTATUS(status);
/* checks if the pid is one waited for */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
break;
}
}
if (i >= pidn) {
/* not a pid waited for */
continue;
}
/* calls the lua collector to determine further actions */
if (collector) {
printlogf(L, "Call", "startup collector");
lua_getglobal(L, "lsyncd_call_error");
lua_getglobal(L, collector);
lua_pushinteger(L, wp);
lua_pushinteger(L, exitcode);
if (lua_pcall(L, 2, 1, -4)) {
exit(-1); // ERRNO
}
newp = luaL_checkinteger(L, -1);
lua_pop(L, 2);
} else {
newp = 0;
}
/* replace the new pid in the pidtable,
or zero it on no new pid */
for(i = 0; i < pidn; i++) {
if (pids[i] == wp) {
pids[i] = newp;
if (newp == 0) {
remaining--;
}
/* does not break!
* in case there are duplicate pids (why-ever) */
}
}
}
free(pids);
return 0;
}
/**
* Configures core parameters.
*
@ -921,7 +819,6 @@ static const luaL_reg lsyncdlib[] = {
{"stackdump", l_stackdump },
{"subdirs", l_subdirs },
{"terminate", l_terminate },
{"waitpids", l_waitpids },
{NULL, NULL}
};

View File

@ -176,9 +176,9 @@ local Delay = (function()
-- Creates a new delay.
--
-- @param TODO
local function new(ename, path, alarm)
local function new(etype, path, alarm)
local o = {
ename = ename, -- TODO rename
etype = etype,
alarm = alarm,
path = path,
status = "delay",
@ -201,12 +201,12 @@ local Sync = (function()
-----
-- Puts an action on the delay stack.
--
local function delay(self, ename, time, path, path2)
log("Function", "delay(", self, ", ", ename, ", ", path, ")")
local function delay(self, etype, time, path, path2)
log("Function", "delay(", self, ", ", etype, ", ", path, ")")
local delays = self.delays
local delayname = self.delayname
if ename == "Move" and not self.config.move then
if etype == "Move" and not self.config.move then
-- if there is no move action defined, split a move as delete/create
log("Debug", "splitting Move into Delete & Create")
delay(self, "Delete", time, path, nil)
@ -221,35 +221,35 @@ local Sync = (function()
else
alarm = lsyncd.now()
end
local newd = Delay.new(ename, path, alarm)
local newd = Delay.new(etype, path, alarm)
local oldd = delayname[path]
if oldd then
-- if there is already a delay on this path.
-- decide what should happen with multiple delays.
if newd.ename == "MoveFrom" or newd.ename == "MoveTo" or
oldd.ename == "MoveFrom" or oldd.ename == "MoveTo" then
if newd.etype == "MoveFrom" or newd.etype == "MoveTo" or
oldd.etype == "MoveFrom" or oldd.etype == "MoveTo" then
-- do not collapse moves
log("Normal", "Not collapsing events with moves on ", path)
-- TODO stackinfo
return
else
local col = self.config.collapseTable[oldd.ename][newd.ename]
local col = self.config.collapseTable[oldd.etype][newd.etype]
if col == -1 then
-- events cancel each other
log("Normal", "Nullfication: ", newd.ename, " after ",
oldd.ename, " on ", path)
oldd.ename = "None"
log("Normal", "Nullfication: ", newd.etype, " after ",
oldd.etype, " on ", path)
oldd.etype = "None" -- TODO remove name block
return
elseif col == 0 then
-- events tack
log("Normal", "Stacking ", newd.ename, " after ",
oldd.ename, " on ", path)
log("Normal", "Stacking ", newd.etype, " after ",
oldd.etype, " on ", path)
-- TODO Stack pointer
else
log("Normal", "Collapsing ", newd.ename, " upon ",
oldd.ename, " to ", col, " on ", path)
oldd.ename = col
log("Normal", "Collapsing ", newd.etype, " upon ",
oldd.etype, " to ", col, " on ", path)
oldd.etype = col
return
end
end
@ -287,7 +287,6 @@ local Sync = (function()
if d and lsyncd.clockbeforeq(d.alarm, now) then
InletControl.set(sync, delay)
sync.config.action(Inlet)
invoke_action(s, d)
-- TODO do not remove
table.remove(delays, 1)
@ -295,6 +294,13 @@ local Sync = (function()
end
end
------
-- adds a blanketEvent thats blocks all (startup)
--
local function addBlanketEvent()
end
-----
-- Creates a new Sync
--
@ -492,14 +498,14 @@ local Inotifies = (function()
-----
-- Called when an event has occured.
--
-- @param ename "Attrib", "Mofify", "Create", "Delete", "Move")
-- @param etype "Attrib", "Mofify", "Create", "Delete", "Move")
-- @param wd watch descriptor (matches lsyncd.add_watch())
-- @param isdir true if filename is a directory
-- @param time time of event
-- @param filename string filename without path
-- @param filename2
--
function event(ename, wd, isdir, time, filename, filename2)
function event(etype, wd, isdir, time, filename, filename2)
local ftype;
if isdir then
ftype = "directory"
@ -509,10 +515,10 @@ local Inotifies = (function()
end
end
if filename2 then
log("Inotify", "got event ", ename, " ", filename,
log("Inotify", "got event ", etype, " ", filename,
" to ", filename2)
else
log("Inotify", "got event ", ename, " ", filename)
log("Inotify", "got event ", etype, " ", filename)
end
local ilist = wdlist[wd]
@ -530,12 +536,12 @@ local Inotifies = (function()
if filename2 then
path2 = inotify.path..filename2
end
inotify.sync:delay(ename, time, path, path2)
inotify.sync:delay(etype, time, path, path2)
-- adds subdirs for new directories
if inotify.recurse and isdir then
if ename == "Create" then
if etype == "Create" then
add(inotify.root, path, true, inotify.sync)
elseif ename == "Delete" then
elseif etype == "Delete" then
-- TODO
end
end
@ -617,7 +623,7 @@ function lsyncd_collect_process(pid, exitcode)
if not delay then
return
end
log("Debug", "collected ",pid, ": ",delay.ename," of ",
log("Debug", "collected ",pid, ": ",delay.etype," of ",
sync.source, delay.path," = ",exitcode)
sync.processes[pid] = nil
end
@ -646,6 +652,14 @@ local Inlet, InletControl = (function()
end
end
-----
-- Creates a blanketEvent that blocks everything
-- and is blocked by everything.
--
local function blanketEvent()
return sync.addBlanketEvent()
end
-----
-- Interface for the user to get fields.
local eventFields = {
@ -662,7 +676,7 @@ local Inlet, InletControl = (function()
-- "Modify"
-- "Move"
etype = function()
return delay.ename
return delay.etype
end,
-----
@ -791,18 +805,18 @@ local Inlet, InletControl = (function()
end
-----
-- public interface
return {getEvent = getEvent, getConfig = getConfig},
{set = set, getInterior = getInterior }
-- public interface.
-- this one is split, one for user one for runner.
return {
getEvent = getEvent,
getConfig = getConfig,
blanketEvent = blanketEvent
}, {
set = set,
getInterior = getInterior
}
end)()
-----
-- TODO
--
--
local function invoke_action(sync, delay)
end
----
-- Writes a status report file at most every [statusintervall] seconds.
@ -989,37 +1003,17 @@ function lsyncd_initialize()
terminate(-1) -- ERRNO
end
-- set to true if at least one sync has a startup function
local have_startup = false
-- runs through the syncs table filled by user calling directory()
for _, s in Syncs.iwalk() do
if s.config.onStartup then
have_startup = true
end
-- adds the dir watch inclusively all subdirs
Inotifies.add(s.source, "", true, s)
end
-- from now on use logging as configured instead of stdout/err.
running = true;
lsyncd.configure("running");
if have_startup then
log("Normal", "--- startup ---")
local pids = { }
for _, s in Syncs.iwalk() do
local pid
if s.config.onStartup then
local pid = s.config.onStartup(s.config)
table.insert(pids, pid)
end
-- runs through the syncs table filled by user calling directory()
for _, s in Syncs.iwalk() do
Inotifies.add(s.source, "", true, s)
if s.config.init then
InletControl(s, nil)
s.config.init(Inlet)
end
lsyncd.waitpids(pids, "startup_collector")
log("Normal", "- Entering normal operation with ",
Inotifies.size(), " monitored directories -")
else
log("Normal", "- Warmstart into normal operation with ",
Inotifies.size(), " monitored directories -")
end
end
@ -1110,11 +1104,15 @@ overflow = default_overflow
-- process blocks all events and is blocked by all
-- this is used on startup.
-- @param collect a table of exitvalues and the action that shall taken.
-- @param ... binary and arguments to execute.
-- @param binary binary to call
-- @param ... arguments
--
function spawn(agent, collect, ...)
local pid = lsyncd.exec(...)
function spawn(agent, collect, binary, ...)
local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then
if agent == "full" then
end
local sync, delay = InletControl.getInterior(agent)
delay.status = "active"
delay.collect = collect