From bd6e753fbc3825a14aded0a62f8351c1660424fa Mon Sep 17 00:00:00 2001 From: Axel Kittenberger Date: Sat, 11 Dec 2010 20:00:48 +0000 Subject: [PATCH] optimized Queues --- lsyncd.lua | 88 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/lsyncd.lua b/lsyncd.lua index 296aab6..8e62a38 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -224,6 +224,19 @@ Queue = (function() end return pos, list[pos] end + + ----- + -- Stateless reverse queue iterator. + local function iterReverse(list, pos) + pos = pos - 1 + while list[pos] == nil and pos >= list.first do + pos = pos - 1 + end + if pos < list.first then + return nil + end + return pos, list[pos] + end ---- -- Iteraters through the queue @@ -231,8 +244,19 @@ Queue = (function() local function qpairs(list) return iter, list, list.first - 1 end + + ---- + -- Iteraters backwards through the queue + -- returning all non-nil pos-value entries + local function qpairsReverse(list) + return iterReverse, list, list.last + 1 + end - return {new = new, push = push, remove = remove, qpairs = qpairs} + return {new = new, + push = push, + remove = remove, + qpairs = qpairs, + qpairsReverse = qpairsReverse} end)() ---- @@ -294,7 +318,7 @@ local Delay = (function() -- path and file/dirname of a move destination. -- path2 = path2, - + ------ -- Status of the event. Valid stati are: -- 'wait' ... the event is ready to be handled. @@ -306,6 +330,10 @@ local Delay = (function() -- insurrance. -- status = "wait", + + ----- + -- Position in the queue + dpos = -1, } return o end @@ -1087,18 +1115,10 @@ local Sync = (function() -- Removes a delay. -- local function removeDelay(self, delay) - local found - for i, d in ipairs(self.delays) do - if d == delay then - found = true - table.remove(self.delays, i) - break - end - end - - if not found then - error("Did not find a delay to be removed!") + if self.delays[delay.dpos] ~= delay then + error("Queue is broken, delay not a dpos") end + Queue.remove(self.delays, delay.dpos) -- free all delays blocked by this one. if delay.blocks then @@ -1280,28 +1300,27 @@ local Sync = (function() if nd.etype == "Blanket" then -- always stack blanket events on the last event log("Delay", "Stacking blanket event.") - if #self.delays > 0 then - stack(self.delays[#self.delays], nd) + if self.delays.size > 0 then + stack(self.delays[self.delays.last], nd) end addDelayPath("", nd) - table.insert(self.delays, nd) + nd.dpos = Queue.push(self.delays, nd) return end -- detects blocks and combos by working from back until -- front through the fifo - local il = #self.delays -- last delay - while il > 0 do - local od = self.delays[il] + for il, od in Queue.qpairsReverse(self.delays) do + -- asks Combiner what to do local ac = Combiner.combine(od, nd) if ac then if ac == "remove" then - table.remove(self.delays, il) + Queue.remove(self.delays, il) return elseif ac == "stack" then stack(od, nd) - table.insert(self.delays, nd) + nd.dpos = Queue.push(self.delays, nd) return elseif ac == "absorb" then return @@ -1326,7 +1345,7 @@ local Sync = (function() log("Delay", "New ",nd.etype,":",nd.path) end -- no block or combo - table.insert(self.delays, nd) + nd.dpos = Queue.push(self.delays, nd) end @@ -1341,7 +1360,7 @@ local Sync = (function() -- first checks if more processses could be spawned if self.processes:size() < self.config.maxProcesses then -- finds the nearest delay waiting to be spawned - for _, d in ipairs(self.delays) do + for _, d in Queue.qpairs(self.delays) do if d.status == "wait" then return d.alarm end @@ -1374,7 +1393,7 @@ local Sync = (function() end end - for i, d in ipairs(self.delays) do + for i, d in Queue.qpairs(self.delays) do if d.status == "active" or (test and not test(InletFactory.d2e(self, d))) then @@ -1386,7 +1405,6 @@ local Sync = (function() --- TODO: make incremental indexes in dlist, -- and replace pairs with ipairs. - return dlist end @@ -1399,9 +1417,9 @@ local Sync = (function() -- no new processes return end - for _, d in ipairs(self.delays) do - if #self.delays < self.config.maxDelays then - -- time constrains only are only a concern if not maxed + for _, d in Queue.qpairs(self.delays) do + if self.delays.size < self.config.maxDelays then + -- time constrains are only concerned if not maxed -- the delay FIFO already. if d.alarm ~= true and timestamp < d.alarm then -- reached point in stack where delays are in future @@ -1423,9 +1441,9 @@ local Sync = (function() -- Gets the next event to be processed. -- local function getNextDelay(self, timestamp) - for i, d in ipairs(self.delays) do - if #self.delays < self.config.maxDelays then - -- time constrains only are only a concern if not maxed + for i, d in Queue.qpairs(self.delays) do + if self.delays.size < self.config.maxDelays then + -- time constrains are only concerned if not maxed -- the delay FIFO already. if d.alarm ~= true and timestamp < d.alarm then -- reached point in stack where delays are in future @@ -1445,7 +1463,7 @@ local Sync = (function() -- local function addBlanketDelay(self) local newd = Delay.new("Blanket", true, "") - table.insert(self.delays, newd) + newd.dpos = Queue.push(self.delays, newd) return newd end @@ -1455,8 +1473,8 @@ local Sync = (function() local function statusReport(self, f) local spaces = " " f:write(self.config.name," source=",self.source,"\n") - f:write("There are ",#self.delays, " delays\n") - for i, vd in ipairs(self.delays) do + f:write("There are ",self.delays.size, " delays\n") + for i, vd in Queue.qpairs(self.delays) do local st = vd.status f:write(st, string.sub(spaces, 1, 7 - #st)) f:write(vd.etype," ") @@ -1488,7 +1506,7 @@ local Sync = (function() local s = { -- fields config = config, - delays = CountArray.new(), + delays = Queue.new(), source = config.source, processes = CountArray.new(), excludes = Excludes.new(),