diff --git a/lsyncd.c b/lsyncd.c index 18cbba7..05e1566 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -532,9 +532,9 @@ l_terminate(lua_State *L) /** * Suspends execution until a table of child processes returned. * - * @param (Lua stack) a table of the children pids. + * @param (Lua stack) a table of the process ids. * @param (Lua stack) a function of a collector to be called - * when a child finishes. + * when a process finishes. */ int l_wait_pids(lua_State *L) @@ -543,7 +543,7 @@ l_wait_pids(lua_State *L) int pidn; /* the pid table */ int *pids; - /* the number of children to be waited for */ + /* the number of processes to be waited for */ int remaining = 0; int i; /* global function to call on finished processes */ @@ -573,9 +573,9 @@ l_wait_pids(lua_State *L) remaining++; } } - /* starts waiting for the children */ + /* starts waiting for the processes */ while(remaining) { - /* argument for waitpid, and exitcode of child */ + /* argument for waitpid, and exitcode of process */ int status, exitcode; /* new process id in case of retry */ int newp; @@ -909,19 +909,19 @@ masterloop(lua_State *L) /* collect zombified child processes */ while(1) { - pid_t pid = waitpid(0, &status, WNOHANG); int status; + pid_t pid = waitpid(0, &status, WNOHANG); if (pid <= 0) { break; } - lua_getglobal(L, "lsyncd_collect_child"); + lua_getglobal(L, "lsyncd_collect_process"); lua_pushinteger(L, pid); lua_pushinteger(L, WEXITSTATUS(status)); lua_call(L, 2, 0); } - /* let the runner spawn child processes */ + /* let the runner spawn new processes */ lua_getglobal(L, "lsyncd_alarm"); lua_pushinteger(L, times(NULL)); lua_call(L, 1, 0); diff --git a/lsyncd.lua b/lsyncd.lua index 1a5b452..ad5bf3d 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -33,32 +33,22 @@ exec = lsyncd.exec -- [#] { -- actions = actions, -- source = source_dir, --- targetident = target_identifier, --- target = link to target directory +-- targetident = the identifier of target (like string "host:dir") +-- for lsyncd this passed competly opaquely to the +-- action handlers +-- +-- .processes = [pid] .. a sublist of processes[] for this target +-- .delays = [#) { .. the delays stack +-- .atype .. enum, kind of action +-- .wd .. watch descriptor id this origins from TODO needed? +-- .sync .. link to sync that raised this delay. +-- .filename .. filename or nil (=dir itself) +-- (.movepeer) .. for MOVEFROM/MOVETO link to other delay +-- } -- } -- local origins = {} ----- --- a array of all targets --- --- structure: --- [#] { --- .ident .. the identifier of target (like string "host:dir") --- for lsyncd this passed competly opaquely to the --- action handlers --- .delays = [#) { .. the delays stack --- .atype .. enum, kind of action --- .wd .. watch descriptor id this origins from TODO needed? --- .sync .. link to sync that raised this delay. --- .filename .. filename or nil (=dir itself) --- (.movepeer) .. for MOVEFROM/MOVETO link to other delay --- } -- --- } --- -local targets = {} - ----- -- all watches -- @@ -87,7 +77,7 @@ local watches = {} -- sync .. -- filename .. -- -local children = {} +local processes = {} ------ @@ -118,8 +108,7 @@ local event_names = { local function delay_action(atype, wd, sync, filename, time) print("delay_action "..event_names[atype].."("..wd..") ") local origin = sync.origin - local target = origin.target - local delays = target.delays + local delays = origin.delays local nd = {atype = atype, wd = wd, sync = sync, @@ -172,22 +161,24 @@ end -- Called from code whenever a child process finished and -- zombie process was collected by core. -- -function lsyncd_collect_child(pid, exitcode) - local child = children[pid] - if child == nil then +function lsyncd_collect_process(pid, exitcode) + local process = processes[pid] + if process == nil then return end - local sync = child.sync + local sync = process.sync local origin = sync.origin - log(DEBUG, "collected "..pid..": "..event_names[atpye].." "..origin.source.."/"..sync.path..child.filename.. + log(DEBUG, "collected "..pid..": "..event_names[atpye].." "..origin.source.."/"..sync.path..process.filename.. " = "..exitcode) + processes[pid] = nil + origin.processes[pid] = nil end ----- -- TODO -- -- -local function invoke_action(target, delay) +local function invoke_action(delay) local sync = delay.sync local origin = sync.origin local actions = origin.actions @@ -221,16 +212,16 @@ local function invoke_action(target, delay) end if func ~= nil then - pid = func(origin.source, sync.path, delay.filename, target.ident) + pid = func(origin.source, sync.path, delay.filename, origin.targetident) if pid ~= nil and pid > 0 then - child = {pid = pid, - target = target, - atpye = delay.atype, - wd = delay.wd, - sync = delay.sync, - filename = delay.filname + process = {pid = pid, + atpye = delay.atype, + wd = delay.wd, + sync = delay.sync, + filename = delay.filname } - children[pid] = child + processes[pid] = process + origin.processes[pid] = process end end end @@ -249,13 +240,16 @@ end -- -- @param now the time is now -- - function lsyncd_alarm(now) - for i, target in ipairs(targets) do - local delays = target.delays - if delays[1] ~= nil then - invoke_action(target, target.delays[1]) - table.remove(delays, 1) + -- goes through all targets and spawns more actions + -- if possible + for i, o in ipairs(origins) do + if #o.processes < o.actions.max_processes then + local delays = o.delays + if delays[1] ~= nil then + invoke_action(o.delays[1]) + table.remove(delays, 1) + end end end end @@ -276,19 +270,21 @@ function lsyncd_initialize() for i, origin in ipairs(origins) do -- resolves source to be an absolute path local asrc = lsyncd.real_dir(origin.source) + local actions = origin.actions if asrc == nil then print("Cannot resolve source path: ", origin.source) lsyncd.terminate(-1) -- ERRNO end origin.source = asrc - - -- appends the target on target lists - local target = { ident = origin.targetident, delays = {} } - table.insert(targets, target) - origin.target = target - if origin.actions.startup ~= nil then + origin.delays = {} + origin.processes = {} + if actions.max_processes == nil then + actions.max_processes = 1 -- TODO DEFAULT MAXPROCESS + end + if actions.startup ~= nil then have_startup = true end + -- and add the dir watch inclusively all subdirs attend_dir(origin, "", nil) end @@ -365,8 +361,7 @@ function lsyncd_event(etype, wd, isdir, filename, filename2) end ----- --- Called by the core for every child process that --- finished in startup phase +-- Collector for every child process that finished in startup phase -- -- Parameters are pid and exitcode of child process -- @@ -398,6 +393,9 @@ function sync(source_dir, target_identifier, actions) source = source_dir, targetident = target_identifier, } + if actions.max_actions == nil then + actions.max_actions = 1 + end table.insert(origins, o) return end