From 46188f9f60e3e275d30c4c6171dc2022cec0ea70 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 23 Mar 2022 23:44:19 +0100 Subject: [PATCH] Implement tunnel pool mode. In this mode, multiple tunnel processes are started and connections are load balanced across the pool of connections. Example config: ... sync { default.rsync, tunnel = tunnel { command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@testmachine"}, mode = "pool", parallel = 2, }, target = "rsync://localhost:${localport}/test", ... } --- default-rsync.lua | 8 +- lsyncd.lua | 388 ++++++++++++++++++++++++++++++------------- tests/utils_test.lua | 4 +- 3 files changed, 278 insertions(+), 122 deletions(-) diff --git a/default-rsync.lua b/default-rsync.lua index 3eedb7f..e422b72 100644 --- a/default-rsync.lua +++ b/default-rsync.lua @@ -129,6 +129,9 @@ rsync.action = function -- gets all events ready for syncing local elist = inlet.getEvents( eventNotInitBlank ) + local substitudes = inlet.getSubstitutionData(elist, {}) + local target = substitudeCommands(config.target, substitudes) + -- gets the list of paths for the event list -- deletes create multi match patterns local paths = elist.getPaths( ) @@ -237,7 +240,7 @@ rsync.action = function '--include-from=-', '--exclude=*', config.source, - config.target + target ) end @@ -332,6 +335,9 @@ rsync.init = function target = config.host .. ':' .. config.targetdir end + local substitudes = inlet.getSubstitutionData(event, {}) + target = substitudeCommands(target, substitudes) + if config.delete == true or config.delete == 'startup' then diff --git a/lsyncd.lua b/lsyncd.lua index 8101d34..5f1493a 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -1905,6 +1905,20 @@ local InletFactory = ( function -- TODO give a readonly handler only. 
return sync.config end, + + -- + -- Returns the sync for this Inlet + -- + getSync = function( sync ) + return sync + end, + + -- + -- Substitutes parameters in arguments + -- + getSubstitutionData = function( sync, event, data) + return sync.getSubstitutionData(sync, event, data) + end, } -- @@ -3106,6 +3120,16 @@ local Sync = ( function f:write( '\n' ) end + -- + -- Returns substitude data for event + -- + local function getSubstitutionData(self, event, data) + if self.config.tunnel then + data = self.config.tunnel:getSubstitutionData(event, data) + end + return data + end + -- -- Creates a new Sync. -- @@ -3141,6 +3165,7 @@ local Sync = ( function removeDelay = removeDelay, rmExclude = rmExclude, statusReport = statusReport, + getSubstitutionData = getSubstitutionData, } s.inlet = InletFactory.newInlet( s ) @@ -3238,23 +3263,43 @@ Tunnel = (function() CONNECTING = 3, UP = 4, RETRY_TIMEOUT = 5, + UP_RETRY_TIMEOUT = 6, + } + + local TUNNEL_MODES = { + COMMAND = "command", + POOL = "pool", + } + + local TUNNEL_DISTRIBUTION = { + ROUNDROBIN = "rr", + } + + local TUNNEL_SUBSTITIONS = { + "localhost", + "localport" } local nextTunnelName = 1 Tunnel.defaults = { - mode = "command", - oneShoot = false, + mode = TUNNEL_MODES.COMMAND, + parallel = 1, + distribution = TUNNEL_DISTRIBUTION.ROUNDROBIN, command = nil, checkCommand = nil, checkExitCodes = {0}, checkMaxFailed = 5, retryDelay = 10, - readyDelay = 5 + readyDelay = 5, + localhost = 'localhost', } -- export constants Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES Tunnel.TUNNEL_STATUS = TUNNEL_STATUS + Tunnel.TUNNEL_MODES = TUNNEL_MODES + Tunnel.TUNNEL_DISTRIBUTION = TUNNEL_DISTRIBUTION + Tunnel.TUNNEL_SUBSTITIONS = TUNNEL_SUBSTITIONS function Tunnel.new( options @@ -3263,9 +3308,10 @@ Tunnel = (function() processes = CountArray.new( ), blocks = {}, ready = false, - checksFailed = 0, + retryCount = 0, status = TUNNEL_STATUS.DOWN, - alarm = false + alarm = false, + rrCounter = 0, } -- provides a default name if needed if 
options.name == nil @@ -3287,16 +3333,20 @@ Tunnel = (function() return rv end - -- Returns the status of tunnel as text - function Tunnel:statusText() + function Tunnel.statusToText(status) for n, i in pairs(TUNNEL_STATUS) do - if self.status == i then + if status == i then return n end end return TUNNEL_STATUS.UNKNOWN end + -- Returns the status of tunnel as text + function Tunnel:statusText() + return Tunnel.statusToText(self.status) + end + -- -- Returns next alarm -- @@ -3311,7 +3361,42 @@ Tunnel = (function() end function Tunnel:isReady() - return self.status == TUNNEL_STATUS.UP + return self.status == TUNNEL_STATUS.UP or + self.status == TUNNEL_STATUS.UP_RETRY_TIMEOUT + end + + function Tunnel:setStatus(status) + log('Tunnel',self.options.name,': status change: ', + self:statusText(), " -> ", Tunnel.statusToText(status)) + self.status = status + end + + -- + -- Returns the number of processes currently running + -- + function Tunnel:countProcs(timestamp) + local run = 0 + local starting = 0 + local dead = 0 + if timestamp == nil then + timestamp = now() + end + + for pid, pd in self.processes:walk() do + if pd.type == TUNNEL_CMD_TYPES.CMD then + -- process needs to run for at least some time + if lsyncd.kill(pid, 0) ~= 0 then + dead = dead + 1 + elseif (pd.started + self.options.readyDelay) > timestamp then + starting = starting + 1 + else + pd.ready = true + run = run + 1 + end + end + end + + return run, starting, dead end -- @@ -3320,51 +3405,64 @@ Tunnel = (function() -- lsyncd.kill() if self:check() == false then -- check failed, consider tunnel broken - self.status = TUNNEL_STATUS.DOWN + self:setStatus(TUNNEL_STATUS.DOWN) end - -- check if child processes are running - if self.status == TUNNEL_STATUS.CONNECTING then - -- we can only be good if processes exist - local good = self.processes:size() > 0 or self.options.onShoot == true - - for pid, pd in self.processes:walk() do - if pd.type == TUNNEL_CMD_TYPES.CMD then - -- process needs to run for at 
least some time - if (pd.started + self.options.readyDelay) > timestamp - or lsyncd.kill(pid, 0) ~= 0 then - -- required command does not exist› - good = false - end - end - end - - if good then - log( - 'Tunnel', - 'Setup of tunnel ', self.options.name, ' sucessfull' - ) - self.status = TUNNEL_STATUS.UP - self.alarm = false - self:unblockSyncs() - else - -- not ready, postpone next check - self.alarm = now() + 1 - end - end - - if self.status == TUNNEL_STATUS.DOWN then - self:start() - elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then + if self.status == TUNNEL_STATUS.RETRY_TIMEOUT then if self.alarm <= timestamp then log( 'Tunnel', 'Retry setup ', self.options.name ) self:start() + return + else + -- timeout not yet reached + self.alarm = now() + 1 + return end + elseif self.status == TUNNEL_STATUS.DOWN then + self:start() + return elseif self.status == TUNNEL_STATUS.DISABLED then self.alarm = false + return + end + + local parallel = self.options.parallel + local run, starting, dead = self:countProcs(timestamp) + + -- check if enough child processes are running + if self.status == TUNNEL_STATUS.CONNECTING then + if run > 0 then + log( + 'Tunnel', + 'Setup of tunnel ', self.options.name, ' sucessfull' + ) + self:setStatus(TUNNEL_STATUS.UP) + self:unblockSyncs() + end + elseif self.status == TUNNEL_STATUS.UP and run == 0 then + -- no good process running, degrade + log( + 'Tunnel', + 'Tunnel ', self.options.name, ' changed to CONNECTING' + ) + self:setStatus(TUNNEL_STATUS.CONNECTING) + end + + local spawned = 0 + -- start more processes if necesarry + while run + starting + spawned < self.options.parallel do + self:spawn() + spawned = spawned + 1 + end + + -- trigger next delay + if starting + spawned == 0 then + self.alarm = false + else + self.alarm = now() + 1 end end @@ -3418,52 +3516,73 @@ Tunnel = (function() self.blocks = {} end + -- + -- Spawn a single tunnel program + -- + function Tunnel:spawn() + local opts = { + type = TUNNEL_CMD_TYPES.CMD, + 
started = now(), + localhost = self.options.localhost, + ready = false, + } + local cmd = self.options.command + + if self.options.mode == TUNNEL_MODES.POOL then + opts.localport = lsyncd.get_free_port() + end + cmd = substitudeCommands(cmd, opts) + + if #cmd < 1 then + log('Error', + '', + self.options + ) + error( 'start tunnel of mode command with empty command', 2 ) + -- FIXME: add line which tunnel was called + return false + end + local bin = cmd[1] + -- for _,v in ipairs(cmd) do + -- if type( v ) ~= 'string' then + -- error( 'tunnel command must be a list of strings', 2 ) + -- end + -- end + log( + 'Info', + 'Start tunnel command ', + cmd + ) + local pid = lsyncd.exec(bin, table.unpack(cmd, 2)) + --local pid = spawn(bin, table.unpack(self.options.command, 2)) + if pid and pid > 0 then + self.processes[pid] = opts + self.retryCount = 0 + self.alarm = now() + 1 + else + self.alarm = now() + self.options.retryDelay + if self.status == TUNNEL_STATUS.UP then + self:setStatus(TUNNEL_STATUS.UP_RETRY_TIMEOUT) + else + self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT) + end + end + end + function Tunnel:start() - if self.status == TUNNEL_STATUS.CONNECTING or - self.status == TUNNEL_STATUS.UP then + if self.status == TUNNEL_STATUS.UP or + self.status == TUNNEL_STATUS.CONNECTING then return end - if self.options.mode == "command" then - self.status = TUNNEL_STATUS.CONNECTING - self.alarm = false - log( - 'Info', - 'Start tunnel command ', - self.options.command - ) - if #self.options.command < 1 then - log('Error', - '', - self.options - ) - error( 'start tunnel of mode command with empty command', 2 ) - -- FIXME: add line which tunnel was called - return false - end - local bin = self.options.command[1] - for _,v in ipairs(self.options.command) do - if type( v ) ~= 'string' then - error( 'tunnel command must be a list of strings', 2 ) - end - end - local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2)) - --local pid = spawn(bin, 
table.unpack(self.options.command, 2)) - if pid and pid > 0 then - self.processes[pid] = { - type = TUNNEL_CMD_TYPES.CMD, - started = now() - } - self.checksFailed = 0 - self.alarm = now() + 1 - else - self.alarm = now() + self.options.retryDelay - self.status = TUNNEL_STATUS.RETRY_TIMEOUT - end + if self.options.mode == TUNNEL_MODES.COMMAND or + self.options.mode == TUNNEL_MODES.POOL then + self:setStatus(TUNNEL_STATUS.CONNECTING) + self:invoke(now()) else error('unknown tunnel mode:' .. self.options.mode) - self.status = TUNNEL_STATUS.DOWN + self:setStatus(TUNNEL_STATUS.DISABLED) end end @@ -3481,73 +3600,67 @@ Tunnel = (function() end log('Debug', "collect tunnel event. pid: ", pid," exitcode: ", exitcode) + local run, starting, dead = self:countProcs() -- cases in which the tunnel command is handled if proc.type == TUNNEL_CMD_TYPES.CMD then - if self.options.onShot then + if self.status == TUNNEL_STATUS.CONNECTING then + log( + 'Warning', + 'Starting tunnel failed.', + self.options.name + ) + self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT) + self.alarm = now() + self.options.retryDelay + else log( 'Info', - 'Tunnel setup complete.', + 'Tunnel died. Will Restarting', self.options.name ) - self.status = TUNNEL_STATUS.UP - else - if self.status == TUNNEL_STATUS.CONNECTING then - log( - 'Warning', - 'Starting tunnel failed.', - self.options.name - ) - self.status = TUNNEL_STATUS.RETRY_TIMEOUT - self.alarm = now() + 5 - else - log( - 'Info', - 'Tunnel died. 
Restarting', - self.options.name - ) - self.status = TUNNEL_STATUS.DOWN - self.alarm = true - self:start() + if run == 0 then + self:setStatus(TUNNEL_STATUS.DOWN) end + self.alarm = true end -- cases in which the check function has executed a program elseif proc.type == TUNNEL_CMD_TYPES.CHK then - local found = false + local good = false if type(self.options.checkExitCodes) == 'table' then for _,i in iwalk(self.options.checkExitCodes) do if exitcode == i then - found = true + good = true end end else if self.options.checkExitCodes == exitcode then - found = true + good = true end end - if found then - if self.ready == false then + if good then + if self.isReady() == false then log( 'Info', - 'Tunnel setup complete ', - self.options.name + self.options.name, + ' Tunnel setup complete ' ) end - self.ready = true + self:setStatus(TUNNEL_STATUS.UP) + self.checksFailed = 0 else - if self.ready == true then + if self.ready then log( - 'Info', - 'Check failed.', + 'Tunnel', self.options.name + ' Check failed.' 
) self.checksFailed = self.checksFailed + 1 if self.checksFailed > self.options.checkMaxFailed then self:kill() end end - self.ready = false + self:setStatus(TUNNEL_STATUS.DOWN) end end self.processes[pid] = nil @@ -3564,13 +3677,50 @@ Tunnel = (function() lsyncd.kill(pid, 9) end end - self.status = TUNNEL_STATUS.DISABLED + self:setStatus(TUNNEL_STATUS.DISABLED) end function Tunnel:isReady () return self.status == TUNNEL_STATUS.UP end + -- + -- Fills/changes the opts table with additional values + -- for the transfer to be started + -- + function Tunnel:getSubstitutionData(event, opts) + local useProc, useProcLast = nil, nil + if self.options.mode == TUNNEL_MODES.POOL then + if self.options.distribution == TUNNEL_DISTRIBUTION.ROUNDROBIN then + local i = 0 + for pid, proc in self.processes:walk() do + if proc.ready == true then + useProcLast = proc + if (i % self.processes:size()) == self.rrCounter then + useProc = proc + self.rrCounter = self.rrCounter + 1 + end + end + end + if useProc == nil then + self.rrCounter = 0 + useProc = useProcLast + end + else + log('Tunnel', 'Unknown distribution mode: ', self.options.distribution) + os.exit(1) + end + end + if useProc then + for k,v in pairs(self.TUNNEL_SUBSTITIONS) do + if useProc[v] ~= nil then + opts[v] = useProc[v] + end + end + end + return opts + end + -- -- Writes a status report about this tunnel -- @@ -3581,7 +3731,7 @@ Tunnel = (function() for pid, prc in self.processes:walk( ) do - f:write(" pid=", pid, " type=", prc.type, " started=", ''..prc.started, '\n') + f:write(" pid=", pid, " type=", prc.type, " started="..prc.started, '\n') end f:write( '\n' ) @@ -4017,7 +4167,7 @@ function splitQuotedString return rv end -function replaceCommand(cmd, data) +function substitudeCommands(cmd, data) assert(type(data) == "table") local getData = function(arg) local rv = data[arg] @@ -5892,7 +6042,7 @@ function runner.teardown return exitCode end - + 
--============================================================================ -- Lsyncd runner's user interface --============================================================================ diff --git a/tests/utils_test.lua b/tests/utils_test.lua index f6f94c4..c793c3f 100644 --- a/tests/utils_test.lua +++ b/tests/utils_test.lua @@ -14,11 +14,11 @@ local testData = { localPort = 1234, localHost = "localhorst" } -assert(replaceCommand("echo ssh ${localHost}:${localPort}", testData) == +assert(substitudeCommands("echo ssh ${localHost}:${localPort}", testData) == "echo ssh localhorst:1234") assert(isTableEqual( - replaceCommand({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), + substitudeCommands({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), {"-p", "2localhorst2", "-i '1234'"} ))