Implement batchSizeLimit for rsync-based transfers

If batchSizeLimit is set, only files smaller than this limit are
grouped into a single rsync transfer.
Each file larger than the limit spawns its own transfer process.
As a result, large files no longer block small file transfers, provided the
maxProcesses limit on the sync is greater than 1.
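
For illustration, a minimal sketch using only the new option (host, module
name and size limit are placeholders, not taken from this commit):

```
sync {
    default.rsync,
    source         = "/tmp/src",
    target         = "remotehost::module",
    -- files above 16 MiB get their own rsync process instead of joining the batch
    batchSizeLimit = 1024 * 1024 * 16,
    -- more than one process, so a large transfer cannot block the batched one
    maxProcesses   = 3,
}
```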

A highly optimized, very secure transfer configuration based on a
pool of SSH connections looks like this:

```
sync {
    default.rsync,
    tunnel = tunnel {
        command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@targetmachine"},
        mode = "pool",
        parallel = 2,
    },
    source = "/tmp/src",
    target = "rsync://localhost:${localport}/test",
    delay = 5,
    batchSizeLimit = 1024 * 1024 * 30,
    maxProcesses = 4,
    rsync = {
        verbose = true,
        inplace = true,
    }
}
```
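
With these settings, every file above 30 MiB is transferred by its own rsync
process (up to maxProcesses and the global process limit), while smaller files,
moves, and deletes are collected into a single batched transfer once the
5 second delay has passed.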

If the remote SSH configuration allows only port forwarding and your rsync daemon
is configured correctly, you can transfer data very securely without granting shell access.
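
As one possible sketch of such a lock-down (key type, module name and paths are
examples, not part of this commit), the forwarding-only key on the target and a
matching rsyncd module could look roughly like this:

```
# ~/.ssh/authorized_keys on targetmachine:
# the key may only open a forward to the local rsync daemon, no shell, no pty
restrict,port-forwarding,permitopen="localhost:873" ssh-ed25519 AAAA... lsyncd@source

# /etc/rsyncd.conf on targetmachine:
[test]
    path      = /srv/sync/test
    read only = no
    use chroot = yes
```
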
Daniel Poelzleithner 2022-04-14 14:05:49 +02:00
parent 7f0127548b
commit 075d64a069
2 changed files with 135 additions and 31 deletions

```
@@ -45,6 +45,7 @@ rsync.checkgauge = {
filter = true,
filterFrom = true,
target = true,
batchSizeLimit = true,
rsync = {
acls = true,
@@ -102,40 +103,17 @@ rsync.checkgauge = {
}
--
-- Returns true for non Init and Blanket events.
--
local eventNotInitBlank =
function
(
event
)
return event.etype ~= 'Init' and event.etype ~= 'Blanket'
end
--
-- Spawns rsync for a list of events
--
-- Exclusions are already handled by not having
-- events for them.
--
rsync.action = function
(
inlet
)
-- internal function to actually do the transfer
local run_action = function
(
inlet,
elist
)
local config = inlet.getConfig( )
-- gets all events ready for syncing
local elist = inlet.getEvents( eventNotInitBlank )
local substitudes = inlet.getSubstitutionData(elist, {})
local target = substitudeCommands(config.target, substitudes)
-- gets the list of paths for the event list
-- deletes create multi match patterns
local paths = elist.getPaths( )
--
-- Replaces what rsync would consider filter rules by literals
--
@@ -245,6 +223,114 @@ rsync.action = function
end
--
-- Returns true for non Init and Blanket events.
--
local eventNotInitBlank =
function
(
event
)
return event.etype ~= 'Init' and event.etype ~= 'Blanket'
end
--
-- Returns size or true if the event is for batch processing
--
local getBatchSize =
function
(
event
)
-- print("getBatchSize", event, event.status, event.etype, event.pathname)
if event.status == 'active' then
return false
end
if event.etype == 'Init' or event.etype == 'Blanket' then
return false
end
-- moves and deletes always go into the batch
if event.etype == 'Move' or event.etype == 'Delete' then
return true
end
return lsyncd.get_file_size(event.sourcePath)
end
--
-- Spawns rsync for a list of events
--
-- Exclusions are already handled by not having
-- events for them.
--
rsync.action = function
(
inlet
)
local sizeLimit = inlet.getConfig().batchSizeLimit
if sizeLimit == nil then
-- gets all events ready for syncing
return run_action(inlet, inlet.getEvents(eventNotInitBlank))
else
-- spawn all files under the size limit/deletes/moves in batch mode
local eventInBatch = function(event)
local size = getBatchSize(event)
if type(size) == "boolean" then
return size
elseif size == nil then
return true
end
if size <= sizeLimit then
return true
end
return false
end
-- indicator for grabbing one element of the queue
local single_returned = false
-- grab all events for separate transfers
local eventNoBatch = function(event)
local size = getBatchSize(event)
if type(size) ~= "number" or size == nil then
return false
end
if single_returned then
return 'break'
end
if size > sizeLimit then
single_returned = true
return true
end
return false
end
local elist = inlet.getEvents(eventInBatch)
if elist.size() > 0 then
run_action(inlet, elist)
end
while true do
local cnt, maxcnt = lsyncd.get_process_info()
if inlet.getSync().processes:size( ) >= inlet.getConfig().maxProcesses then
log('Normal',
'Maximum processes for sync reached. Delaying large transfer for sync: '..inlet.getConfig().name)
break
elseif maxcnt and cnt >= maxcnt then
log('Normal',
'Maximum process count reached. Delaying large transfer for sync: '..inlet.getConfig().name)
break
end
local extralist = inlet.getEvents(eventNoBatch)
-- no more single size events
if extralist.size() == 0 then break end
run_action(inlet, extralist)
-- get next result
single_returned = false
end
end
end
----
---- NOTE: This optimized version can be used once
---- https://bugzilla.samba.org/show_bug.cgi?id=12569
```


```
@@ -232,6 +232,11 @@ local uSettings = { }
local settingsSafe
-- Access to process counter
lsyncd.get_process_info = function()
return processCount, uSettings.maxProcesses
end
--============================================================================
-- Lsyncd Prototypes
--============================================================================
@@ -1545,7 +1550,7 @@ local InletFactory = ( function
end,
--
-- Returns the relative dir/file appended to the target
-- Returns the absolute dir/file appended to the target
-- including a trailing slash for dirs.
--
targetPath = function
@@ -1661,7 +1666,20 @@ local InletFactory = ( function
return result
end
end,
--
-- Returns the size of the eventlist
--
size = function( elist )
local dlist = e2d[ elist ]
if not dlist then
return 0
else
return #dlist
end
end,
}
--
```