lsyncd/default-rsync.lua
Daniel Poelzleithner 0f9a99f6ea Implement batchSizeLimit for rsync based transfers
If batchSizeLimit is set, only files smaller than this limit will
be grouped into one rsync transfer.
Each file larger than this limit will spawn its own transfer process.
This keeps large files from blocking small file transfers, provided
the maxProcesses limit on the sync is larger than 1.

A very optimized, very secure transfer configuration based on a
pool of ssh connections looks like this:

```
sync {
        default.rsync,
        tunnel = tunnel {
            command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@targetmachine"},
            mode = "pool",
            parallel = 2,
        },
        source = "/tmp/src",
        target    = "rsync://localhost:${localport}/test",
        delay = 5,
        batchSizeLimit = 1024 * 1024 * 30,
        maxProcesses = 4,
        rsync = {
                verbose = true,
                inplace = true,
        }
}
```

If your remote ssh configuration only allows port forwarding and your rsync daemon
is configured correctly, you can transfer data very securely without giving shell access.
2022-04-14 14:05:49 +02:00

811 lines
16 KiB
Lua

--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- default-rsync.lua
--
-- Syncs with rsync ("classic" Lsyncd)
-- A (Layer 1) configuration.
--
-- Note:
-- this is in fact just a configuration using Layer 1 configuration
-- like any other. It only gets compiled into the binary by default.
-- You can simply use a modified one, by copying everything into a
-- config file of yours and naming it differently.
--
-- License: GPLv2 (see COPYING) or any later version
-- Authors: Axel Kittenberger <axkibe@gmail.com>
--
--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- The shared 'default' table has to exist before this file is run.
if not default
then
    error( 'default not loaded' )
end

-- This file must not be run twice.
if default.rsync
then
    error( 'default-rsync already loaded' )
end

-- The configuration table built by this file, published as default.rsync.
local rsync = { }

