From fc70d4c3edabdfdcfab5d53d91c28e3e22a7d96c Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Sun, 28 Nov 2010 23:39:18 +0000 Subject: [PATCH] rewritten event combiner, now clearly not part of user script --- lsyncd.lua | 304 +++++++++++++++++++++++---------------- tests/churn-rsync.lua | 4 +- tests/churn-rsyncssh.lua | 5 +- 3 files changed, 185 insertions(+), 128 deletions(-) diff --git a/lsyncd.lua b/lsyncd.lua index d3ea0de..9b832ae 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -225,9 +225,169 @@ local Delay = (function() return o end + -- public interface return {new = new} end)() +----- +-- combines delays +-- +local Combiner = (function() + + ---- + -- new delay absorbed by old + -- + local function abso(d1, d2) + log("Delay",d2.etype,":",d2.path," absorbed by ", + d1.etype,":",d1.path) + return "absorb" + end + + ---- + -- new delay replaces the old one + -- + local function repl(d1, d2) + log("Delay",d2.etype,":",d2.path," replaces ", + d1.etype,":",d1.path) + return "replace" + end + + ---- + -- delays nullificate each other + -- + local function null(d1, d2) + log("Delay",d2.etype,":",d2.path," nullifies ", + d1.etype,":",d1.path) + return "remove" + end + + ----- + -- TODO + -- + local combineNoMove = { + Attrib = {Attrib=abso, Modify=repl, Create=repl, Delete=repl }, + Modify = {Attrib=abso, Modify=abso, Create=repl, Delete=repl }, + Create = {Attrib=abso, Modify=abso, Create=abso, Delete=null }, + Delete = {Attrib=abso, Modify=abso, Create=repl, Delete=abso }, + } + + ------ + -- combines two delays + -- + local function combine(d1, d2) + if d1.etype == "Blanket" then + -- everything is blocked by a blanket delay. + if d2.path2 then + log("Delay", d2.etype,":",d2.path,"->",d2.path2, "blocked by", + "Blanket event") + else + log("Delay", d2.etype,":",d2.path, "blocked by", + "Blanket event") + end + return "stack" + end + + -- Two normal events + if d1.etype ~= "Move" and d2.etype ~= "Move" then + if d1.path == d2.path then + if d1.status == "active" then + return "stack" + end + return combineNoMove[d1.etype][d2.etype](d1, d2) + end + + -- blocks events if one is a parent directory of another + if d1.path:byte(-1) == 47 and string.starts(d2.path, d1.path) or + d2.path:byte(-1) == 47 and string.starts(d1.path, d2.path) + then + return "stack" + end + return nil + end + + -- Normal upon a Move + if d1.etype == "Move" and d2.etype ~= "Move" then + -- stacks the move if the from field could anyway be damaged + if d1.path == d2.path or + d2.path:byte(-1) == 47 and string.starts(d1.path, d2.path) or + d1.path:byte(-1) == 47 and string.starts(d2.path, d1.path) + then + log("Delay",d2.etype,":",d2.path," blocked by", + "Move :",d1.path,"->",d1.path2) + return "stack" + end + + -- Event does something with the move destination + if d1.path2 == d2.path then + if d2.etype == "Delete" or d2.etype == "Create" then + if d1.status == "active" then + return "stack" + end + log("Delay",d2.etype,":",d2.path," turns ", + "Move :",d1.path,"->",d1.path2, " into ", + "Delete:",d1.path) + d1.etype = "Delete" + d1.path2 = nil + return "stack" + end + -- on "Attrib" or "Modify" simply wait for the move first + return "stack" + end + + if d2.path :byte(-1) == 47 and string.starts(d1.path2, d2.path) or + d1.path2:byte(-1) == 47 and string.starts(d2.path, d1.path2) + then + log("Delay",d2.etype,":",d2.path," blocked by ", + "Move:",d1.path,"->",d1.path2) + return "stack" + end + return nil + end + + -- Move upon a single event + if d1.etype ~= "Move" and d2.etype == "Move" then + if d1.path == d2.path or d1.path == d2.path2 or + d1.path :byte(-1) == 47 and string.starts(d2.path, d1.path) or + d1.path :byte(-1) == 47 and string.starts(d2.path2, d1.path) or + d2.path :byte(-1) == 47 and string.starts(d1.path, d2.path) or + d2.path2:byte(-1) == 47 and string.starts(d1.path, d2.path2) + then + log("Delay","Move:",d2.path,"->",d1.path2, + "splitted by ",d1.etype,":",d1.path) + return "split" + end + return nil + end + + -- Move upon move + if d1.etype == "Move" and d2.etype == "Move" then + -- TODO combine moves, + + if d1.path == d2.path or d1.path == d2.path2 or + d1.path2 == d2.path or d2.path2 == d2.path or + d1.path :byte(-1) == 47 and string.starts(d2.path, d1.path) or + d1.path :byte(-1) == 47 and string.starts(d2.path2, d1.path) or + d1.path2:byte(-1) == 47 and string.starts(d2.path, d1.path2) or + d1.path2:byte(-1) == 47 and string.starts(d2.path2, d1.path2) or + d2.path :byte(-1) == 47 and string.starts(d1.path, d2.path) or + d2.path :byte(-1) == 47 and string.starts(d1.path2, d2.path) or + d2.path2:byte(-1) == 47 and string.starts(d1.path, d2.path2) or + d2.path2:byte(-1) == 47 and string.starts(d1.path2, d2.path2) + then + log("Delay","Move:",d2.path,"->",d1.path2, + "splitted by Move:",d1.path,"->",d1.path2) + return "split" + end + return nil + end + + error("reached impossible position") + end + + -- public interface + return {combine = combine} +end)() + ----- -- User interface to grap events -- @@ -1036,84 +1196,40 @@ local Sync = (function() return end - -- detects blocks and collapses by working from back until + -- detects blocks and combos by working from back until -- front through the fifo - - InletControl.setSync(self) - local ne, ne2 = InletControl.d2e(nd) local il = #self.delays -- last delay while il > 0 do - -- get 'old' delay local od = self.delays[il] - local oe, oe2 = InletControl.d2e(od) + local ac = Combiner.combine(od, nd) - if oe.etype == "Blanket" then - -- everything is blocked by a blanket event. - log("Delay", "Stacking ",nd.etype," upon blanket event.") - stack(od, nd) - table.insert(self.delays, nd) - return - end - - -- this mini loop repeats the collapse a second - -- time for move events - local oel = oe - local nel = ne - - while oel and nel do - local c = self.config.collapse(oel, nel, self.config) - if c == 0 then - -- events nullificate each ether - log("Delay",nd.etype," and ",od.etype," on ",path, - " nullified each other.") - od.etype = "None" + if ac then + if ac == "remove" then table.remove(self.delays, il) return - elseif c == 1 then - log("Delay",nd.etype," is absored by event ", - od.etype," on ",path) - return - elseif c == 2 then - if od.etype ~= "Move" then - log("Delay",nd.etype," replaces event ", - od.etype," on ",path) - od.etype = nd.etype - if od.path ~= nd.path then - error("Cannot replace events with different paths") - end - else - log("Delay",nd.etype," turns a Move into delete of ", - od.path) - od.etype = "Delete" - od.path2 = nil - table.insert(self.delays, nd) - end - return - elseif c == 3 then - log("Delay", "Stacking ",nd.etype," upon ", - od.etype," on ",path) + elseif ac == "stack" then stack(od, nd) table.insert(self.delays, nd) return - end - - -- loops over all oe, oe2, ne, ne2 combos. - if oel == oe and oe2 then - -- do another time for oe2 if present - oel = oe2 - elseif nel == ne then - -- do another time for ne2 if present - -- start with first oe - nel = ne2 - oel = oe + elseif ac == "absorb" then + return + elseif ac == "replace" then + od.etype = nd.etype + od.path = nd.path + od.path2 = nd.path2 + return + elseif ac == "split" then + delay(self, "Delete", time, path, nil) + delay(self, "Create", time, path2, nil) + return else - oel = false + error("unknown result of combine()") end end il = il - 1 end - log("Delay", "Registering ",nd.etype," on ",path) - -- there was no hit on collapse or it decided to stack. + log("Delay", "New ",nd.etype,":",path) + -- no block or combo table.insert(self.delays, nd) end @@ -1420,8 +1536,6 @@ local Syncs = (function() end local defaultValues = { 'action', - 'collapse', - 'collapseTable', 'collect', 'init', 'maxDelays', @@ -2913,9 +3027,9 @@ local default_rsyncssh = { rsyncOps = "-lts", ----- - -- allow several processes + -- allow processes -- - maxProcesses = 3, + maxProcesses = 1, ------ -- Let the core not split move event. @@ -2954,63 +3068,7 @@ default = { end end, - ----- - -- Called to see if two events can be collapsed. - -- - -- Default function uses the collapseTable. - -- - -- @param event1 first event - -- @param event2 second event - -- @return -1 ... no interconnection - -- 0 ... drop both events. - -- 1 ... keep first event only - -- 2 ... keep second event only - -- 3 ... events block. - -- - collapse = function(event1, event2, config) - if event1.path == event2.path then - if event1.status == "active" then - return 3 - end - local e1 = event1.etype .. event1.move - local e2 = event2.etype .. event2.move - return config.collapseTable[e1][e2] - end - ----- - -- Block events if one is a parent directory of another - -- - if event1.isdir and string.starts(event2.path, event1.path) then - return 3 - end - if event2.isdir and string.starts(event1.path, event2.path) then - return 3 - end - - return -1 - end, - - ----- - -- Used by default collapse function. - -- Specifies how two event should be collapsed when here - -- horizontal event meets upon a vertical event. - -- values: - -- 0 ... nullification of both events. - -- 1 ... absorbtion of horizontal event. - -- 2 ... replace of vertical event. - -- 3 ... stack both events, vertical blocking horizonal. - -- 9 ... combines two move events. - -- - collapseTable = { - Attrib = {Attrib=1, Modify=2, Create=2, Delete=2, MoveFr=3, MoveTo= 2}, - Modify = {Attrib=1, Modify=1, Create=2, Delete=2, MoveFr=3, MoveTo= 2}, - Create = {Attrib=1, Modify=1, Create=1, Delete=0, MoveFr=3, MoveTo= 2}, - Delete = {Attrib=1, Modify=1, Create=3, Delete=1, MoveFr=3, MoveTo= 2}, - MoveFr = {Attrib=3, Modify=3, Create=3, Delete=3, MoveFr=3, MoveTo= 3}, - -- TODO MoveFr=9 - MoveTo = {Attrib=3, Modify=3, Create=2, Delete=2, MoveFr=3, MoveTo= 2}, - }, - ----- -- Called when collecting a finished child process -- diff --git a/tests/churn-rsync.lua b/tests/churn-rsync.lua index d7a1a1b..2827871 100755 --- a/tests/churn-rsync.lua +++ b/tests/churn-rsync.lua @@ -18,7 +18,7 @@ posix.mkdir(trgdir) churn(srcdir, 10) local logs = {} ---logs = {"-log", "Inotify", "-log", "Exec" } +logs = {"-log", "Delay" } local pid = spawn("./lsyncd", "-nodaemon", "-delay", "5", "-rsync", srcdir, trgdir, unpack(logs)) @@ -28,7 +28,7 @@ posix.sleep(1) churn(srcdir, 100) cwriteln("waiting for Lsyncd to finish its jobs.") -posix.sleep(30) +posix.sleep(10) cwriteln("killing the Lsyncd daemon") posix.kill(pid) diff --git a/tests/churn-rsyncssh.lua b/tests/churn-rsyncssh.lua index 046ea3f..ba9beb9 100755 --- a/tests/churn-rsyncssh.lua +++ b/tests/churn-rsyncssh.lua @@ -18,8 +18,7 @@ posix.mkdir(trgdir) churn(srcdir, 10) local logs = {} ---logs = {"-log", "Inotify", "-log", "Exec" } ---logs = {"-log", "Delay"} +logs = {"-log", "Delay", "-log", "Exec"} local pid = spawn("./lsyncd", "-nodaemon", "-delay", "5", "-rsyncssh", srcdir, "localhost", trgdir, @@ -31,7 +30,7 @@ posix.sleep(1) churn(srcdir, 100) cwriteln("waiting for Lsyncd to finish its jobs.") -posix.sleep(30) +posix.sleep(10) cwriteln("killing the Lsyncd daemon") posix.kill(pid)