--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- default-rsync.lua
--
--    Syncs with rsync ("classic" Lsyncd)
--    A (Layer 1) configuration.
--
-- Note:
--    this is infact just a configuration using Layer 1 configuration
--    like any other. It only gets compiled into the binary by default.
--    You can simply use a modified one, by copying everything into a
--    config file of yours and name it differently.
--
-- License: GPLv2 (see COPYING) or any later version
-- Authors: Axel Kittenberger
--
--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-- This module extends the generic "default" behavior and must be
-- loaded exactly once, after "default" itself.
if not default then error( 'default not loaded' ) end

if default.rsync then error( 'default-rsync already loaded' ) end


local rsync = { }

default.rsync = rsync

-- uses default collect

--
-- Spelling gauge for the user's config keys:
-- used to ensure there aren't typos in the keys.
-- Keys set to false unset default user action handlers.
--
rsync.checkgauge = {

	-- unsets default user action handlers
	onCreate       = false,
	onModify       = false,
	onDelete       = false,
	onStartup      = false,
	onMove         = false,

	delete         = true,
	exclude        = true,
	excludeFrom    = true,
	filter         = true,
	filterFrom     = true,
	target         = true,
	batchSizeLimit = true,

	-- all valid keys of the nested "rsync" option table
	rsync = {
		acls              = true,
		append            = true,
		append_verify     = true,
		archive           = true,
		backup            = true,
		backup_dir        = true,
		binary            = true,
		bwlimit           = true,
		checksum          = true,
		chown             = true,
		chmod             = true,
		compress          = true,
		copy_dirlinks     = true,
		copy_links        = true,
		copy_unsafe_links = true,
		cvs_exclude       = true,
		delete_excluded   = true,
		dry_run           = true,
		executability     = true,
		existing          = true,
		group             = true,
		groupmap          = true,
		hard_links        = true,
		ignore_times      = true,
		inplace           = true,
		ipv4              = true,
		ipv6              = true,
		keep_dirlinks     = true,
		links             = true,
		one_file_system   = true,
		omit_dir_times    = true,
		omit_link_times   = true,
		owner             = true,
		password_file     = true,
		perms             = true,
		protect_args      = true,
		prune_empty_dirs  = true,
		quiet             = true,
		rsh               = true,
		rsync_path        = true,
		sparse            = true,
		suffix            = true,
		temp_dir          = true,
		timeout           = true,
		times             = true,
		update            = true,
		usermap           = true,
		verbose           = true,
		whole_file        = true,
		xattrs            = true,
		_extra            = true,
	},
}


