This commit is contained in:
Axel Kittenberger 2010-11-01 19:57:53 +00:00
parent 7550789ff4
commit 187dedb491
3 changed files with 129 additions and 135 deletions

View File

@ -22,25 +22,27 @@ slowbash = {
return shell([[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]], source, target) return shell([[if [ "$(ls -A $1)" ]; then cp -r "$1"* "$2"; fi]], source, target)
end, end,
create = function(inlet) action = function(inlet)
local event = inlet:nextevent() local event = inlet:nextevent()
return inlet:config()[string.lower(event.ename)](event)
end,
create = function(event)
log(NORMAL, "create from "..event.spath.." -> "..event.tpath) log(NORMAL, "create from "..event.spath.." -> "..event.tpath)
return shell(prefix..[[cp "$1" "$2"]], event.spath, event.tpath) return shell(prefix..[[cp "$1" "$2"]], event.spath, event.tpath)
end, end,
modify = function(inlet) modify = function(event)
local event = inlet:nextevent()
log(NORMAL, "modify from "..event.spath.." -> "..event.tpath) log(NORMAL, "modify from "..event.spath.." -> "..event.tpath)
return shell(prefix..[[cp "$1" "$2"]], event.spath, event.tpath) return shell(prefix..[[cp "$1" "$2"]], event.spath, event.tpath)
end, end,
attrib = function(self, inlet) attrib = function(event)
-- ignore attribs -- ignore attribs
return 0 return 0
end, end,
delete = function(inlet) delete = function(event)
local event = inlet:nextevent()
log(NORMAL, "delete "..event.tpath) log(NORMAL, "delete "..event.tpath)
return exec(prefix..[[rm "$1"]], event.tpath) return exec(prefix..[[rm "$1"]], event.tpath)
end, end,

View File

@ -75,12 +75,6 @@ enum event_type {
CREATE = 3, CREATE = 3,
DELETE = 4, DELETE = 4,
MOVE = 5, MOVE = 5,
/* MOVEFROM/TO never get passed to the runner, but it uses these
* enums to split events again. The core will only send complete
* move events to the runner. Moves into or out of the watch tree
* are replaced with CREATE/DELETE events. */
MOVEFROM = 6,
MOVETO = 7,
}; };
/** /**
@ -941,7 +935,16 @@ void handle_event(lua_State *L, struct inotify_event *event) {
/* and hands over to runner */ /* and hands over to runner */
lua_getglobal(L, "lsyncd_event"); lua_getglobal(L, "lsyncd_event");
lua_pushnumber(L, event_type); switch(event_type) {
case ATTRIB : lua_pushstring(L, "Attrib"); break;
case MODIFY : lua_pushstring(L, "Modify"); break;
case CREATE : lua_pushstring(L, "Create"); break;
case DELETE : lua_pushstring(L, "Delete"); break;
case MOVE : lua_pushstring(L, "Move"); break;
default :
logstring(ERROR, "Internal: unknown event in handle_event()");
exit(-1); // ERRNO
}
lua_pushnumber(L, event->wd); lua_pushnumber(L, event->wd);
lua_pushboolean(L, (event->mask & IN_ISDIR) != 0); lua_pushboolean(L, (event->mask & IN_ISDIR) != 0);
lua_pushinteger(L, times(NULL)); lua_pushinteger(L, times(NULL));
@ -1137,15 +1140,6 @@ main(int argc, char *argv[])
luaL_register(L, "lsyncd", lsyncdlib); luaL_register(L, "lsyncd", lsyncdlib);
lua_setglobal(L, "lysncd"); lua_setglobal(L, "lysncd");
/* register event types */
lua_pushinteger(L, ATTRIB); lua_setglobal(L, "ATTRIB");
lua_pushinteger(L, MODIFY); lua_setglobal(L, "MODIFY");
lua_pushinteger(L, CREATE); lua_setglobal(L, "CREATE");
lua_pushinteger(L, DELETE); lua_setglobal(L, "DELETE");
lua_pushinteger(L, MOVE); lua_setglobal(L, "MOVE");
lua_pushinteger(L, MOVEFROM); lua_setglobal(L, "MOVEFROM");
lua_pushinteger(L, MOVETO); lua_setglobal(L, "MOVETO");
/* register log levels */ /* register log levels */
lua_pushinteger(L, DEBUG); lua_setglobal(L, "DEBUG"); lua_pushinteger(L, DEBUG); lua_setglobal(L, "DEBUG");
lua_pushinteger(L, VERBOSE); lua_setglobal(L, "VERBOSE"); lua_pushinteger(L, VERBOSE); lua_setglobal(L, "VERBOSE");

