Implement tunnel pool mode.

In this mode, multiple tunnel processes are started and connection a load
balanced on the pool of connections.

Example config:

...
sync {
  default.rsync,
  tunnel = tunnel {
    command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@testmachine"},
    mode = "pool",
    parallel = 2,
  },
  target    = "rsync://localhost:${localport}/test",
  ...
}
This commit is contained in:
Daniel Poelzleithner 2022-03-23 23:44:19 +01:00
parent 6e60f6b89e
commit 46188f9f60
3 changed files with 278 additions and 122 deletions

View File

@ -129,6 +129,9 @@ rsync.action = function
-- gets all events ready for syncing -- gets all events ready for syncing
local elist = inlet.getEvents( eventNotInitBlank ) local elist = inlet.getEvents( eventNotInitBlank )
local substitudes = inlet.getSubstitutionData(elist, {})
local target = substitudeCommands(config.target, substitudes)
-- gets the list of paths for the event list -- gets the list of paths for the event list
-- deletes create multi match patterns -- deletes create multi match patterns
local paths = elist.getPaths( ) local paths = elist.getPaths( )
@ -237,7 +240,7 @@ rsync.action = function
'--include-from=-', '--include-from=-',
'--exclude=*', '--exclude=*',
config.source, config.source,
config.target target
) )
end end
@ -332,6 +335,9 @@ rsync.init = function
target = config.host .. ':' .. config.targetdir target = config.host .. ':' .. config.targetdir
end end
local substitudes = inlet.getSubstitutionData(event, {})
target = substitudeCommands(target, substitudes)
if config.delete == true if config.delete == true
or config.delete == 'startup' or config.delete == 'startup'
then then

View File