-- internal function to actually do the transfer:
-- builds an rsync include-filter list from the event list and
-- spawns one rsync process covering all of it.
local run_action = function
(
	inlet,  -- lsyncd inlet for this sync
	elist   -- list of events to transfer
)
	local config = inlet.getConfig( )

	local substitudes = inlet.getSubstitutionData(elist, {})

	-- expands placeholders in the configured target
	local target = substitudeCommands(config.target, substitudes)

	--
	-- Replaces what rsync would consider filter rules by literals:
	-- escapes rsync's wildcard characters so paths are matched verbatim.
	--
	local function sub
	(
		p -- pattern
	)
		if not p then return end

		return p:
			gsub( '%?', '\\?' ):
			gsub( '%*', '\\*' ):
			gsub( '%[', '\\[' ):
			gsub( '%]', '\\]' )
	end

	--
	-- Gets the list of paths for the event list
	--
	-- Deletes create multi match patterns
	--
	local paths = elist.getPaths(
		function
		(
			etype,  -- event type
			path1,  -- path
			path2   -- path to for move events
		)
			-- 47 is ASCII '/': a trailing slash marks a directory;
			-- deleted directories get a '***' suffix so rsync also
			-- matches everything below them.
			if string.byte( path1, -1 ) == 47 and etype == 'Delete'
			then
				return sub( path1 )..'***', sub( path2 )
			else
				return sub( path1 ), sub( path2 )
			end
		end
	)

	-- stores all filters by integer index
	local filterI = { }

	-- stores all filters with path index (used as a de-dup set)
	local filterP = { }

	-- adds one path to the filter (skipping duplicates)
	local function addToFilter
	(
		path
	)
		if filterP[ path ] then return end

		filterP[ path ] = true

		table.insert( filterI, path )
	end

	-- adds a path to the filter.
	--
	-- rsync needs to have entries for all steps in the path,
	-- so the file for example d1/d2/d3/f1 needs following filters:
	-- 'd1/', 'd1/d2/', 'd1/d2/d3/' and 'd1/d2/d3/f1'
	for _, path in ipairs( paths )
	do
		if path and path ~= ''
		then
			addToFilter( path )

			-- walks up the directory chain, adding every ancestor
			local pp = string.match( path, '^(.*/)[^/]+/?' )

			while pp
			do
				addToFilter( pp )

				pp = string.match( pp, '^(.*/)[^/]+/?' )
			end
		end
	end

	log(
		'Normal',
		'Calling rsync with filter-list of new/modified files/dirs\n',
		table.concat( filterI, '\n' )
	)

	-- NOTE(review): config was already fetched at the top of this
	-- function; this second declaration shadows it harmlessly.
	local config = inlet.getConfig( )

	local delete = nil

	if config.delete == true or config.delete == 'running'
	then
		delete = { '--delete', '--ignore-errors' }
	end

	-- feeds the NUL-separated filter list on stdin ('--from0');
	-- '--exclude=*' drops everything not explicitly included.
	spawn(
		elist,
		config.rsync.binary,
		'<', table.concat( filterI, '\000' ),
		config.rsync._computed,
		'-r',
		delete,
		'--force',
		'--from0',
		'--include-from=-',
		'--exclude=*',
		config.source,
		target
	)
end


--
-- Returns true for non Init and Blanket events.
--
local eventNotInitBlank = function
(
	event
)
	return event.etype ~= 'Init' and event.etype ~= 'Blanket'
end


--
-- Returns size or true if the event is for batch processing:
--   false  -> not batchable (active, Init, Blanket, Full),
--   true   -> always batched (Move, Delete),
--   number -> file size, compared against batchSizeLimit by the caller.
--
local getBatchSize = function
(
	event
)
	-- print("getBatchSize", event, event.status, event.etype, event.pathname)
	if event.status == 'active' then return false end

	if event.etype == 'Init' or event.etype == 'Blanket' or event.etype == 'Full' then return false end

	-- moves and deletes go always into batch
	if event.etype == 'Move' or event.etype == 'Delete' then return true end

	return lsyncd.get_file_size(event.sourcePath)
end


