diff --git a/default-rsync.lua b/default-rsync.lua index 7f566d9..dfcfb17 100644 --- a/default-rsync.lua +++ b/default-rsync.lua @@ -45,6 +45,7 @@ rsync.checkgauge = { filter = true, filterFrom = true, target = true, + batchSizeLimit = true, rsync = { acls = true, @@ -102,40 +103,17 @@ rsync.checkgauge = { } --- --- Returns true for non Init and Blanket events. --- -local eventNotInitBlank = - function -( - event -) - return event.etype ~= 'Init' and event.etype ~= 'Blanket' -end - - --- --- Spawns rsync for a list of events --- --- Exclusions are already handled by not having --- events for them. --- -rsync.action = function -( - inlet -) +-- internal function to actually do the transfer +local run_action = function + ( + inlet, + elist + ) local config = inlet.getConfig( ) - -- gets all events ready for syncing - local elist = inlet.getEvents( eventNotInitBlank ) - local substitudes = inlet.getSubstitutionData(elist, {}) local target = substitudeCommands(config.target, substitudes) - -- gets the list of paths for the event list - -- deletes create multi match patterns - local paths = elist.getPaths( ) - -- -- Replaces what rsync would consider filter rules by literals -- @@ -245,6 +223,114 @@ rsync.action = function end +-- +-- Returns true for non Init and Blanket events. +-- +local eventNotInitBlank = + function +( + event +) + return event.etype ~= 'Init' and event.etype ~= 'Blanket' +end + +-- +-- Returns size or true if the event is for batch processing +-- +local getBatchSize = + function +( + event +) + -- print("getBatchSize", event, event.status, event.etype, event.pathname) + if event.status == 'active' then + return false + end + if event.etype == 'Init' or event.etype == 'Blanket' then + return false + end + -- moves and deletes go always into batch + if event.etype == 'Move' or event.etype == 'Delete' then + return true + end + return lsyncd.get_file_size(event.sourcePath) +end + +-- +-- Spawns rsync for a list of events +-- +-- Exclusions are already handled by not having +-- events for them. +-- +rsync.action = function + ( + inlet + ) + local sizeLimit = inlet.getConfig().batchSizeLimit + + if sizeLimit == nil then + -- gets all events ready for syncing + return run_action(inlet, inlet.getEvents(eventNotInitBlank)) + else + -- spawn all files under the size limit/deletes/moves in batch mode + local eventInBatch = function(event) + local size = getBatchSize(event) + if type(size) == "boolean" then + return size + elseif size == nil then + return true + end + if size <= sizeLimit then + return true + end + return false + end + + -- indicator for grabbing one element of the queue + local single_returned = false + -- grab all events for seperate transfers + local eventNoBatch = function(event) + local size = getBatchSize(event) + if type(size) ~= "number" or size == nil then + return false + end + if single_returned then + return 'break' + end + if size > sizeLimit then + single_returned = true + return true + end + return false + end + + local elist = inlet.getEvents(eventInBatch) + if elist.size() > 0 then + run_action(inlet, elist) + end + + while true do + local cnt, maxcnt = lsyncd.get_process_info() + if inlet.getSync().processes:size( ) >= inlet.getConfig().maxProcesses then + log('Normal', + 'Maximum processes for sync reached. Delaying large transfer for sync: '..inlet.getConfig().name) + break + elseif maxcnt and cnt >= maxcnt then + log('Normal', + 'Maximum process count reached. Delaying large transfer for sync: '..inlet.getConfig().name) + break + end + local extralist = inlet.getEvents(eventNoBatch) + + -- no more single size events + if extralist.size() == 0 then break end + run_action(inlet, extralist) + -- get next result + single_returned = false + end + end +end + ---- ---- NOTE: This optimized version can be used once ---- https://bugzilla.samba.org/show_bug.cgi?id=12569 diff --git a/lsyncd.lua b/lsyncd.lua index 6ad1630..fc3e4a6 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -232,6 +232,11 @@ local uSettings = { } local settingsSafe +-- Access to process counter +lsyncd.get_process_info = function() + return processCount, uSettings.maxProcesses +end + --============================================================================ -- Lsyncd Prototypes --============================================================================ @@ -1545,7 +1550,7 @@ local InletFactory = ( function end, -- - -- Returns the relative dir/file appended to the target + -- Returns the absolute dir/file appended to the target -- including a trailing slash for dirs. -- targetPath = function @@ -1661,7 +1666,20 @@ local InletFactory = ( function return result - end + end, + + -- + -- Returns the size of the eventlist + -- + size = function( elist ) + local dlist = e2d[ elist ] + + if not dlist then + return 0 + else + return #dlist + end + end, } --