@ -1905,6 +1905,20 @@ local InletFactory = ( function
-- TODO give a readonly handler only. -- TODO give a readonly handler only.
return sync.config return sync.config
end, end,
--
-- Returns the sync for this Inlet
--
getSync = function( sync )
return sync
end,
--
-- Substitutes parameters in arguments
--
getSubstitutionData = function( sync, event, data)
return sync.getSubstitutionData(sync, event, data)
end,
} }
-- --
@ -3106,6 +3120,16 @@ local Sync = ( function
f:write( '\n' ) f:write( '\n' )
end end
--
-- Returns substitude data for event
--
local function getSubstitutionData(self, event, data)
if self.config.tunnel then
data = self.config.tunnel:getSubstitutionData(event, data)
end
return data
end
-- --
-- Creates a new Sync. -- Creates a new Sync.
-- --
@ -3141,6 +3165,7 @@ local Sync = ( function
removeDelay = removeDelay, removeDelay = removeDelay,
rmExclude = rmExclude, rmExclude = rmExclude,
statusReport = statusReport, statusReport = statusReport,
getSubstitutionData = getSubstitutionData,
} }
s.inlet = InletFactory.newInlet( s ) s.inlet = InletFactory.newInlet( s )
@ -3238,23 +3263,43 @@ Tunnel = (function()
CONNECTING = 3, CONNECTING = 3,
UP = 4, UP = 4,
RETRY_TIMEOUT = 5, RETRY_TIMEOUT = 5,
UP_RETRY_TIMEOUT = 6,
}
local TUNNEL_MODES = {
COMMAND = "command",
POOL = "pool",
}
local TUNNEL_DISTRIBUTION = {
ROUNDROBIN = "rr",
}
local TUNNEL_SUBSTITIONS = {
"localhost",
"localport"
} }
local nextTunnelName = 1 local nextTunnelName = 1
Tunnel.defaults = { Tunnel.defaults = {
mode = "command", mode = TUNNEL_MODES.COMMAND,
oneShoot = false, parallel = 1,
distribution = TUNNEL_DISTRIBUTION.ROUNDROBIN,
command = nil, command = nil,
checkCommand = nil, checkCommand = nil,
checkExitCodes = {0}, checkExitCodes = {0},
checkMaxFailed = 5, checkMaxFailed = 5,
retryDelay = 10, retryDelay = 10,
readyDelay = 5 readyDelay = 5,
localhost = 'localhost',
} }
-- export constants -- export constants
Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES
Tunnel.TUNNEL_STATUS = TUNNEL_STATUS Tunnel.TUNNEL_STATUS = TUNNEL_STATUS
Tunnel.TUNNEL_MODES = TUNNEL_MODES
Tunnel.TUNNEL_DISTRIBUTION = TUNNEL_DISTRIBUTION
Tunnel.TUNNEL_SUBSTITIONS = TUNNEL_SUBSTITIONS
function Tunnel.new( function Tunnel.new(
options options
@ -3263,9 +3308,10 @@ Tunnel = (function()
processes = CountArray.new( ), processes = CountArray.new( ),
blocks = {}, blocks = {},
ready = false, ready = false,
checksFailed = 0, retryCount = 0,
status = TUNNEL_STATUS.DOWN, status = TUNNEL_STATUS.DOWN,
alarm = false alarm = false,
rrCounter = 0,
} }
-- provides a default name if needed -- provides a default name if needed
if options.name == nil if options.name == nil
@ -3287,16 +3333,20 @@ Tunnel = (function()
return rv return rv
end end
-- Returns the status of tunnel as text function Tunnel.statusToText(status)
function Tunnel:statusText()
for n, i in pairs(TUNNEL_STATUS) do for n, i in pairs(TUNNEL_STATUS) do
if self.status == i then if status == i then
return n return n
end end
end end
return TUNNEL_STATUS.UNKNOWN return TUNNEL_STATUS.UNKNOWN
end end
-- Returns the status of tunnel as text
function Tunnel:statusText()
return Tunnel.statusToText(self.status)
end
-- --
-- Returns next alarm -- Returns next alarm
-- --
@ -3311,7 +3361,42 @@ Tunnel = (function()
end end
function Tunnel:isReady() function Tunnel:isReady()
return self.status == TUNNEL_STATUS.UP return self.status == TUNNEL_STATUS.UP or
self.status == TUNNEL_STATUS.UP_RETRY_TIMEOUT
end
function Tunnel:setStatus(status)
log('Tunnel',self.options.name,': status change: ',
self:statusText(), " -> ", Tunnel.statusToText(status))
self.status = status
end
--
-- Returns the number of processes currently running
--
function Tunnel:countProcs(timestamp)
local run = 0
local starting = 0
local dead = 0
if timestamp == nil then
timestamp = now()
end
for pid, pd in self.processes:walk() do
if pd.type == TUNNEL_CMD_TYPES.CMD then
-- process needs to run for at least some time
if lsyncd.kill(pid, 0) ~= 0 then
dead = dead + 1
elseif (pd.started + self.options.readyDelay) > timestamp then
starting = starting + 1
else
pd.ready = true
run = run + 1
end
end
end
return run, starting, dead
end end
-- --
@ -3320,51 +3405,64 @@ Tunnel = (function()
-- lsyncd.kill() -- lsyncd.kill()
if self:check() == false then if self:check() == false then
-- check failed, consider tunnel broken -- check failed, consider tunnel broken
self.status = TUNNEL_STATUS.DOWN self:setStatus(TUNNEL_STATUS.DOWN)
end end
-- check if child processes are running if self.status == TUNNEL_STATUS.RETRY_TIMEOUT then
if self.status == TUNNEL_STATUS.CONNECTING then
-- we can only be good if processes exist
local good = self.processes:size() > 0 or self.options.onShoot == true
for pid, pd in self.processes:walk() do
if pd.type == TUNNEL_CMD_TYPES.CMD then
-- process needs to run for at least some time
if (pd.started + self.options.readyDelay) > timestamp
or lsyncd.kill(pid, 0) ~= 0 then
-- required command does not exist
good = false
end
end
end
if good then
log(
'Tunnel',
'Setup of tunnel ', self.options.name, ' sucessfull'
)
self.status = TUNNEL_STATUS.UP
self.alarm = false
self:unblockSyncs()
else
-- not ready, postpone next check
self.alarm = now() + 1
end
end
if self.status == TUNNEL_STATUS.DOWN then
self:start()
elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then
if self.alarm <= timestamp then if self.alarm <= timestamp then
log( log(
'Tunnel', 'Tunnel',
'Retry setup ', self.options.name 'Retry setup ', self.options.name
) )
self:start() self:start()
return
else
-- timeout not yet reached
self.alarm = now() + 1
return
end end
elseif self.status == TUNNEL_STATUS.DOWN then
self:start()
return
elseif self.status == TUNNEL_STATUS.DISABLED then elseif self.status == TUNNEL_STATUS.DISABLED then
self.alarm = false self.alarm = false
return
end
local parallel = self.options.parallel
local run, starting, dead = self:countProcs(timestamp)
-- check if enough child processes are running
if self.status == TUNNEL_STATUS.CONNECTING then
if run > 0 then
log(
'Tunnel',
'Setup of tunnel ', self.options.name, ' sucessfull'
)
self:setStatus(TUNNEL_STATUS.UP)
self:unblockSyncs()
end
elseif self.status == TUNNEL_STATUS.UP and run == 0 then
-- no good process running, degrade
log(
'Tunnel',
'Tunnel ', self.options.name, ' changed to CONNECTING'
)
self:setStatus(TUNNEL_STATUS.CONNECTING)
end
local spawned = 0
-- start more processes if necesarry
while run + starting + spawned < self.options.parallel do
self:spawn()
spawned = spawned + 1
end
-- trigger next delay
if starting + spawned == 0 then
self.alarm = false
else
self.alarm = now() + 1
end end
end end
@ -3418,52 +3516,73 @@ Tunnel = (function()
self.blocks = {} self.blocks = {}
end end
--
-- Spawn a single tunnel program
--
function Tunnel:spawn()
local opts = {
type = TUNNEL_CMD_TYPES.CMD,
started = now(),
localhost = self.options.localhost,
ready = false,
}
local cmd = self.options.command
if self.options.mode == TUNNEL_MODES.POOL then
opts.localport = lsyncd.get_free_port()
end
cmd = substitudeCommands(cmd, opts)
if #cmd < 1 then
log('Error',
'',
self.options
)
error( 'start tunnel of mode command with empty command', 2 )
-- FIXME: add line which tunnel was called
return false
end
local bin = cmd[1]
-- for _,v in ipairs(cmd) do
-- if type( v ) ~= 'string' then
-- error( 'tunnel command must be a list of strings', 2 )
-- end
-- end
log(
'Info',
'Start tunnel command ',
cmd
)
local pid = lsyncd.exec(bin, table.unpack(cmd, 2))
--local pid = spawn(bin, table.unpack(self.options.command, 2))
if pid and pid > 0 then
self.processes[pid] = opts
self.retryCount = 0
self.alarm = now() + 1
else
self.alarm = now() + self.options.retryDelay
if self.status == TUNNEL_STATUS.UP then
self:setStatus(TUNNEL_STATUS.UP_RETRY_TIMEOUT)
else
self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT)
end
end
end
function Tunnel:start() function Tunnel:start()
if self.status == TUNNEL_STATUS.CONNECTING or if self.status == TUNNEL_STATUS.UP or
self.status == TUNNEL_STATUS.UP then self.status == TUNNEL_STATUS.CONNECTING then
return return
end end
if self.options.mode == "command" then if self.options.mode == TUNNEL_MODES.COMMAND or
self.status = TUNNEL_STATUS.CONNECTING self.options.mode == TUNNEL_MODES.POOL then
self.alarm = false
log(
'Info',
'Start tunnel command ',
self.options.command
)
if #self.options.command < 1 then
log('Error',
'',
self.options
)
error( 'start tunnel of mode command with empty command', 2 )
-- FIXME: add line which tunnel was called
return false
end
local bin = self.options.command[1]
for _,v in ipairs(self.options.command) do
if type( v ) ~= 'string' then
error( 'tunnel command must be a list of strings', 2 )
end
end
local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2))
--local pid = spawn(bin, table.unpack(self.options.command, 2))
if pid and pid > 0 then
self.processes[pid] = {
type = TUNNEL_CMD_TYPES.CMD,
started = now()
}
self.checksFailed = 0
self.alarm = now() + 1
else
self.alarm = now() + self.options.retryDelay
self.status = TUNNEL_STATUS.RETRY_TIMEOUT
end
self:setStatus(TUNNEL_STATUS.CONNECTING)
self:invoke(now())
else else
error('unknown tunnel mode:' .. self.options.mode) error('unknown tunnel mode:' .. self.options.mode)
self.status = TUNNEL_STATUS.DOWN self:setStatus(TUNNEL_STATUS.DISABLED)
end end
end end
@ -3481,73 +3600,67 @@ Tunnel = (function()
end end
log('Debug', log('Debug',
"collect tunnel event. pid: ", pid," exitcode: ", exitcode) "collect tunnel event. pid: ", pid," exitcode: ", exitcode)
local run, starting, dead = self:countProcs()
-- cases in which the tunnel command is handled -- cases in which the tunnel command is handled
if proc.type == TUNNEL_CMD_TYPES.CMD then if proc.type == TUNNEL_CMD_TYPES.CMD then
if self.options.onShot then if self.status == TUNNEL_STATUS.CONNECTING then
log(
'Warning',
'Starting tunnel failed.',
self.options.name
)
self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT)
self.alarm = now() + self.options.retryDelay
else
log( log(
'Info', 'Info',
'Tunnel setup complete.', 'Tunnel died. Will Restarting',
self.options.name self.options.name
) )
self.status = TUNNEL_STATUS.UP
else
if self.status == TUNNEL_STATUS.CONNECTING then
log(
'Warning',
'Starting tunnel failed.',
self.options.name
)
self.status = TUNNEL_STATUS.RETRY_TIMEOUT
self.alarm = now() + 5
else
log(
'Info',
'Tunnel died. Restarting',
self.options.name
)
self.status = TUNNEL_STATUS.DOWN if run == 0 then
self.alarm = true self:setStatus(TUNNEL_STATUS.DOWN)
self:start()
end end
self.alarm = true
end end
-- cases in which the check function has executed a program -- cases in which the check function has executed a program
elseif proc.type == TUNNEL_CMD_TYPES.CHK then elseif proc.type == TUNNEL_CMD_TYPES.CHK then
local found = false local good = false
if type(self.options.checkExitCodes) == 'table' then if type(self.options.checkExitCodes) == 'table' then
for _,i in iwalk(self.options.checkExitCodes) do for _,i in iwalk(self.options.checkExitCodes) do
if exitcode == i then if exitcode == i then
found = true good = true
end end
end end
else else
if self.options.checkExitCodes == exitcode then if self.options.checkExitCodes == exitcode then
found = true good = true
end end
end end
if found then if good then
if self.ready == false then if self.isReady() == false then
log( log(
'Info', 'Info',
'Tunnel setup complete ', self.options.name,
self.options.name ' Tunnel setup complete '
) )
end end
self.ready = true self:setStatus(TUNNEL_STATUS.UP)
self.checksFailed = 0
else else
if self.ready == true then if self.ready then
log( log(
'Info', 'Tunnel',
'Check failed.',
self.options.name self.options.name
' Check failed.'
) )
self.checksFailed = self.checksFailed + 1 self.checksFailed = self.checksFailed + 1
if self.checksFailed > self.options.checkMaxFailed then if self.checksFailed > self.options.checkMaxFailed then
self:kill() self:kill()
end end
end end
self.ready = false self:setStatus(TUNNEL_STATUS.DOWN)
end end
end end
self.processes[pid] = nil self.processes[pid] = nil
@ -3564,13 +3677,50 @@ Tunnel = (function()
lsyncd.kill(pid, 9) lsyncd.kill(pid, 9)
end end
end end
self.status = TUNNEL_STATUS.DISABLED self:setStatus(TUNNEL_STATUS.DISABLED)
end end
function Tunnel:isReady () function Tunnel:isReady ()
return self.status == TUNNEL_STATUS.UP return self.status == TUNNEL_STATUS.UP
end end
--
-- Fills/changes the opts table with additional values
-- for the transfer to be started
--
function Tunnel:getSubstitutionData(event, opts)
local useProc, useProcLast = nil, nil
if self.options.mode == TUNNEL_MODES.POOL then
if self.options.distribution == TUNNEL_DISTRIBUTION.ROUNDROBIN then
local i = 0
for pid, proc in self.processes:walk() do
if proc.ready == true then
useProcLast = proc
if (i % self.processes:size()) == self.rrCounter then
useProc = proc
self.rrCounter = self.rrCounter + 1
end
end
end
if useProc == nil then
self.rrCounter = 0
useProc = useProcLast
end
else
log('Tunnel', 'Unknown distribution mode: ', self.options.distribution)
os.exit(1)
end
end
if useProc then
for k,v in pairs(self.TUNNEL_SUBSTITIONS) do
if useProc[v] ~= nil then
opts[v] = useProc[v]
end
end
end
return opts
end
-- --
-- Writes a status report about this tunnel -- Writes a status report about this tunnel
-- --
@ -3581,7 +3731,7 @@ Tunnel = (function()
for pid, prc in self.processes:walk( ) for pid, prc in self.processes:walk( )
do do
f:write(" pid=", pid, " type=", prc.type, " started=", ''..prc.started, '\n') f:write(" pid=", pid, " type=", prc.type, " started="..prc.started, '\n')
end end
f:write( '\n' ) f:write( '\n' )
@ -4017,7 +4167,7 @@ function splitQuotedString
return rv return rv
end end
function replaceCommand(cmd, data) function substitudeCommands(cmd, data)
assert(type(data) == "table") assert(type(data) == "table")
local getData = function(arg) local getData = function(arg)
local rv = data[arg] local rv = data[arg]
@ -5892,7 +6042,7 @@ function runner.teardown
return exitCode return exitCode
end end
--============================================================================ --============================================================================
-- Lsyncd runner's user interface -- Lsyncd runner's user interface
--============================================================================ --============================================================================

View File

@ -14,11 +14,11 @@ local testData = {
localPort = 1234, localPort = 1234,
localHost = "localhorst" localHost = "localhorst"
} }
assert(replaceCommand("echo ssh ${localHost}:${localPort}", testData) == assert(substitudeCommands("echo ssh ${localHost}:${localPort}", testData) ==
"echo ssh localhorst:1234") "echo ssh localhorst:1234")
assert(isTableEqual( assert(isTableEqual(
replaceCommand({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), substitudeCommands({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData),
{"-p", "2localhorst2", "-i '1234'"} {"-p", "2localhorst2", "-i '1234'"}
)) ))