--
-- Spawns rsync for a list of events
--
-- Exclusions are already handled by not having
-- events for them.
--
-- The user action handler: decides between one combined transfer
-- (no batchSizeLimit configured) and batch/large-file splitting.
--
-- With a batchSizeLimit, small files plus all moves/deletes are sent
-- in one batched rsync, while each file above the limit gets its own
-- rsync process (bounded by maxProcesses / the global process limit).
--
rsync.action = function
(
	inlet
)
	local sizeLimit = inlet.getConfig().batchSizeLimit

	if sizeLimit == nil
	then
		-- gets all events ready for syncing
		return run_action(inlet, inlet.getEvents(eventNotInitBlank))
	else
		-- spawn all files under the size limit/deletes/moves in batch mode
		local eventInBatch = function(event)
			if event.etype == "Full" then return false end

			local size = getBatchSize(event)

			-- boolean: getBatchSize already decided (true = batch, false = skip)
			if type(size) == "boolean" then
				return size
			elseif size == nil then
				-- size unknown (presumably the file vanished) -- batch it
				return true
			end

			if size <= sizeLimit then
				return true
			end

			return false
		end

		-- indicator for grabbing one element of the queue
		local single_returned = false

		-- grab all events for separate transfers
		local eventNoBatch = function(event)
			if event.etype == "Full" then return false end

			local size = getBatchSize(event)

			-- only numeric sizes qualify for a separate transfer
			-- (nil is covered: type(nil) ~= "number")
			if type(size) ~= "number" then
				return false
			end

			-- only one large transfer per call: stop collecting
			if single_returned then
				return 'break'
			end

			if size > sizeLimit then
				single_returned = true
				return true
			end

			return false
		end

		-- get all batched events
		local extralist = inlet.getEvents(eventInBatch)

		if extralist.size() > 0 then
			run_action(inlet, extralist)
		end

		-- spawn one large transfer at a time until the queue is empty
		-- or a process limit is hit
		while true
		do
			local cnt, maxcnt = lsyncd.get_process_info()

			if inlet.getSync().processes:size( ) >= inlet.getConfig().maxProcesses then
				log('Normal', 'Maximum processes for sync reached. Delaying large transfer for sync: '..inlet.getConfig().name)
				break
			elseif maxcnt and cnt >= maxcnt then
				log('Normal', 'Maximum process count reached. Delaying large transfer for sync: '..inlet.getConfig().name)
				break
			end

			local extralist = inlet.getEvents(eventNoBatch)

			-- no more single size events
			if extralist.size() == 0 then
				break
			end

			run_action(inlet, extralist)

			-- get next result
			single_returned = false
		end
	end
end

----
---- NOTE: This optimized version can be used once
----       https://bugzilla.samba.org/show_bug.cgi?id=12569
----       is fixed.
----
---- Spawns rsync for a list of events
----
---- Exclusions are already handled by not having
---- events for them.
----
--rsync.action = function
--(
--	inlet
--)
--	local config = inlet.getConfig( )
--
--	-- gets all events ready for syncing
--	local elist = inlet.getEvents( eventNotInitBlank )
--
--	-- gets the list of paths for the event list
--	-- deletes create multi match patterns
--	local paths = elist.getPaths( )
--
--	-- removes trailing slashes from dirs.
--	for k, v in ipairs( paths )
--	do
--		if string.byte( v, -1 ) == 47
--		then
--			paths[ k ] = string.sub( v, 1, -2 )
--		end
--	end
--
--	log(
--		'Normal',
--		'Calling rsync with filter-list of new/modified files/dirs\n',
--		table.concat( paths, '\n' )
--	)
--
--	local delete = nil
--
--	if config.delete == true
--	or config.delete == 'running'
--	then
--		delete = { '--delete-missing-args', '--ignore-errors' }
--	end
--
--	spawn(
--		elist,
--		config.rsync.binary,
--		'<', table.concat( paths, '\000' ),
--		config.rsync._computed,
--		delete,
--		'--force',
--		'--from0',
--		'--files-from=-',
--		config.source,
--		config.target
--	)
--end


--
-- Spawns the recursive startup sync.
-- Delegates to rsync.full below.
--
rsync.init = function
(
	event
)
	return rsync.full(event)
end