View File

@ -149,6 +149,13 @@ end
local function globals_lock() local function globals_lock()
local t = _G local t = _G
local mt = getmetatable(t) or {} local mt = getmetatable(t) or {}
mt.__index = function(t, k)
if (k~="_" and string.sub(k, 1, 2) ~= "__") then
error("Access of non-existing global.", 2)
else
rawget(t, k)
end
end
mt.__newindex = function(t, k, v) mt.__newindex = function(t, k, v)
if (k~="_" and string.sub(k, 1, 2) ~= "__") then if (k~="_" and string.sub(k, 1, 2) ~= "__") then
error("Lsyncd does not allow GLOBALS to be created on the fly." .. error("Lsyncd does not allow GLOBALS to be created on the fly." ..
@ -173,7 +180,7 @@ end
-- filled during initialization. -- filled during initialization.
-- --
-- [#] { -- [#] {
-- actions = actions, -- config = config,
-- source = source_dir, -- source = source_dir,
-- targetident = the identifier of target (like string "host:dir") -- targetident = the identifier of target (like string "host:dir")
-- for lsyncd this passed competly opaquely to the -- for lsyncd this passed competly opaquely to the
@ -181,10 +188,9 @@ end
-- --
-- .processes = [pid] .. a sublist of processes[] for this target -- .processes = [pid] .. a sublist of processes[] for this target
-- .delays = [#) { .. the delays stack -- .delays = [#) { .. the delays stack
-- .atype .. enum, kind of action -- .ename .. enum, kind of action
-- .alarm .. when it should fire -- .alarm .. when it should fire
-- .pathname .. complete path relativ to watch origin -- .pathname .. complete path relativ to watch origin
-- .filename .. filename or nil (=dir itself)
-- (.movepeer) .. for MOVEFROM/MOVETO link to other delay -- (.movepeer) .. for MOVEFROM/MOVETO link to other delay
-- } -- }
-- .delayname[pathname] = [#] .. a list of lists of all delays from a -- .delayname[pathname] = [#] .. a list of lists of all delays from a
@ -193,12 +199,11 @@ end
-- --
local origins = new_array() local origins = new_array()
local proto_origin = { local proto_origin = {
actions=true, source=true, targetident=true, config=true, source=true, targetident=true,
processes=true, delays=true, delayname=true processes=true, delays=true, delayname=true
} }
local proto_delay = { local proto_delay = {
atype =true, alarm=true, pathname=true, ename =true, alarm=true, pathname=true, movepeer=true
filename=true, movepeer=true
} }
----- -----
@ -222,17 +227,17 @@ local proto_inotify = {origin=true, path=true}
----- -----
-- A list of names of the event types the core sends. -- A list of names of the event types the core sends.
-- (Also makes sure the strings are not collected)
-- --
local event_names = { local valid_events = {
[ATTRIB ] = "Attrib", Attrib = true,
[MODIFY ] = "Modify", Modify = true,
[CREATE ] = "Create", Create = true,
[DELETE ] = "Delete", Delete = true,
[MOVE ] = "Move", Move = true,
[MOVEFROM ] = "MoveFrom", MoveFrom = true,
[MOVETO ] = "MoveTo", MoveTo = true,
} }
set_array(event_names)
--============================================================================ --============================================================================
-- The lsyncd runner -- The lsyncd runner
@ -241,28 +246,26 @@ set_array(event_names)
----- -----
-- Puts an action on the delay stack. -- Puts an action on the delay stack.
-- --
local function delay_action(atype, wd, sync, time, filename, filename2) local function delay_action(ename, wd, time, origin, pathname, pathname2)
log(DEBUG, "delay_action "..event_names[atype].."("..wd..") ") log(DEBUG, "delay_action "..ename.."("..wd..") ")
local o = sync.origin local o = origin
local delays = o.delays local delays = o.delays
local delayname = o.delayname local delayname = o.delayname
local pathname = sync.path..(filename or "")
if atype == MOVE and not o.actions.move then if ename == "Move" and not o.config.move then
-- if there is no move action defined, split a move as delete/create -- if there is no move action defined, split a move as delete/create
log(DEBUG, "splitting MOVE into DELETE & CREATE") log(DEBUG, "splitting Move into Delete & Create")
delay_action(DELETE, wd, sync, time, filename, nil) delay_action("Delete", wd, time, pathname, nil)
delay_action(CREATE, wd, sync, time, filename2, nil) delay_action("Create", wd, time, pathname2, nil)
return return
end end
-- creates the new action -- creates the new action
local newd = {atype = atype, local newd = {ename = ename,
pathname = pathname, pathname = pathname }
filename = filename }
set_prototype(newd, proto_delay) set_prototype(newd, proto_delay)
if time and o.actions.delay then if time and o.config.delay then
newd.alarm = lsyncd.addto_clock(time, o.actions.delay) newd.alarm = lsyncd.addto_clock(time, o.config.delay)
else else
newd.alarm = lsyncd.now() newd.alarm = lsyncd.now()
end end
@ -271,30 +274,30 @@ local function delay_action(atype, wd, sync, time, filename, filename2)
if oldd then if oldd then
-- if there is already a delay on this pathname. -- if there is already a delay on this pathname.
-- decide what should happen with multiple delays. -- decide what should happen with multiple delays.
if newd.atype == MOVE_FROM or newd.atype == MOVE_TO or if newd.ename == "MoveFrom" or newd.ename == "MoveTo" or
oldd.atype == MOVE_FROM or oldd.atype == MOVE_TO then oldd.ename == "MoveFrom" or oldd.ename == "MoveTo" then
-- do not collapse moves -- do not collapse moves
log(NORMAL, "Not collapsing events with moves on "..filename) log(NORMAL, "Not collapsing events with moves on "..pathname)
-- TODO stackinfo -- TODO stackinfo
return return
else else
local col = o.actions.collapse_table[oldd.atype][newd.atype] local col = o.config.collapse_table[oldd.ename][newd.ename]
if col == -1 then if col == -1 then
-- events cancel each other -- events cancel each other
log(NORMAL, "Nullfication: " ..event_names[newd.atype].." after ".. log(NORMAL, "Nullfication: " ..newd.ename.." after "..
event_names[oldd.atype].." on "..filename) oldd.ename.." on "..pathname)
oldd.atype = NONE oldd.ename = "none"
return return
elseif col == 0 then elseif col == 0 then
-- events tack -- events tack
log(NORMAL, "Stacking " ..event_names[newd.atype].." after ".. log(NORMAL, "Stacking " ..newd.ename.." after "..
event_names[oldd.atype].." on "..filename) oldd.ename.." on "..pathname)
-- TODO Stack pointer -- TODO Stack pointer
else else
log(NORMAL, "Collapsing "..event_names[newd.atype].." upon ".. log(NORMAL, "Collapsing "..newd.ename.." upon "..
event_names[oldd.atype].." to " .. oldd.ename.." to " ..
event_names[col].." on "..filename) col.." on "..pathname)
oldd.atype = col oldd.ename = col
return return
end end
end end
@ -312,7 +315,7 @@ end
-- @param path relative path of this dir to origin -- @param path relative path of this dir to origin
-- @param parent link to parent directory in watches[] -- @param parent link to parent directory in watches[]
-- --
local function inotify_dir(origin, path) local function inotify_watch_dir(origin, path)
local op = origin.source .. path local op = origin.source .. path
-- register watch and receive watch descriptor -- register watch and receive watch descriptor
local wd = lsyncd.add_watch(op); local wd = lsyncd.add_watch(op);
@ -331,15 +334,15 @@ local function inotify_dir(origin, path)
set_prototype(inotify, proto_inotify) set_prototype(inotify, proto_inotify)
table.insert(ilist, inotify) table.insert(ilist, inotify)
-- on a warmstart add a CREATE for the directory -- on a warmstart add a Create for the directory
if not origin.actions.startup then if not origin.config.startup then
delay_action(CREATE, wd, sync, nil, nil, nil) delay_action("Create", wd, sync, nil, nil, nil)
end end
-- registers and adds watches for all subdirectories -- registers and adds watches for all subdirectories
local subdirs = lsyncd.sub_dirs(op) local subdirs = lsyncd.sub_dirs(op)
for _, dirname in ipairs(subdirs) do for _, dirname in ipairs(subdirs) do
inotify_dir(origin, path..dirname.."/") inotify_watch_dir(origin, path..dirname.."/")
end end
end end
@ -361,7 +364,7 @@ function lsyncd_collect_process(pid, exitcode)
return return
end end
log(DEBUG, "collected "..pid..": ".. log(DEBUG, "collected "..pid..": "..
event_names[delay.atype].." of ".. delay.ename.." of "..
origin.source..delay.pathname.. origin.source..delay.pathname..
" = "..exitcode) " = "..exitcode)
origin.processes[pid] = nil origin.processes[pid] = nil
@ -377,15 +380,20 @@ local hk = {}
--- TODO --- TODO
local inlet = { local inlet = {
[hk] = { [hk] = {
origin = true, origin = true,
delay = true, delay = true,
}, },
config = function(self)
return self[hk].origin.config
end,
nextevent = function(self) nextevent = function(self)
local h = self[hk] local h = self[hk]
return { return {
spath = h.origin.source .. h.delay.pathname, spath = h.origin.source .. h.delay.pathname,
tpath = h.origin.targetident .. h.delay.pathname, tpath = h.origin.targetident .. h.delay.pathname,
ename = h.delay.ename
} }
end, end,
} }
@ -397,31 +405,17 @@ local inlet = {
-- --
local function invoke_action(origin, delay) local function invoke_action(origin, delay)
local o = origin local o = origin
local actions = o.actions local config = o.config
local func = nil if delay.ename == "None" then
local atype = delay.atype
if atype == NONE then
-- a removed action -- a removed action
return return
elseif atype == CREATE then
func = actions.create or actions.default
elseif atype == ATTRIB then
func = actions.attrib or actions.default
elseif atype == MODIFY then
func = actions.modify or actions.default
elseif atype == DELETE then
func = actions.delete or actions.default
elseif atype == MOVE then
log(ERROR, "MOVE NOT YET IMPLEMENTED!") -- TODO
end end
if func then inlet[hk].origin = origin
inlet[hk].origin = origin inlet[hk].delay = delay
inlet[hk].delay = delay local pid = config.action(inlet)
local pid = func(inlet) if pid and pid > 0 then
if pid and pid > 0 then o.processes[pid] = delay
o.processes[pid] = delay
end
end end
end end
@ -432,15 +426,11 @@ end
function lsyncd_status_report(fd) function lsyncd_status_report(fd)
local w = lsyncd.writefd local w = lsyncd.writefd
w(fd, "Lsyncd status report at "..os.date().."\n\n") w(fd, "Lsyncd status report at "..os.date().."\n\n")
w(fd, "Watching "..watches.size.." directories\n") w(fd, "Watching "..inotifies.size.." directories\n")
for i, v in pairs(watches.nt) do for wd, v in pairs(inotifies.nt) do
w(fd, " "..i..": ") w(fd, " "..wd..": ")
if i ~= v.wd then for _, inotify in ipairs(v) do
w(fd, "[Error: wd/v.wd "..i.."~="..v.wd.."]") w(fd, "("..inotify.origin.source.."|"..(inotify.path) or ")..")
end
for _, s in ipairs(v.syncs) do
local o = s.origin
w(fd, "("..o.source.."|"..(s.path) or ")..")
end end
w(fd, "\n") w(fd, "\n")
end end
@ -456,7 +446,7 @@ function lsyncd_alarm(now)
-- goes through all targets and spawns more actions -- goes through all targets and spawns more actions
-- if possible -- if possible
for _, o in ipairs(origins) do for _, o in ipairs(origins) do
if o.processes.size < o.actions.max_processes then if o.processes.size < o.config.max_processes then
local delays = o.delays local delays = o.delays
local d = delays[1] local d = delays[1]
if d and lsyncd.before_eq(d.alarm, now) then if d and lsyncd.before_eq(d.alarm, now) then
@ -552,9 +542,9 @@ function lsyncd_initialize(args)
for _, o in ipairs(origins) do for _, o in ipairs(origins) do
-- resolves source to be an absolute path -- resolves source to be an absolute path
local asrc = lsyncd.real_dir(o.source) local asrc = lsyncd.real_dir(o.source)
local actions = o.actions local config = o.config
if not asrc then if not asrc then
print("Cannot resolve source path: ", o.source) log(Error, "Cannot resolve source path: " .. o.source)
terminate(-1) -- ERRNO terminate(-1) -- ERRNO
end end
o.source = asrc o.source = asrc
@ -562,21 +552,21 @@ function lsyncd_initialize(args)
o.delayname = {} o.delayname = {}
o.processes = new_count_array() o.processes = new_count_array()
actions.max_processes = config.max_processes =
actions.max_processes or config.max_processes or
settings.max_processes or settings.max_processes or
defaults.max_processes defaults.max_processes
actions.collapse_table = config.collapse_table =
actions.collapse_table or config.collapse_table or
settings.collapse_table or settings.collapse_table or
defaults.collapse_table defaults.collapse_table
if actions.startup then if config.startup then
have_startup = true have_startup = true
end end
-- adds the dir watch inclusively all subdirs -- adds the dir watch inclusively all subdirs
inotify_dir(o, "") inotify_watch_dir(o, "")
end end
-- from this point on use logging facilities as configured. -- from this point on use logging facilities as configured.
@ -587,8 +577,8 @@ function lsyncd_initialize(args)
local pids = { } local pids = { }
for _, o in ipairs(origins) do for _, o in ipairs(origins) do
local pid local pid
if o.actions.startup then if o.config.startup then
local pid = o.actions.startup(o.source, o.targetident) local pid = o.config.startup(o.source, o.targetident)
table.insert(pids, pid) table.insert(pids, pid)
end end
end end
@ -613,7 +603,7 @@ function lsyncd_get_alarm()
local alarm = 0 local alarm = 0
for _, o in ipairs(origins) do for _, o in ipairs(origins) do
if o.delays[1] and if o.delays[1] and
o.processes.size < o.actions.max_processes then o.processes.size < o.config.max_processes then
if have_alarm then if have_alarm then
alarm = lsyncd.earlier(alarm, o.delays[1].alarm) alarm = lsyncd.earlier(alarm, o.delays[1].alarm)
else else
@ -628,13 +618,13 @@ end
----- -----
-- Called by core on inotify event -- Called by core on inotify event
-- --
-- @param etype enum (ATTRIB, MODIFY, CREATE, DELETE, MOVE) -- @param ename "Attrib", "Mofify", "Create", "Delete", "Move")
-- @param wd watch descriptor (matches lsyncd.add_watch()) -- @param wd watch descriptor (matches lsyncd.add_watch())
-- @param time time of event -- @param time time of event
-- @param filename string filename without path -- @param filename string filename without path
-- @param filename2 -- @param filename2
-- --
function lsyncd_event(etype, wd, isdir, time, filename, filename2) function lsyncd_event(ename, wd, isdir, time, filename, filename2)
local ftype; local ftype;
if isdir then if isdir then
ftype = "directory" ftype = "directory"
@ -643,25 +633,33 @@ function lsyncd_event(etype, wd, isdir, time, filename, filename2)
end end
-- TODO comment out to safe performance -- TODO comment out to safe performance
if filename2 then if filename2 then
log(DEBUG, "got event "..event_names[etype].." of "..ftype.." "..filename.." to "..filename2) log(DEBUG, "got event "..ename..
" of "..ftype.." "..filename.." to "..filename2)
else else
log(DEBUG, "got event "..event_names[etype].." of "..ftype.." "..filename) log(DEBUG, "got event "..ename..
" of "..ftype.." "..filename)
end end
-- looks up the watch descriptor id -- looks up the watch descriptor id
local w = watches[wd] local ilist = inotifies[wd]
if not w then if not ilist then
log(NORMAL, "event belongs to unknown or deleted watch descriptor.") log(NORMAL, "event belongs to unknown or deleted watch descriptor.")
return return
end end
-- works through all possible source->target pairs -- works through all possible source->target pairs
for _, sync in ipairs(w.syncs) do for _, inotify in ipairs(ilist) do
delay_action(etype, wd, sync, time, filename, filename2) local pathname2
if filename2 then
pathname2 = inotify.path..filename2
end
delay_action(ename, wd, time, inotify.origin,
inotify.path..filename, pathname2)
-- add subdirs for new directories -- add subdirs for new directories
if isdir then if isdir then
if etype == CREATE then if ename == "Create" then
inotify_dir(sync.origin, sync.path .. filename .. "/") inotify_watch_dir(inotify.origin,
inotify.path .. filename .. "/")
end end
end end
end end
@ -690,20 +688,20 @@ end
--============================================================================ --============================================================================
---- ----
-- Adds one directory (incl. subdir) to be synchronized. -- Adds one directory (incl. subdirs) to be synchronized.
-- Users primary configuration device. -- Users primary configuration device.
-- --
-- @param TODO -- @param TODO
-- --
function sync(source_dir, target_identifier, actions) function sync(source_dir, target_identifier, config)
local o = { actions = actions, local o = { config = config,
source = source_dir, source = source_dir,
targetident = target_identifier, targetident = target_identifier,
} }
set_prototype(o, proto_origin) set_prototype(o, proto_origin)
if not actions.max_actions then if not config.max_actions then
actions.max_actions = 1 -- TODO move to init config.max_actions = 1 -- TODO move to init
end end
table.insert(origins, o) table.insert(origins, o)
return return
@ -760,10 +758,10 @@ defaults = {
-- TODO -- TODO
-- --
collapse_table = { collapse_table = {
[ATTRIB] = { [ATTRIB] = ATTRIB, [MODIFY] = MODIFY, [CREATE] = CREATE, [DELETE] = DELETE }, Attrib = { Attrib = "Attrib", Modify = "Modify", Create = "Create", Delete = "Delete" },
[MODIFY] = { [ATTRIB] = MODIFY, [MODIFY] = MODIFY, [CREATE] = CREATE, [DELETE] = DELETE }, Modify = { Attrib = "Modify", Modify = "Modify", Create = "Create", Delete = "Delete" },
[CREATE] = { [ATTRIB] = CREATE, [MODIFY] = CREATE, [CREATE] = CREATE, [DELETE] = -1 }, Create = { Attrib = "Create", Modify = "Create", Create = "Create", Delete = -1 },
[DELETE] = { [ATTRIB] = DELETE, [MODIFY] = DELETE, [CREATE] = MODIFY, [DELETE] = DELETE }, Delete = { Attrib = "Delete", Modify = "Delete", Create = "Modify", Delete = "Delete" },
}, },
rsync = default_rsync rsync = default_rsync