default.rsync = rsync
-- uses default collect
--
-- Gauge of all keys this configuration accepts.
-- It is used to ensure there aren't typos in the keys.
--
rsync.checkgauge = {

    -- unsets default user action handlers
    onCreate = false,
    onModify = false,
    onDelete = false,
    onStartup = false,
    onMove = false,

    -- keys on the sync itself
    delete = true,
    exclude = true,
    excludeFrom = true,
    filter = true,
    filterFrom = true,
    target = true,
    batchSizeLimit = true,

    -- keys in the nested 'rsync' table
    rsync = {
        acls = true,
        append = true,
        append_verify = true,
        archive = true,
        backup = true,
        backup_dir = true,
        binary = true,
        bwlimit = true,
        checksum = true,
        chown = true,
        chmod = true,
        compress = true,
        copy_dirlinks = true,
        copy_links = true,
        copy_unsafe_links = true,
        cvs_exclude = true,
        delete_excluded = true,
        dry_run = true,
        executability = true,
        existing = true,
        group = true,
        groupmap = true,
        hard_links = true,
        ignore_times = true,
        inplace = true,
        ipv4 = true,
        ipv6 = true,
        keep_dirlinks = true,
        links = true,
        one_file_system = true,
        omit_dir_times = true,
        omit_link_times = true,
        owner = true,
        password_file = true,
        perms = true,
        protect_args = true,
        prune_empty_dirs = true,
        quiet = true,
        rsh = true,
        rsync_path = true,
        sparse = true,
        suffix = true,
        temp_dir = true,
        timeout = true,
        times = true,
        update = true,
        usermap = true,
        verbose = true,
        whole_file = true,
        xattrs = true,
        _extra = true,
    },
}
-- Internal function to actually do the transfer.
--
-- Builds an rsync include filter from the paths of the given event
-- list and spawns one rsync process for it.
--
-- The target is taken from the sync's configuration after running it
-- through substitudeCommands. The rsync arguments are taken from
-- config.rsync._computed.
--
local function run_action
(
    inlet, -- inlet of the sync; provides getConfig and getSubstitutionData
    elist  -- the event list to transfer; provides getPaths
)
    local config = inlet.getConfig( )

    local substitudes = inlet.getSubstitutionData( elist, { } )

    local target = substitudeCommands( config.target, substitudes )

    --
    -- Replaces what rsync would consider filter rules by literals,
    -- that is '?', '*', '[' and ']' get a backslash in front.
    --
    -- Returns nothing for a missing path, otherwise exactly one value.
    --
    local function sub
    (
        p -- pattern
    )
        if not p then return end

        -- The parentheses truncate gsub's results to the string only,
        -- so the match count does not leak to the caller.
        return ( p:gsub( '[%?%*%[%]]', '\\%0' ) )
    end

    --
    -- Gets the list of paths for the event list
    --
    -- Deletes create multi match patterns
    --
    local paths = elist.getPaths(
        function
        (
            etype, -- event type
            path1, -- path
            path2  -- path to for move events
        )
            -- 47 is '/': a deleted directory matches itself and
            -- everything below it
            if string.byte( path1, -1 ) == 47 and etype == 'Delete'
            then
                return sub( path1 ) .. '***', sub( path2 )
            end

            return sub( path1 ), sub( path2 )
        end
    )

    -- stores all filters by integer index
    local filterI = { }

    -- stores all filters with path index
    local filterP = { }

    -- adds one path to the filter, each path only once
    local function addToFilter
    (
        path
    )
        if filterP[ path ] then return end

        filterP[ path ] = true

        filterI[ #filterI + 1 ] = path
    end

    -- adds the paths to the filter.
    --
    -- rsync needs to have entries for all steps in the path,
    -- so the file for example d1/d2/d3/f1 needs following filters:
    -- 'd1/', 'd1/d2/', 'd1/d2/d3/' and 'd1/d2/d3/f1'
    local parentPattern = '^(.*/)[^/]+/?'

    for _, path in ipairs( paths )
    do
        if path and path ~= ''
        then
            addToFilter( path )

            local pp = string.match( path, parentPattern )

            while pp
            do
                addToFilter( pp )

                pp = string.match( pp, parentPattern )
            end
        end
    end

    log(
        'Normal',
        'Calling rsync with filter-list of new/modified files/dirs\n',
        table.concat( filterI, '\n' )
    )

    -- deletions are only passed on if the sync is configured for them
    local delete = nil

    if config.delete == true or config.delete == 'running'
    then
        delete = { '--delete', '--ignore-errors' }
    end

    -- NOTE(review): '<' followed by a string presumably hands that string
    -- to rsync on stdin (it pairs with '--include-from=-'); confirm
    -- against the implementation of spawn.
    spawn(
        elist,
        config.rsync.binary,
        '<', table.concat( filterI, '\000' ),
        config.rsync._computed,
        '-r',
        delete,
        '--force',
        '--from0',
        '--include-from=-',
        '--exclude=*',
        config.source,
        target
    )
end
--
-- Event filter accepting every event whose type is
-- neither 'Init' nor 'Blanket'.
--
local function eventNotInitBlank( event )
    local etype = event.etype

    return not ( etype == 'Init' or etype == 'Blanket' )
end
--
-- Classifies an event for batch processing.
--
-- Returns
--   false  for events with status 'active' and for 'Init' / 'Blanket'
--          events,
--   true   for 'Move' and 'Delete' events, which always go into a batch,
--   otherwise whatever lsyncd.get_file_size reports for the source path
--   of the event.
--
local getBatchSize

do
    -- event types whose answer does not depend on a file size
    local fixedAnswers = {
        Init = false,
        Blanket = false,
        Move = true,
        Delete = true,
    }

    getBatchSize = function( event )
        if event.status == 'active'
        then
            return false
        end

        local answer = fixedAnswers[ event.etype ]

        if answer ~= nil
        then
            return answer
        end

        return lsyncd.get_file_size( event.sourcePath )
    end
end
--
-- Spawns rsync for the events that are ready for syncing.
--
-- Exclusions need no handling here, as there are no events
-- for excluded paths in the first place.
--
-- Without a configured batchSizeLimit all events that are neither
-- 'Init' nor 'Blanket' go into one transfer.
--
-- With a batchSizeLimit, moves, deletes and files up to the limit
-- are transferred as one batch. After that every file above the limit
-- gets a transfer of its own, for as long as the process limits allow.
--
rsync.action = function( inlet )
    local sizeLimit = inlet.getConfig( ).batchSizeLimit

    -- no limit configured: one transfer for everything that is ready
    if sizeLimit == nil
    then
        return run_action( inlet, inlet.getEvents( eventNotInitBlank ) )
    end

    -- selects all files under the size limit/deletes/moves for batch mode
    local function fitsBatch( event )
        local size = getBatchSize( event )
        local kind = type( size )

        -- a boolean is already the final answer
        if kind == 'boolean' then return size end

        -- no size known, the event goes into the batch
        if kind == 'nil' then return true end

        return size <= sizeLimit
    end

    -- set as soon as the selector below has taken its one event
    local taken = false

    -- selects a single event above the size limit for a separate transfer
    local function oversized( event )
        local size = getBatchSize( event )

        if type( size ) ~= 'number' then return false end

        -- NOTE(review): 'break' presumably makes getEvents stop
        -- collecting; confirm against the inlet implementation.
        if taken then return 'break' end

        if size > sizeLimit
        then
            taken = true

            return true
        end

        return false
    end

    local batch = inlet.getEvents( fitsBatch )

    if batch.size( ) > 0
    then
        run_action( inlet, batch )
    end

    repeat
        local cnt, maxcnt = lsyncd.get_process_info( )

        if inlet.getSync( ).processes:size( ) >= inlet.getConfig( ).maxProcesses
        then
            log(
                'Normal',
                'Maximum processes for sync reached. Delaying large transfer for sync: '..inlet.getConfig().name
            )

            break
        end

        if maxcnt and cnt >= maxcnt
        then
            log(
                'Normal',
                'Maximum process count reached. Delaying large transfer for sync: '..inlet.getConfig().name
            )

            break
        end

        local single = inlet.getEvents( oversized )

        -- no more single size events
        if single.size( ) == 0 then break end

        run_action( inlet, single )

        -- allows the selector to take the next event
        taken = false
    until false
end
----
---- NOTE: This optimized version can be used once
---- https://bugzilla.samba.org/show_bug.cgi?id=12569
---- is fixed.
----
---- Spawns rsync for a list of events
----
---- Exclusions are already handled by not having
---- events for them.
----
--rsync.action = function
--(
-- inlet
--)
-- local config = inlet.getConfig( )
--
-- -- gets all events ready for syncing
-- local elist = inlet.getEvents( eventNotInitBlank )
--
-- -- gets the list of paths for the event list
-- -- deletes create multi match patterns
-- local paths = elist.getPaths( )
--
-- -- removes trailing slashes from dirs.
-- for k, v in ipairs( paths )
-- do
-- if string.byte( v, -1 ) == 47
-- then
-- paths[ k ] = string.sub( v, 1, -2 )
-- end
-- end
--
-- log(
-- 'Normal',
-- 'Calling rsync with filter-list of new/modified files/dirs\n',
-- table.concat( paths, '\n' )
-- )
--
-- local delete = nil
--
-- if config.delete == true
-- or config.delete == 'running'
-- then
-- delete = { '--delete-missing-args', '--ignore-errors' }
-- end
--
-- spawn(
-- elist,
-- config.rsync.binary,
-- '<', table.concat( paths, '\000' ),
-- config.rsync._computed,
-- delete,
-- '--force',
-- '--from0',
-- '--files-from=-',
-- config.source,
-- config.target
-- )
--end
--
-- Startup handler.
--
-- Hands the event over to rsync.full, which spawns the
-- recursive startup sync.
--
rsync.init = function( event )
    return rsync.full( event )
end
--
-- Triggers a full sync event.
--
-- Spawns one recursive rsync from config.source to the target.
-- Depending on what the inlet reports, rsync is started with a filter
-- list, with an exclusion list, or with neither of them.
--
rsync.full = function( event )
    local config = event.config

    local inlet = event.inlet

    local excludes = inlet.getExcludes( )

    local filters = inlet.hasFilters( ) and inlet.getFilters( )

    -- without an explicit target it is put together from host and targetdir
    local target = config.target

    if not target
    then
        if not config.host
        then
            error('Internal fail, Neither target nor host is configured')
        end

        target = config.host .. ':' .. config.targetdir
    end

    target = substitudeCommands( target, inlet.getSubstitutionData( event, { } ) )

    -- arguments concerning deletions on the target
    local deleteArgs = { }

    if config.delete == true or config.delete == 'startup'
    then
        deleteArgs[ 1 ] = '--delete'
        deleteArgs[ 2 ] = '--ignore-errors'
    end

    if config.rsync.delete_excluded == true
    then
        deleteArgs[ #deleteArgs + 1 ] = '--delete-excluded'
    end

    local binary = config.rsync.binary

    if filters
    then
        -- starts rsync providing a filter list on stdin
        local filterText = table.concat( filters, '\n' )

        log(
            'Normal',
            'recursive startup rsync: ',
            config.source,
            ' -> ',
            target,
            ' filtering\n',
            filterText
        )

        spawn(
            event,
            binary,
            '<', filterText,
            '--filter=. -',
            deleteArgs,
            config.rsync._computed,
            '-r',
            config.source,
            target
        )
    elseif #excludes ~= 0
    then
        -- starts rsync providing an exclusion list on stdin
        local excludeText = table.concat( excludes, '\n' )

        log(
            'Normal',
            'recursive startup rsync: ',
            config.source,
            ' -> ',
            target,
            ' excluding\n',
            excludeText
        )

        spawn(
            event,
            binary,
            '<', excludeText,
            '--exclude-from=-',
            deleteArgs,
            config.rsync._computed,
            '-r',
            config.source,
            target
        )
    else
        -- starts rsync without any filters or excludes
        log(
            'Normal',
            'recursive startup rsync: ',
            config.source,
            ' -> ',
            target
        )

        spawn(
            event,
            binary,
            deleteArgs,
            config.rsync._computed,
            '-r',
            config.source,
            target
        )
    end
end
--
-- Prepares and checks a sync's configuration on startup.
--
-- Lets default.prepare check the configuration first, then verifies
-- the target and translates the options given in config.rsync into
-- the argument list config.rsync._computed:
--
--   [ 1 ]   all short flags combined into one argument ( e.g. '-lts' ),
--           or an empty table if there are none,
--   [ 2.. ] the entries of rsync._extra followed by the long options.
--
-- Raises an error if no target is configured ( unless skipTarget is
-- set ) or if the user supplied rsync._computed.
--
-- Unless skipTarget is set, a '/' is appended to config.target if it
-- ends neither with '/' nor with ':'.
--
rsync.prepare = function
(
    config,     -- the configuration
    level,      -- additional error level for inherited use ( by rsyncssh )
    skipTarget  -- used by rsyncssh, do not check for target
)
    -- First let default.prepare test the checkgauge
    default.prepare( config, level + 6 )

    if not skipTarget and not config.target
    then
        error(
            'default.rsync needs "target" configured',
            level
        )
    end

    -- checks if the _computed argument exists already
    if config.rsync._computed
    then
        error(
            'please do not use the internal rsync._computed parameter',
            level
        )
    end

    -- computes the rsync arguments into one list
    local crsync = config.rsync

    -- everything implied by archive = true
    local archiveFlags = {
        recursive = true,
        links = true,
        perms = true,
        times = true,
        group = true,
        owner = true,
        devices = true,
        specials = true,
        hard_links = false,
        acls = false,
        xattrs = false,
    }

    -- if archive is given the implications are filled in,
    -- explicitly configured values are left alone
    if crsync.archive
    then
        for k, v in pairs( archiveFlags )
        do
            if crsync[ k ] == nil
            then
                crsync[ k ] = v
            end
        end
    end

    -- slot 1 is a placeholder for the combined short flags set at the end
    local computed = { true }

    crsync._computed = computed

    -- appends one argument to the computed list
    local function push
    (
        arg
    )
        computed[ #computed + 1 ] = arg
    end

    -- Options having a one letter flag.
    --
    -- This is an ordered list on purpose: traversing a keyed table with
    -- pairs( ) has no specified order, which would make the order of
    -- the letters in the generated argument vary.
    local shortFlags = {
        { 'acls',             'A' },
        { 'backup',           'b' },
        { 'checksum',         'c' },
        { 'compress',         'z' },
        { 'copy_dirlinks',    'k' },
        { 'copy_links',       'L' },
        { 'cvs_exclude',      'C' },
        { 'dry_run',          'n' },
        { 'executability',    'E' },
        { 'group',            'g' },
        { 'hard_links',       'H' },
        { 'ignore_times',     'I' },
        { 'ipv4',             '4' },
        { 'ipv6',             '6' },
        { 'keep_dirlinks',    'K' },
        { 'links',            'l' },
        { 'one_file_system',  'x' },
        { 'omit_dir_times',   'O' },
        { 'omit_link_times',  'J' },
        { 'owner',            'o' },
        { 'perms',            'p' },
        { 'protect_args',     's' },
        { 'prune_empty_dirs', 'm' },
        { 'quiet',            'q' },
        { 'sparse',           'S' },
        { 'times',            't' },
        { 'update',           'u' },
        { 'verbose',          'v' },
        { 'whole_file',       'W' },
        { 'xattrs',           'X' },
    }

    -- Options without a one letter flag, in the order they are emitted.
    -- A third entry of true marks options whose configured value is
    -- appended to the flag text.
    local longFlags = {
        { 'append',            '--append' },
        { 'append_verify',     '--append-verify' },
        { 'backup_dir',        '--backup-dir=',    true },
        { 'bwlimit',           '--bwlimit=',       true },
        { 'chmod',             '--chmod=',         true },
        { 'chown',             '--chown=',         true },
        { 'copy_unsafe_links', '--copy-unsafe-links' },
        { 'groupmap',          '--groupmap=',      true },
        { 'existing',          '--existing' },
        { 'inplace',           '--inplace' },
        { 'password_file',     '--password-file=', true },
        { 'rsh',               '--rsh=',           true },
        { 'rsync_path',        '--rsync-path=',    true },
        { 'suffix',            '--suffix=',        true },
        { 'temp_dir',          '--temp-dir=',      true },
        { 'timeout',           '--timeout=',       true },
        { 'usermap',           '--usermap=',       true },
    }

    -- user supplied extra arguments come first
    if crsync._extra
    then
        for _, v in ipairs( crsync._extra )
        do
            push( v )
        end
    end

    -- collects the letters of all short flags that are switched on
    local shorts = { '-' }

    for _, entry in ipairs( shortFlags )
    do
        if crsync[ entry[ 1 ] ]
        then
            shorts[ #shorts + 1 ] = entry[ 2 ]
        end
    end

    -- devices and specials share the short flag 'D' if both are set,
    -- otherwise each one has to be given as long option
    if crsync.devices and crsync.specials
    then
        shorts[ #shorts + 1 ] = 'D'
    else
        if crsync.devices
        then
            push( '--devices' )
        end

        if crsync.specials
        then
            push( '--specials' )
        end
    end

    for _, entry in ipairs( longFlags )
    do
        local value = crsync[ entry[ 1 ] ]

        if value
        then
            if entry[ 3 ]
            then
                push( entry[ 2 ] .. value )
            else
                push( entry[ 2 ] )
            end
        end
    end

    -- more than the leading '-' means there is at least one short flag
    if #shorts > 1
    then
        computed[ 1 ] = table.concat( shorts )
    else
        computed[ 1 ] = { }
    end

    -- appends a / to target if not present
    -- and not a ':' for home dir.
    if not skipTarget
    and string.sub( config.target, -1 ) ~= '/'
    and string.sub( config.target, -1 ) ~= ':'
    then
        config.target = config.target .. '/'
    end
end
--
-- Deletes are done by default.
--
rsync.delete = true

--
-- The rsync exit codes, taken from the shared 'default' table.
--
rsync.exitcodes = default.rsyncExitCodes

--
-- The default options rsync is called with.
--
rsync.rsync = {

    -- The rsync binary to be called.
    binary = 'rsync',

    links = true,

    times = true,

    protect_args = true,
}

--
-- The default delay.
--
rsync.delay = 15