--
-- Triggers a full sync event:
-- spawns one recursive rsync over the whole source tree, choosing one
-- of three invocations depending on whether filters/excludes are set.
--
rsync.full = function
(
	event
)
	local config = event.config

	local inlet = event.inlet

	local excludes = inlet.getExcludes( )

	-- filters are only fetched if any are configured at all
	local filters = inlet.hasFilters( ) and inlet.getFilters( )

	local delete = {}

	local target = config.target

	-- without an explicit target, build one as host:targetdir
	if not target
	then
		if not config.host
		then
			error('Internal fail, Neither target nor host is configured')
		end

		target = config.host .. ':' .. config.targetdir
	end

	local substitudes = inlet.getSubstitutionData(event, {})

	target = substitudeCommands(target, substitudes)

	if config.delete == true or config.delete == 'startup'
	then
		delete = { '--delete', '--ignore-errors' }
	end

	if config.rsync.delete_excluded == true
	then
		table.insert( delete, '--delete-excluded' )
	end

	if not filters and #excludes == 0
	then
		-- starts rsync without any filters or excludes
		log(
			'Normal',
			'recursive full rsync: ',
			config.source,
			' -> ',
			target
		)

		spawn(
			event,
			config.rsync.binary,
			delete,
			config.rsync._computed,
			'-r',
			config.source,
			target
		)
	elseif not filters
	then
		-- starts rsync providing an exclusion list
		-- on stdin
		local exS = table.concat( excludes, '\n' )

		log(
			'Normal',
			'recursive full rsync: ',
			config.source,
			' -> ',
			target,
			' excluding\n',
			exS
		)

		spawn(
			event,
			config.rsync.binary,
			'<', exS,
			'--exclude-from=-',
			delete,
			config.rsync._computed,
			'-r',
			config.source,
			target
		)
	else
		-- starts rsync providing a filter list
		-- on stdin
		local fS = table.concat( filters, '\n' )

		log(
			'Normal',
			'recursive full rsync: ',
			config.source,
			' -> ',
			target,
			' filtering\n',
			fS
		)

		spawn(
			event,
			config.rsync.binary,
			'<', fS,
			'--filter=. -',
			delete,
			config.rsync._computed,
			'-r',
			config.source,
			target
		)
	end
end


