mirror of
https://github.com/octoleo/lsyncd.git
synced 2024-11-16 01:47:08 +00:00
809 lines
13 KiB
Lua
809 lines
13 KiB
Lua
--
|
|
-- sync.lua from Lsyncd -- the Live (Mirror) Syncing Demon
|
|
--
|
|
--
|
|
-- Holds information about one observed directory including subdirs.
|
|
--
|
|
--
|
|
-- This code assumes your editor is at least 100 chars wide.
|
|
--
|
|
-- License: GPLv2 (see COPYING) or any later version
|
|
-- Authors: Axel Kittenberger <axkibe@gmail.com>
|
|
--
|
|
if mantle
|
|
then
|
|
print( 'Error, Lsyncd mantle already loaded' )
|
|
os.exit( -1 )
|
|
end
|
|
|
|
|
|
--
|
|
-- Syncs that have no name specified by the user script
|
|
-- get an incremental default name 'Sync[X]'
|
|
--
|
|
local nextDefaultName = 1
|
|
|
|
|
|
--
|
|
-- Appends a filter to the sync
|
|
--
|
|
local function appendFilter
|
|
(
|
|
self,
|
|
rule,
|
|
pattern
|
|
)
|
|
if not self.filters then self.filters = Filters.new( ) end
|
|
|
|
return self.filters:append( rule, pattern )
|
|
end
|
|
|
|
|
|
--
|
|
-- Removes a delay.
|
|
--
|
|
local function removeDelay
|
|
(
|
|
self,
|
|
delay
|
|
)
|
|
if self.delays[ delay.dpos ] ~= delay
|
|
then
|
|
error( 'Queue is broken, delay not at dpos' )
|
|
end
|
|
|
|
self.delays:remove( delay.dpos )
|
|
|
|
-- frees all delays blocked by this one.
|
|
if delay.blocks
|
|
then
|
|
for _, vd in pairs( delay.blocks )
|
|
do
|
|
vd.status = 'wait'
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
--
|
|
-- Returns false if the relative path is filtered
|
|
--
|
|
local function testFilter
|
|
(
|
|
self, -- the Sync
|
|
path -- the relative path
|
|
)
|
|
-- never filter the relative root itself
|
|
-- ( that would make zero sense )
|
|
if path == '/'
|
|
or not self.filters
|
|
then
|
|
return true
|
|
end
|
|
|
|
return self.filters:test( path )
|
|
end
|
|
|
|
|
|
--
|
|
-- Returns true if this Sync concerns about 'path'.
|
|
--
|
|
local function concerns
|
|
(
|
|
self, -- the Sync
|
|
path -- the absolute path
|
|
)
|
|
-- not concerned if watch rootdir doesn't match
|
|
if not path:starts( self.source )
|
|
then
|
|
return false
|
|
end
|
|
|
|
-- a sub dir and not concerned about subdirs
|
|
if self.config.subdirs == false
|
|
and path:sub( #self.source, -1 ):match( '[^/]+/?' )
|
|
then
|
|
return false
|
|
end
|
|
|
|
return testFilter( self, path:sub( #self.source ) )
|
|
end
|
|
|
|
|
|
--
|
|
-- Collects a child process.
|
|
--
|
|
local function collect
|
|
(
|
|
self, -- the sync
|
|
pid, -- process id of collected child process
|
|
exitcode -- exitcode of child process
|
|
)
|
|
local delay = self.processes[ pid ]
|
|
|
|
-- not a child of this sync?
|
|
if not delay then return end
|
|
|
|
if delay.status
|
|
then
|
|
log( 'Delay', 'collected an event' )
|
|
|
|
if delay.status ~= 'active'
|
|
then
|
|
error( 'collecting a non-active process' )
|
|
end
|
|
|
|
local rc = self.config.collect(
|
|
InletFactory.d2e( delay ),
|
|
exitcode
|
|
)
|
|
|
|
if rc == 'die'
|
|
then
|
|
log( 'Error', 'Critical exitcode.' )
|
|
|
|
terminate( -1 )
|
|
elseif rc ~= 'again'
|
|
then
|
|
-- if its active again the collecter restarted the event
|
|
removeDelay( self, delay )
|
|
|
|
log(
|
|
'Delay',
|
|
'Finish of ',
|
|
delay.etype,
|
|
' on ',
|
|
self.source,delay.path,
|
|
' = ',
|
|
exitcode
|
|
)
|
|
else
|
|
-- sets the delay on wait again
|
|
local alarm = self.config.delay
|
|
|
|
-- delays at least 1 second
|
|
if alarm < 1 then alarm = 1 end
|
|
|
|
delay:wait( now( ) + alarm )
|
|
end
|
|
else
|
|
log( 'Delay', 'collected a list' )
|
|
|
|
local rc = self.config.collect(
|
|
InletFactory.dl2el( delay ),
|
|
exitcode
|
|
)
|
|
|
|
if rc == 'die'
|
|
then
|
|
log( 'Error', 'Critical exitcode.' );
|
|
|
|
terminate( -1 )
|
|
elseif rc == 'again'
|
|
then
|
|
-- sets the delay on wait again
|
|
local alarm = self.config.delay
|
|
|
|
-- delays are at least 1 second
|
|
if alarm < 1 then alarm = 1 end
|
|
|
|
alarm = now() + alarm
|
|
|
|
for _, d in ipairs( delay )
|
|
do
|
|
d:wait( alarm )
|
|
end
|
|
else
|
|
for _, d in ipairs( delay )
|
|
do
|
|
removeDelay( self, d )
|
|
end
|
|
end
|
|
|
|
log( 'Delay','Finished list = ',exitcode )
|
|
end
|
|
|
|
self.processes[ pid ] = nil
|
|
end
|
|
|
|
|
|
--
|
|
-- Stacks a newDelay on the oldDelay,
|
|
-- the oldDelay blocks the new Delay.
|
|
--
|
|
-- A delay can block 'n' other delays,
|
|
-- but is blocked at most by one, the latest delay.
|
|
--
|
|
local function stack
|
|
(
|
|
oldDelay,
|
|
newDelay
|
|
)
|
|
newDelay:blockedBy( oldDelay )
|
|
end
|
|
|
|
|
|
--
|
|
-- Puts an action on the delay stack.
|
|
--
|
|
local function delay
|
|
(
|
|
self, -- the sync
|
|
etype, -- the event type
|
|
time, -- time of the event
|
|
path, -- path of the event
|
|
path2 -- desitination path of move events
|
|
)
|
|
log(
|
|
'Function',
|
|
'delay( ',
|
|
self.config.name, ', ',
|
|
etype, ', ',
|
|
path, ', ',
|
|
path2,
|
|
' )'
|
|
)
|
|
|
|
--
|
|
-- In case new directories were created
|
|
-- looks through this directories and makes create events for
|
|
-- new stuff found in there.
|
|
--
|
|
local function recurse
|
|
( )
|
|
if etype == 'Create' and path:byte( -1 ) == 47
|
|
then
|
|
local entries = core.readdir( self.source .. path )
|
|
|
|
if entries
|
|
then
|
|
for dirname, isdir in pairs( entries )
|
|
do
|
|
local pd = path .. dirname
|
|
|
|
if isdir then pd = pd..'/' end
|
|
|
|
log( 'Delay', 'Create creates Create on ', pd )
|
|
|
|
delay( self, 'Create', time, pd, nil )
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
-- exclusion tests
|
|
if not path2
|
|
then
|
|
-- simple test for single path events
|
|
if not testFilter( self, path )
|
|
then
|
|
log( 'Filter', 'filtered ', etype, ' on "', path, '"' )
|
|
|
|
return
|
|
end
|
|
else
|
|
-- for double paths ( move ) it might result into a split
|
|
local ex1 = not testFilter( self, path )
|
|
|
|
local ex2 = not testFilter( self, path2 )
|
|
|
|
if ex1 and ex2
|
|
then
|
|
log(
|
|
'Filter',
|
|
'filtered "', etype, ' on "', path,
|
|
'" -> "', path2, '"'
|
|
)
|
|
|
|
return
|
|
elseif not ex1 and ex2
|
|
then
|
|
-- splits the move if only partly filtered
|
|
log(
|
|
'Filter',
|
|
'filtered destination transformed ',
|
|
etype,
|
|
' to Delete ',
|
|
path
|
|
)
|
|
|
|
delay( self, 'Delete', time, path, nil )
|
|
|
|
return
|
|
elseif ex1 and not ex2
|
|
then
|
|
-- splits the move if only partly filtered
|
|
log(
|
|
'Filter',
|
|
'filtered origin transformed ',
|
|
etype,
|
|
' to Create.',
|
|
path2
|
|
)
|
|
|
|
delay( self, 'Create', time, path2, nil )
|
|
|
|
return
|
|
end
|
|
end
|
|
|
|
if etype == 'Move'
|
|
and not self.config.onMove
|
|
then
|
|
-- if there is no move action defined,
|
|
-- split a move as delete/create
|
|
-- layer 1 scripts which want moves events have to
|
|
-- set onMove simply to 'true'
|
|
log( 'Delay', 'splitting Move into Delete & Create' )
|
|
|
|
delay( self, 'Delete', time, path, nil )
|
|
|
|
delay( self, 'Create', time, path2, nil )
|
|
|
|
return
|
|
end
|
|
|
|
-- creates the new action
|
|
local alarm
|
|
|
|
if time and self.config.delay
|
|
then
|
|
alarm = time + self.config.delay
|
|
else
|
|
alarm = now( )
|
|
end
|
|
|
|
-- new delay
|
|
local nd = Delay.new( etype, self, alarm, path, path2 )
|
|
|
|
if nd.etype == 'Init' or nd.etype == 'Blanket'
|
|
then
|
|
-- always stack init or blanket events on the last event
|
|
log(
|
|
'Delay',
|
|
'Stacking ',
|
|
nd.etype,
|
|
' event.'
|
|
)
|
|
|
|
if #self.delays > 0
|
|
then
|
|
stack( self.delays:last( ), nd )
|
|
end
|
|
|
|
nd.dpos = self.delays:push( nd )
|
|
|
|
recurse( )
|
|
|
|
return
|
|
end
|
|
|
|
-- detects blocks and combos by working from back until
|
|
-- front through the fifo
|
|
for il, od in self.delays:qpairsReverse( )
|
|
do
|
|
-- asks Combiner what to do
|
|
local ac = Combiner.combine( od, nd )
|
|
|
|
if ac
|
|
then
|
|
Combiner.log( ac, od, nd )
|
|
|
|
if ac == 'remove'
|
|
then
|
|
self.delays:remove( il )
|
|
elseif ac == 'stack'
|
|
then
|
|
stack( od, nd )
|
|
|
|
nd.dpos = self.delays:push( nd )
|
|
elseif ac == 'toDelete,stack'
|
|
then
|
|
if od.status ~= 'active'
|
|
then
|
|
-- turns olddelay into a delete
|
|
local rd = Delay.new( 'Delete', self, od.alarm, od.path )
|
|
|
|
self.delays:replace( il, rd )
|
|
|
|
rd.dpos = il
|
|
|
|
-- and stacks delay2
|
|
stack( rd, nd )
|
|
else
|
|
-- and stacks delay2
|
|
stack( od, nd )
|
|
end
|
|
|
|
nd.dpos = self.delays:push( nd )
|
|
elseif ac == 'absorb'
|
|
then
|
|
-- nada
|
|
elseif ac == 'replace'
|
|
then
|
|
if od.status ~= 'active'
|
|
then
|
|
self.delays:replace( il, nd )
|
|
|
|
nd.dpos = il
|
|
else
|
|
stack( od, nd )
|
|
|
|
nd.dpos = self.delays:push( nd )
|
|
end
|
|
elseif ac == 'split'
|
|
then
|
|
delay( self, 'Delete', time, path, nil )
|
|
|
|
delay( self, 'Create', time, path2, nil )
|
|
else
|
|
error( 'unknown result of combine()' )
|
|
end
|
|
|
|
recurse( )
|
|
|
|
return
|
|
end
|
|
|
|
il = il - 1
|
|
end
|
|
|
|
if nd.path2
|
|
then
|
|
log( 'Delay', 'New ', nd.etype, ': ', nd.path, ' -> ', nd.path2 )
|
|
else
|
|
log( 'Delay', 'New ', nd.etype, ': ', nd.path )
|
|
end
|
|
|
|
-- no block or combo
|
|
nd.dpos = self.delays:push( nd )
|
|
|
|
recurse( )
|
|
end
|
|
|
|
|
|
--
|
|
-- Returns the soonest alarm for this Sync.
|
|
--
|
|
local function getAlarm
|
|
(
|
|
self
|
|
)
|
|
if #self.processes >= self.config.maxProcesses
|
|
then
|
|
return false
|
|
end
|
|
|
|
-- first checks if more processes could be spawned
|
|
-- finds the nearest delay waiting to be spawned
|
|
for _, d in self.delays:qpairs( )
|
|
do
|
|
if d.status == 'wait'
|
|
then
|
|
return d.alarm
|
|
end
|
|
end
|
|
|
|
-- nothing to spawn
|
|
return false
|
|
end
|
|
|
|
|
|
--
|
|
-- Gets all delays that are not blocked by active delays.
|
|
--
|
|
local function getDelays
|
|
(
|
|
self, -- the sync
|
|
test -- function to test each delay
|
|
)
|
|
local dlist = { sync = self }
|
|
|
|
local dlistn = 1
|
|
|
|
local blocks = { }
|
|
|
|
--
|
|
-- inheritly transfers all blocks from delay
|
|
--
|
|
local function getBlocks
|
|
(
|
|
delay
|
|
)
|
|
blocks[ delay ] = true
|
|
|
|
if delay.blocks
|
|
then
|
|
for _, d in ipairs( delay.blocks )
|
|
do
|
|
getBlocks( d )
|
|
end
|
|
end
|
|
end
|
|
|
|
for _, d in self.delays:qpairs( )
|
|
do
|
|
local tr = true
|
|
|
|
if test
|
|
then
|
|
tr = test( InletFactory.d2e( d ) )
|
|
end
|
|
|
|
if tr == 'break' then break end
|
|
|
|
if d.status == 'active' or not tr
|
|
then
|
|
getBlocks( d )
|
|
elseif not blocks[ d ]
|
|
then
|
|
dlist[ dlistn ] = d
|
|
|
|
dlistn = dlistn + 1
|
|
end
|
|
end
|
|
|
|
return dlist
|
|
end
|
|
|
|
|
|
--
|
|
-- Creates new actions
|
|
--
|
|
local function invokeActions
|
|
(
|
|
self,
|
|
timestamp
|
|
)
|
|
log( 'Function', 'invokeActions( "', self.config.name, '", ', timestamp, ' )' )
|
|
|
|
if #self.processes >= self.config.maxProcesses
|
|
then
|
|
-- no new processes
|
|
return
|
|
end
|
|
|
|
for _, d in self.delays:qpairs( )
|
|
do
|
|
-- if reached the global limit return
|
|
if uSettings.maxProcesses
|
|
and processCount >= uSettings.maxProcesses
|
|
then
|
|
log( 'Alarm', 'at global process limit.' )
|
|
return
|
|
end
|
|
|
|
if #self.delays < self.config.maxDelays
|
|
then
|
|
-- time constrains are only concerned if not maxed
|
|
-- the delay FIFO already.
|
|
if d.alarm ~= true and timestamp < d.alarm
|
|
then
|
|
-- reached point in stack where delays are in future
|
|
return
|
|
end
|
|
end
|
|
|
|
if d.status == 'wait'
|
|
then
|
|
-- found a waiting delay
|
|
if d.etype ~= 'Init'
|
|
then
|
|
self.config.action( self.inlet )
|
|
else
|
|
self.config.init( InletFactory.d2e( d ) )
|
|
end
|
|
|
|
if #self.processes >= self.config.maxProcesses
|
|
then
|
|
-- no further processes
|
|
return
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
--
|
|
-- Gets the next event to be processed.
|
|
--
|
|
local function getNextDelay
|
|
(
|
|
self,
|
|
timestamp
|
|
)
|
|
for i, d in self.delays:qpairs( )
|
|
do
|
|
if #self.delays < self.config.maxDelays
|
|
then
|
|
-- time constrains are only concerned if not maxed
|
|
-- the delay FIFO already.
|
|
if d.alarm ~= true and timestamp < d.alarm
|
|
then
|
|
-- reached point in stack where delays are in future
|
|
return nil
|
|
end
|
|
end
|
|
|
|
if d.status == 'wait'
|
|
then
|
|
-- found a waiting delay
|
|
return d
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
--
|
|
-- Adds and returns a blanket delay thats blocks all.
|
|
-- Used as custom marker.
|
|
--
|
|
local function addBlanketDelay
|
|
(
|
|
self
|
|
)
|
|
local newd = Delay.new( 'Blanket', self, true, '' )
|
|
|
|
newd.dpos = self.delays:push( newd )
|
|
|
|
return newd
|
|
end
|
|
|
|
|
|
--
|
|
-- Adds and returns a blanket delay thats blocks all.
|
|
-- Used as startup marker to call init asap.
|
|
--
|
|
local function addInitDelay
|
|
(
|
|
self
|
|
)
|
|
local newd = Delay.new( 'Init', self, true, '' )
|
|
|
|
newd.dpos = self.delays:push( newd )
|
|
|
|
return newd
|
|
end
|
|
|
|
|
|
--
|
|
-- Writes a status report about delays in this sync.
|
|
--
|
|
local function statusReport
|
|
(
|
|
self,
|
|
f
|
|
)
|
|
local spaces = ' '
|
|
|
|
f:write( self.config.name, ' source=', self.source, '\n' )
|
|
|
|
f:write( 'There are ', #self.delays, ' delays\n')
|
|
|
|
for i, vd in self.delays:qpairs( )
|
|
do
|
|
local st = vd.status
|
|
|
|
f:write( st, string.sub( spaces, 1, 7 - #st ) )
|
|
f:write( vd.etype, ' ' )
|
|
f:write( vd.path )
|
|
|
|
if vd.path2
|
|
then
|
|
f:write( ' -> ',vd.path2 )
|
|
end
|
|
|
|
f:write('\n')
|
|
|
|
end
|
|
|
|
f:write( 'Filtering:\n' )
|
|
|
|
local nothing = true
|
|
|
|
if self.filters
|
|
then
|
|
for _, e in pairs( self.filters.list )
|
|
do
|
|
nothing = false
|
|
|
|
f:write( e.rule, ' ', e.pattern,'\n' )
|
|
end
|
|
end
|
|
|
|
if nothing
|
|
then
|
|
f:write(' nothing.\n')
|
|
end
|
|
|
|
f:write( '\n' )
|
|
end
|
|
|
|
|
|
--
|
|
-- Creates a new Sync.
|
|
--
|
|
local function new
|
|
(
|
|
config
|
|
)
|
|
local s =
|
|
{
|
|
-- fields
|
|
config = config,
|
|
delays = Queue.new( ),
|
|
source = config.source,
|
|
processes = Counter.new( ),
|
|
filters = nil,
|
|
|
|
-- functions
|
|
addBlanketDelay = addBlanketDelay,
|
|
addExclude = addExclude,
|
|
addInitDelay = addInitDelay,
|
|
appendFilter = appendFilter,
|
|
collect = collect,
|
|
concerns = concerns,
|
|
delay = delay,
|
|
getAlarm = getAlarm,
|
|
getDelays = getDelays,
|
|
getNextDelay = getNextDelay,
|
|
invokeActions = invokeActions,
|
|
removeDelay = removeDelay,
|
|
rmExclude = rmExclude,
|
|
statusReport = statusReport,
|
|
}
|
|
|
|
s.inlet = InletFactory.newInlet( s )
|
|
|
|
-- provides a default name if needed
|
|
if not config.name
|
|
then
|
|
config.name = 'Sync' .. nextDefaultName
|
|
end
|
|
|
|
-- increments defaults if a config name was given or not
|
|
-- so Sync{n} will be the n-th call to sync{}
|
|
nextDefaultName = nextDefaultName + 1
|
|
|
|
-- loads filters
|
|
if config.filter
|
|
then
|
|
local te = type( config.filter )
|
|
|
|
s.filters = Filters.new( )
|
|
|
|
if te == 'table'
|
|
then
|
|
s.filters:appendList( config.filter )
|
|
elseif te == 'string'
|
|
then
|
|
s.filters:append( config.filter )
|
|
else
|
|
error( 'type for filter must be table or string', 2 )
|
|
end
|
|
|
|
end
|
|
|
|
if config.delay ~= nil
|
|
and ( type( config.delay ) ~= 'number' or config.delay < 0 )
|
|
then
|
|
error( 'delay must be a number and >= 0', 2 )
|
|
end
|
|
|
|
if config.filterFrom
|
|
then
|
|
if not s.filters then s.filters = Filters.new( ) end
|
|
|
|
s.filters:loadFile( config.filterFrom )
|
|
end
|
|
|
|
return s
|
|
end
|
|
|
|
|
|
--
|
|
-- Exported interface.
|
|
--
|
|
Sync = { new = new }
|
|
|