This commit is contained in:
Axel Kittenberger 2010-10-24 13:52:28 +00:00
parent 0ff894eee9
commit c0a13f66ee
2 changed files with 59 additions and 61 deletions

View File

@ -532,9 +532,9 @@ l_terminate(lua_State *L)
/** /**
* Suspends execution until a table of child processes returned. * Suspends execution until a table of child processes returned.
* *
* @param (Lua stack) a table of the children pids. * @param (Lua stack) a table of the process ids.
* @param (Lua stack) a function of a collector to be called * @param (Lua stack) a function of a collector to be called
* when a child finishes. * when a process finishes.
*/ */
int int
l_wait_pids(lua_State *L) l_wait_pids(lua_State *L)
@ -543,7 +543,7 @@ l_wait_pids(lua_State *L)
int pidn; int pidn;
/* the pid table */ /* the pid table */
int *pids; int *pids;
/* the number of children to be waited for */ /* the number of processes to be waited for */
int remaining = 0; int remaining = 0;
int i; int i;
/* global function to call on finished processes */ /* global function to call on finished processes */
@ -573,9 +573,9 @@ l_wait_pids(lua_State *L)
remaining++; remaining++;
} }
} }
/* starts waiting for the children */ /* starts waiting for the processes */
while(remaining) { while(remaining) {
/* argument for waitpid, and exitcode of child */ /* argument for waitpid, and exitcode of process */
int status, exitcode; int status, exitcode;
/* new process id in case of retry */ /* new process id in case of retry */
int newp; int newp;
@ -909,19 +909,19 @@ masterloop(lua_State *L)
/* collect zombified child processes */ /* collect zombified child processes */
while(1) { while(1) {
pid_t pid = waitpid(0, &status, WNOHANG);
int status; int status;
pid_t pid = waitpid(0, &status, WNOHANG);
if (pid <= 0) { if (pid <= 0) {
break; break;
} }
lua_getglobal(L, "lsyncd_collect_child"); lua_getglobal(L, "lsyncd_collect_process");
lua_pushinteger(L, pid); lua_pushinteger(L, pid);
lua_pushinteger(L, WEXITSTATUS(status)); lua_pushinteger(L, WEXITSTATUS(status));
lua_call(L, 2, 0); lua_call(L, 2, 0);
} }
/* let the runner spawn child processes */ /* let the runner spawn new processes */
lua_getglobal(L, "lsyncd_alarm"); lua_getglobal(L, "lsyncd_alarm");
lua_pushinteger(L, times(NULL)); lua_pushinteger(L, times(NULL));
lua_call(L, 1, 0); lua_call(L, 1, 0);

View File

@ -33,32 +33,22 @@ exec = lsyncd.exec
-- [#] { -- [#] {
-- actions = actions, -- actions = actions,
-- source = source_dir, -- source = source_dir,
-- targetident = target_identifier, -- targetident = the identifier of target (like string "host:dir")
-- target = link to target directory -- for lsyncd this passed competly opaquely to the
-- action handlers
--
-- .processes = [pid] .. a sublist of processes[] for this target
-- .delays = [#) { .. the delays stack
-- .atype .. enum, kind of action
-- .wd .. watch descriptor id this origins from TODO needed?
-- .sync .. link to sync that raised this delay.
-- .filename .. filename or nil (=dir itself)
-- (.movepeer) .. for MOVEFROM/MOVETO link to other delay
-- }
-- } -- }
-- --
local origins = {} local origins = {}
----
-- a array of all targets
--
-- structure:
-- [#] {
-- .ident .. the identifier of target (like string "host:dir")
-- for lsyncd this passed competly opaquely to the
-- action handlers
-- .delays = [#) { .. the delays stack
-- .atype .. enum, kind of action
-- .wd .. watch descriptor id this origins from TODO needed?
-- .sync .. link to sync that raised this delay.
-- .filename .. filename or nil (=dir itself)
-- (.movepeer) .. for MOVEFROM/MOVETO link to other delay
-- }
-
-- }
--
local targets = {}
----- -----
-- all watches -- all watches
-- --
@ -87,7 +77,7 @@ local watches = {}
-- sync .. -- sync ..
-- filename .. -- filename ..
-- --
local children = {} local processes = {}
------ ------
@ -118,8 +108,7 @@ local event_names = {
local function delay_action(atype, wd, sync, filename, time) local function delay_action(atype, wd, sync, filename, time)
print("delay_action "..event_names[atype].."("..wd..") ") print("delay_action "..event_names[atype].."("..wd..") ")
local origin = sync.origin local origin = sync.origin
local target = origin.target local delays = origin.delays
local delays = target.delays
local nd = {atype = atype, local nd = {atype = atype,
wd = wd, wd = wd,
sync = sync, sync = sync,
@ -172,22 +161,24 @@ end
-- Called from code whenever a child process finished and -- Called from code whenever a child process finished and
-- zombie process was collected by core. -- zombie process was collected by core.
-- --
function lsyncd_collect_child(pid, exitcode) function lsyncd_collect_process(pid, exitcode)
local child = children[pid] local process = processes[pid]
if child == nil then if process == nil then
return return
end end
local sync = child.sync local sync = process.sync
local origin = sync.origin local origin = sync.origin
log(DEBUG, "collected "..pid..": "..event_names[atpye].." "..origin.source.."/"..sync.path..child.filename.. log(DEBUG, "collected "..pid..": "..event_names[atpye].." "..origin.source.."/"..sync.path..process.filename..
" = "..exitcode) " = "..exitcode)
processes[pid] = nil
origin.processes[pid] = nil
end end
----- -----
-- TODO -- TODO
-- --
-- --
local function invoke_action(target, delay) local function invoke_action(delay)
local sync = delay.sync local sync = delay.sync
local origin = sync.origin local origin = sync.origin
local actions = origin.actions local actions = origin.actions
@ -221,16 +212,16 @@ local function invoke_action(target, delay)
end end
if func ~= nil then if func ~= nil then
pid = func(origin.source, sync.path, delay.filename, target.ident) pid = func(origin.source, sync.path, delay.filename, origin.targetident)
if pid ~= nil and pid > 0 then if pid ~= nil and pid > 0 then
child = {pid = pid, process = {pid = pid,
target = target, atpye = delay.atype,
atpye = delay.atype, wd = delay.wd,
wd = delay.wd, sync = delay.sync,
sync = delay.sync, filename = delay.filname
filename = delay.filname
} }
children[pid] = child processes[pid] = process
origin.processes[pid] = process
end end
end end
end end
@ -249,13 +240,16 @@ end
-- --
-- @param now the time is now -- @param now the time is now
-- --
function lsyncd_alarm(now) function lsyncd_alarm(now)
for i, target in ipairs(targets) do -- goes through all targets and spawns more actions
local delays = target.delays -- if possible
if delays[1] ~= nil then for i, o in ipairs(origins) do
invoke_action(target, target.delays[1]) if #o.processes < o.actions.max_processes then
table.remove(delays, 1) local delays = o.delays
if delays[1] ~= nil then
invoke_action(o.delays[1])
table.remove(delays, 1)
end
end end
end end
end end
@ -276,19 +270,21 @@ function lsyncd_initialize()
for i, origin in ipairs(origins) do for i, origin in ipairs(origins) do
-- resolves source to be an absolute path -- resolves source to be an absolute path
local asrc = lsyncd.real_dir(origin.source) local asrc = lsyncd.real_dir(origin.source)
local actions = origin.actions
if asrc == nil then if asrc == nil then
print("Cannot resolve source path: ", origin.source) print("Cannot resolve source path: ", origin.source)
lsyncd.terminate(-1) -- ERRNO lsyncd.terminate(-1) -- ERRNO
end end
origin.source = asrc origin.source = asrc
origin.delays = {}
-- appends the target on target lists origin.processes = {}
local target = { ident = origin.targetident, delays = {} } if actions.max_processes == nil then
table.insert(targets, target) actions.max_processes = 1 -- TODO DEFAULT MAXPROCESS
origin.target = target end
if origin.actions.startup ~= nil then if actions.startup ~= nil then
have_startup = true have_startup = true
end end
-- and add the dir watch inclusively all subdirs -- and add the dir watch inclusively all subdirs
attend_dir(origin, "", nil) attend_dir(origin, "", nil)
end end
@ -365,8 +361,7 @@ function lsyncd_event(etype, wd, isdir, filename, filename2)
end end
----- -----
-- Called by the core for every child process that -- Collector for every child process that finished in startup phase
-- finished in startup phase
-- --
-- Parameters are pid and exitcode of child process -- Parameters are pid and exitcode of child process
-- --
@ -398,6 +393,9 @@ function sync(source_dir, target_identifier, actions)
source = source_dir, source = source_dir,
targetident = target_identifier, targetident = target_identifier,
} }
if actions.max_actions == nil then
actions.max_actions = 1
end
table.insert(origins, o) table.insert(origins, o)
return return
end end