--
-- Prepares and checks a syncs configuration on startup.
--
rsync.prepare = function
(
	config,    -- the configuration
	level,     -- additional error level for inherited use ( by rsyncssh )
	skipTarget -- used by rsyncssh, do not check for target
)
	-- First let default.prepare test the checkgauge
	default.prepare( config, level + 6 )

	if not skipTarget and not config.target
	then
		error(
			'default.rsync needs "target" configured',
			level
		)
	end

	-- checks if the _computed argument exists already
	if config.rsync._computed
	then
		error(
			'please do not use the internal rsync._computed parameter',
			level
		)
	end

	-- computes the rsync arguments into one list
	local crsync = config.rsync;

	-- everything implied by archive = true
	local archiveFlags = {
		recursive  = true,
		links      = true,
		perms      = true,
		times      = true,
		group      = true,
		owner      = true,
		devices    = true,
		specials   = true,
		hard_links = false,
		acls       = false,
		xattrs     = false,
	}

	-- if archive is given the implications are filled in
	-- (explicit user settings take precedence)
	if crsync.archive
	then
		for k, v in pairs( archiveFlags )
		do
			if crsync[ k ] == nil
			then
				crsync[ k ] = v
			end
		end
	end

	-- slot 1 is a placeholder, replaced at the end by the
	-- concatenated short-flag string (or an empty table if none)
	crsync._computed = { true }

	--- @type any
	local computed = crsync._computed

	local computedN = 2

	-- maps boolean config options to rsync single-letter flags
	local shortFlags = {
		acls             = 'A',
		backup           = 'b',
		checksum         = 'c',
		compress         = 'z',
		copy_dirlinks    = 'k',
		copy_links       = 'L',
		cvs_exclude      = 'C',
		dry_run          = 'n',
		executability    = 'E',
		group            = 'g',
		hard_links       = 'H',
		ignore_times     = 'I',
		ipv4             = '4',
		ipv6             = '6',
		keep_dirlinks    = 'K',
		links            = 'l',
		one_file_system  = 'x',
		omit_dir_times   = 'O',
		omit_link_times  = 'J',
		owner            = 'o',
		perms            = 'p',
		protect_args     = 's',
		prune_empty_dirs = 'm',
		quiet            = 'q',
		sparse           = 'S',
		times            = 't',
		update           = 'u',
		verbose          = 'v',
		whole_file       = 'W',
		xattrs           = 'X',
	}

	-- accumulates short flags into one '-xyz' style argument
	local shorts = { '-' }

	local shortsN = 2

	-- user-supplied extra arguments come first
	if crsync._extra
	then
		for k, v in ipairs( crsync._extra )
		do
			computed[ computedN ] = v

			computedN = computedN + 1
		end
	end

	for k, flag in pairs( shortFlags )
	do
		if crsync[ k ]
		then
			shorts[ shortsN ] = flag

			shortsN = shortsN + 1
		end
	end

	-- '-D' is shorthand for both --devices and --specials together
	if crsync.devices and crsync.specials
	then
		shorts[ shortsN ] = 'D'

		shortsN = shortsN + 1
	else
		if crsync.devices
		then
			computed[ computedN ] = '--devices'

			computedN = computedN + 1
		end

		if crsync.specials
		then
			computed[ computedN ] = '--specials'

			computedN = computedN + 1
		end
	end

	-- long options without short equivalents, appended one by one
	if crsync.append
	then
		computed[ computedN ] = '--append'

		computedN = computedN + 1
	end

	if crsync.append_verify
	then
		computed[ computedN ] = '--append-verify'

		computedN = computedN + 1
	end

	if crsync.backup_dir
	then
		computed[ computedN ] = '--backup-dir=' .. crsync.backup_dir

		computedN = computedN + 1
	end

	if crsync.bwlimit
	then
		computed[ computedN ] = '--bwlimit=' .. crsync.bwlimit

		computedN = computedN + 1
	end

	if crsync.chmod
	then
		computed[ computedN ] = '--chmod=' .. crsync.chmod

		computedN = computedN + 1
	end

	if crsync.chown
	then
		computed[ computedN ] = '--chown=' .. crsync.chown

		computedN = computedN + 1
	end

	if crsync.copy_unsafe_links
	then
		computed[ computedN ] = '--copy-unsafe-links'

		computedN = computedN + 1
	end

	if crsync.groupmap
	then
		computed[ computedN ] = '--groupmap=' .. crsync.groupmap

		computedN = computedN + 1
	end

	if crsync.existing
	then
		computed[ computedN ] = '--existing'

		computedN = computedN + 1
	end

	if crsync.inplace
	then
		computed[ computedN ] = '--inplace'

		computedN = computedN + 1
	end

	if crsync.password_file
	then
		computed[ computedN ] = '--password-file=' .. crsync.password_file

		computedN = computedN + 1
	end

	if crsync.rsh
	then
		computed[ computedN ] = '--rsh=' .. crsync.rsh

		computedN = computedN + 1
	end

	if crsync.rsync_path
	then
		computed[ computedN ] = '--rsync-path=' .. crsync.rsync_path

		computedN = computedN + 1
	end

	if crsync.suffix
	then
		computed[ computedN ] = '--suffix=' .. crsync.suffix

		computedN = computedN + 1
	end

	if crsync.temp_dir
	then
		computed[ computedN ] = '--temp-dir=' .. crsync.temp_dir

		computedN = computedN + 1
	end

	if crsync.timeout
	then
		computed[ computedN ] = '--timeout=' .. crsync.timeout

		computedN = computedN + 1
	end

	if crsync.usermap
	then
		computed[ computedN ] = '--usermap=' .. crsync.usermap

		computedN = computedN + 1
	end

	-- fills slot 1: the combined short flags, or an empty table
	-- (which spawn flattens to nothing) if no short flag was set
	if shortsN ~= 2
	then
		computed[ 1 ] = table.concat( shorts, '' )
	else
		computed[ 1 ] = { }
	end

	-- appends a / to target if not present
	-- and not a ':' for home dir.
	if not skipTarget
	and string.sub( config.target, -1 ) ~= '/'
	and string.sub( config.target, -1 ) ~= ':'
	then
		config.target = config.target..'/'
	end
end


--
-- By default do deletes.
--
rsync.delete = true

--
-- Rsyncd exitcodes
--
rsync.exitcodes = default.rsyncExitCodes

--
-- Calls rsync with this default options
--
rsync.rsync = {
	-- The rsync binary to be called.
	binary       = 'rsync',
	links        = true,
	times        = true,
	protect_args = true
}

--
-- Default delay
--
rsync.delay = 15