mirror of
https://github.com/octoleo/lsyncd.git
synced 2025-02-02 03:48:24 +00:00
optimized Queues
This commit is contained in:
parent
917d2d8873
commit
bd6e753fbc
88
lsyncd.lua
88
lsyncd.lua
@ -224,6 +224,19 @@ Queue = (function()
|
||||
end
|
||||
return pos, list[pos]
|
||||
end
|
||||
|
||||
-----
|
||||
-- Stateless reverse queue iterator.
|
||||
local function iterReverse(list, pos)
|
||||
pos = pos - 1
|
||||
while list[pos] == nil and pos >= list.first do
|
||||
pos = pos - 1
|
||||
end
|
||||
if pos < list.first then
|
||||
return nil
|
||||
end
|
||||
return pos, list[pos]
|
||||
end
|
||||
|
||||
----
|
||||
-- Iteraters through the queue
|
||||
@ -231,8 +244,19 @@ Queue = (function()
|
||||
local function qpairs(list)
|
||||
return iter, list, list.first - 1
|
||||
end
|
||||
|
||||
----
|
||||
-- Iteraters backwards through the queue
|
||||
-- returning all non-nil pos-value entries
|
||||
local function qpairsReverse(list)
|
||||
return iterReverse, list, list.last + 1
|
||||
end
|
||||
|
||||
return {new = new, push = push, remove = remove, qpairs = qpairs}
|
||||
return {new = new,
|
||||
push = push,
|
||||
remove = remove,
|
||||
qpairs = qpairs,
|
||||
qpairsReverse = qpairsReverse}
|
||||
end)()
|
||||
|
||||
----
|
||||
@ -294,7 +318,7 @@ local Delay = (function()
|
||||
-- path and file/dirname of a move destination.
|
||||
--
|
||||
path2 = path2,
|
||||
|
||||
|
||||
------
|
||||
-- Status of the event. Valid stati are:
|
||||
-- 'wait' ... the event is ready to be handled.
|
||||
@ -306,6 +330,10 @@ local Delay = (function()
|
||||
-- insurrance.
|
||||
--
|
||||
status = "wait",
|
||||
|
||||
-----
|
||||
-- Position in the queue
|
||||
dpos = -1,
|
||||
}
|
||||
return o
|
||||
end
|
||||
@ -1087,18 +1115,10 @@ local Sync = (function()
|
||||
-- Removes a delay.
|
||||
--
|
||||
local function removeDelay(self, delay)
|
||||
local found
|
||||
for i, d in ipairs(self.delays) do
|
||||
if d == delay then
|
||||
found = true
|
||||
table.remove(self.delays, i)
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
if not found then
|
||||
error("Did not find a delay to be removed!")
|
||||
if self.delays[delay.dpos] ~= delay then
|
||||
error("Queue is broken, delay not a dpos")
|
||||
end
|
||||
Queue.remove(self.delays, delay.dpos)
|
||||
|
||||
-- free all delays blocked by this one.
|
||||
if delay.blocks then
|
||||
@ -1280,28 +1300,27 @@ local Sync = (function()
|
||||
if nd.etype == "Blanket" then
|
||||
-- always stack blanket events on the last event
|
||||
log("Delay", "Stacking blanket event.")
|
||||
if #self.delays > 0 then
|
||||
stack(self.delays[#self.delays], nd)
|
||||
if self.delays.size > 0 then
|
||||
stack(self.delays[self.delays.last], nd)
|
||||
end
|
||||
addDelayPath("", nd)
|
||||
table.insert(self.delays, nd)
|
||||
nd.dpos = Queue.push(self.delays, nd)
|
||||
return
|
||||
end
|
||||
|
||||
-- detects blocks and combos by working from back until
|
||||
-- front through the fifo
|
||||
local il = #self.delays -- last delay
|
||||
while il > 0 do
|
||||
local od = self.delays[il]
|
||||
for il, od in Queue.qpairsReverse(self.delays) do
|
||||
-- asks Combiner what to do
|
||||
local ac = Combiner.combine(od, nd)
|
||||
|
||||
if ac then
|
||||
if ac == "remove" then
|
||||
table.remove(self.delays, il)
|
||||
Queue.remove(self.delays, il)
|
||||
return
|
||||
elseif ac == "stack" then
|
||||
stack(od, nd)
|
||||
table.insert(self.delays, nd)
|
||||
nd.dpos = Queue.push(self.delays, nd)
|
||||
return
|
||||
elseif ac == "absorb" then
|
||||
return
|
||||
@ -1326,7 +1345,7 @@ local Sync = (function()
|
||||
log("Delay", "New ",nd.etype,":",nd.path)
|
||||
end
|
||||
-- no block or combo
|
||||
table.insert(self.delays, nd)
|
||||
nd.dpos = Queue.push(self.delays, nd)
|
||||
end
|
||||
|
||||
|
||||
@ -1341,7 +1360,7 @@ local Sync = (function()
|
||||
-- first checks if more processses could be spawned
|
||||
if self.processes:size() < self.config.maxProcesses then
|
||||
-- finds the nearest delay waiting to be spawned
|
||||
for _, d in ipairs(self.delays) do
|
||||
for _, d in Queue.qpairs(self.delays) do
|
||||
if d.status == "wait" then
|
||||
return d.alarm
|
||||
end
|
||||
@ -1374,7 +1393,7 @@ local Sync = (function()
|
||||
end
|
||||
end
|
||||
|
||||
for i, d in ipairs(self.delays) do
|
||||
for i, d in Queue.qpairs(self.delays) do
|
||||
if d.status == "active" or
|
||||
(test and not test(InletFactory.d2e(self, d)))
|
||||
then
|
||||
@ -1386,7 +1405,6 @@ local Sync = (function()
|
||||
|
||||
--- TODO: make incremental indexes in dlist,
|
||||
-- and replace pairs with ipairs.
|
||||
|
||||
return dlist
|
||||
end
|
||||
|
||||
@ -1399,9 +1417,9 @@ local Sync = (function()
|
||||
-- no new processes
|
||||
return
|
||||
end
|
||||
for _, d in ipairs(self.delays) do
|
||||
if #self.delays < self.config.maxDelays then
|
||||
-- time constrains only are only a concern if not maxed
|
||||
for _, d in Queue.qpairs(self.delays) do
|
||||
if self.delays.size < self.config.maxDelays then
|
||||
-- time constrains are only concerned if not maxed
|
||||
-- the delay FIFO already.
|
||||
if d.alarm ~= true and timestamp < d.alarm then
|
||||
-- reached point in stack where delays are in future
|
||||
@ -1423,9 +1441,9 @@ local Sync = (function()
|
||||
-- Gets the next event to be processed.
|
||||
--
|
||||
local function getNextDelay(self, timestamp)
|
||||
for i, d in ipairs(self.delays) do
|
||||
if #self.delays < self.config.maxDelays then
|
||||
-- time constrains only are only a concern if not maxed
|
||||
for i, d in Queue.qpairs(self.delays) do
|
||||
if self.delays.size < self.config.maxDelays then
|
||||
-- time constrains are only concerned if not maxed
|
||||
-- the delay FIFO already.
|
||||
if d.alarm ~= true and timestamp < d.alarm then
|
||||
-- reached point in stack where delays are in future
|
||||
@ -1445,7 +1463,7 @@ local Sync = (function()
|
||||
--
|
||||
local function addBlanketDelay(self)
|
||||
local newd = Delay.new("Blanket", true, "")
|
||||
table.insert(self.delays, newd)
|
||||
newd.dpos = Queue.push(self.delays, newd)
|
||||
return newd
|
||||
end
|
||||
|
||||
@ -1455,8 +1473,8 @@ local Sync = (function()
|
||||
local function statusReport(self, f)
|
||||
local spaces = " "
|
||||
f:write(self.config.name," source=",self.source,"\n")
|
||||
f:write("There are ",#self.delays, " delays\n")
|
||||
for i, vd in ipairs(self.delays) do
|
||||
f:write("There are ",self.delays.size, " delays\n")
|
||||
for i, vd in Queue.qpairs(self.delays) do
|
||||
local st = vd.status
|
||||
f:write(st, string.sub(spaces, 1, 7 - #st))
|
||||
f:write(vd.etype," ")
|
||||
@ -1488,7 +1506,7 @@ local Sync = (function()
|
||||
local s = {
|
||||
-- fields
|
||||
config = config,
|
||||
delays = CountArray.new(),
|
||||
delays = Queue.new(),
|
||||
source = config.source,
|
||||
processes = CountArray.new(),
|
||||
excludes = Excludes.new(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user