Implement tunnel pool mode.

In this mode, multiple tunnel processes are started and connection a load
balanced on the pool of connections.

Example config:

...
sync {
  default.rsync,
  tunnel = tunnel {
    command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@testmachine"},
    mode = "pool",
    parallel = 2,
  },
  target    = "rsync://localhost:${localport}/test",
  ...
}
This commit is contained in:
Daniel Poelzleithner 2022-03-23 23:44:19 +01:00
parent 07ec98174a
commit bfc604a591
3 changed files with 278 additions and 122 deletions

View File

@ -129,6 +129,9 @@ rsync.action = function
-- gets all events ready for syncing
local elist = inlet.getEvents( eventNotInitBlank )
local substitudes = inlet.getSubstitutionData(elist, {})
local target = substitudeCommands(config.target, substitudes)
-- gets the list of paths for the event list
-- deletes create multi match patterns
local paths = elist.getPaths( )
@ -237,7 +240,7 @@ rsync.action = function
'--include-from=-',
'--exclude=*',
config.source,
config.target
target
)
end
@ -332,6 +335,9 @@ rsync.init = function
target = config.host .. ':' .. config.targetdir
end
local substitudes = inlet.getSubstitutionData(event, {})
target = substitudeCommands(target, substitudes)
if config.delete == true
or config.delete == 'startup'
then

View File

@ -1905,6 +1905,20 @@ local InletFactory = ( function
-- TODO give a readonly handler only.
return sync.config
end,
--
-- Returns the sync for this Inlet
--
getSync = function( sync )
return sync
end,
--
-- Substitutes parameters in arguments
--
getSubstitutionData = function( sync, event, data)
return sync.getSubstitutionData(sync, event, data)
end,
}
--
@ -3106,6 +3120,16 @@ local Sync = ( function
f:write( '\n' )
end
--
-- Returns substitude data for event
--
local function getSubstitutionData(self, event, data)
if self.config.tunnel then
data = self.config.tunnel:getSubstitutionData(event, data)
end
return data
end
--
-- Creates a new Sync.
--
@ -3141,6 +3165,7 @@ local Sync = ( function
removeDelay = removeDelay,
rmExclude = rmExclude,
statusReport = statusReport,
getSubstitutionData = getSubstitutionData,
}
s.inlet = InletFactory.newInlet( s )
@ -3238,23 +3263,43 @@ Tunnel = (function()
CONNECTING = 3,
UP = 4,
RETRY_TIMEOUT = 5,
UP_RETRY_TIMEOUT = 6,
}
local TUNNEL_MODES = {
COMMAND = "command",
POOL = "pool",
}
local TUNNEL_DISTRIBUTION = {
ROUNDROBIN = "rr",
}
local TUNNEL_SUBSTITIONS = {
"localhost",
"localport"
}
local nextTunnelName = 1
Tunnel.defaults = {
mode = "command",
oneShoot = false,
mode = TUNNEL_MODES.COMMAND,
parallel = 1,
distribution = TUNNEL_DISTRIBUTION.ROUNDROBIN,
command = nil,
checkCommand = nil,
checkExitCodes = {0},
checkMaxFailed = 5,
retryDelay = 10,
readyDelay = 5
readyDelay = 5,
localhost = 'localhost',
}
-- export constants
Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES
Tunnel.TUNNEL_STATUS = TUNNEL_STATUS
Tunnel.TUNNEL_MODES = TUNNEL_MODES
Tunnel.TUNNEL_DISTRIBUTION = TUNNEL_DISTRIBUTION
Tunnel.TUNNEL_SUBSTITIONS = TUNNEL_SUBSTITIONS
function Tunnel.new(
options
@ -3263,9 +3308,10 @@ Tunnel = (function()
processes = CountArray.new( ),
blocks = {},
ready = false,
checksFailed = 0,
retryCount = 0,
status = TUNNEL_STATUS.DOWN,
alarm = false
alarm = false,
rrCounter = 0,
}
-- provides a default name if needed
if options.name == nil
@ -3287,16 +3333,20 @@ Tunnel = (function()
return rv
end
-- Returns the status of tunnel as text
function Tunnel:statusText()
function Tunnel.statusToText(status)
for n, i in pairs(TUNNEL_STATUS) do
if self.status == i then
if status == i then
return n
end
end
return TUNNEL_STATUS.UNKNOWN
end
-- Returns the status of tunnel as text
function Tunnel:statusText()
return Tunnel.statusToText(self.status)
end
--
-- Returns next alarm
--
@ -3311,7 +3361,42 @@ Tunnel = (function()
end
function Tunnel:isReady()
return self.status == TUNNEL_STATUS.UP
return self.status == TUNNEL_STATUS.UP or
self.status == TUNNEL_STATUS.UP_RETRY_TIMEOUT
end
function Tunnel:setStatus(status)
log('Tunnel',self.options.name,': status change: ',
self:statusText(), " -> ", Tunnel.statusToText(status))
self.status = status
end
--
-- Returns the number of processes currently running
--
function Tunnel:countProcs(timestamp)
local run = 0
local starting = 0
local dead = 0
if timestamp == nil then
timestamp = now()
end
for pid, pd in self.processes:walk() do
if pd.type == TUNNEL_CMD_TYPES.CMD then
-- process needs to run for at least some time
if lsyncd.kill(pid, 0) ~= 0 then
dead = dead + 1
elseif (pd.started + self.options.readyDelay) > timestamp then
starting = starting + 1
else
pd.ready = true
run = run + 1
end
end
end
return run, starting, dead
end
--
@ -3320,51 +3405,64 @@ Tunnel = (function()
-- lsyncd.kill()
if self:check() == false then
-- check failed, consider tunnel broken
self.status = TUNNEL_STATUS.DOWN
self:setStatus(TUNNEL_STATUS.DOWN)
end
-- check if child processes are running
if self.status == TUNNEL_STATUS.CONNECTING then
-- we can only be good if processes exist
local good = self.processes:size() > 0 or self.options.onShoot == true
for pid, pd in self.processes:walk() do
if pd.type == TUNNEL_CMD_TYPES.CMD then
-- process needs to run for at least some time
if (pd.started + self.options.readyDelay) > timestamp
or lsyncd.kill(pid, 0) ~= 0 then
-- required command does not exist
good = false
end
end
end
if good then
log(
'Tunnel',
'Setup of tunnel ', self.options.name, ' sucessfull'
)
self.status = TUNNEL_STATUS.UP
self.alarm = false
self:unblockSyncs()
else
-- not ready, postpone next check
self.alarm = now() + 1
end
end
if self.status == TUNNEL_STATUS.DOWN then
self:start()
elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then
if self.status == TUNNEL_STATUS.RETRY_TIMEOUT then
if self.alarm <= timestamp then
log(
'Tunnel',
'Retry setup ', self.options.name
)
self:start()
return
else
-- timeout not yet reached
self.alarm = now() + 1
return
end
elseif self.status == TUNNEL_STATUS.DOWN then
self:start()
return
elseif self.status == TUNNEL_STATUS.DISABLED then
self.alarm = false
return
end
local parallel = self.options.parallel
local run, starting, dead = self:countProcs(timestamp)
-- check if enough child processes are running
if self.status == TUNNEL_STATUS.CONNECTING then
if run > 0 then
log(
'Tunnel',
'Setup of tunnel ', self.options.name, ' sucessfull'
)
self:setStatus(TUNNEL_STATUS.UP)
self:unblockSyncs()
end
elseif self.status == TUNNEL_STATUS.UP and run == 0 then
-- no good process running, degrade
log(
'Tunnel',
'Tunnel ', self.options.name, ' changed to CONNECTING'
)
self:setStatus(TUNNEL_STATUS.CONNECTING)
end
local spawned = 0
-- start more processes if necesarry
while run + starting + spawned < self.options.parallel do
self:spawn()
spawned = spawned + 1
end
-- trigger next delay
if starting + spawned == 0 then
self.alarm = false
else
self.alarm = now() + 1
end
end
@ -3418,21 +3516,24 @@ Tunnel = (function()
self.blocks = {}
end
function Tunnel:start()
if self.status == TUNNEL_STATUS.CONNECTING or
self.status == TUNNEL_STATUS.UP then
return
end
--
-- Spawn a single tunnel program
--
function Tunnel:spawn()
local opts = {
type = TUNNEL_CMD_TYPES.CMD,
started = now(),
localhost = self.options.localhost,
ready = false,
}
local cmd = self.options.command
if self.options.mode == "command" then
self.status = TUNNEL_STATUS.CONNECTING
self.alarm = false
log(
'Info',
'Start tunnel command ',
self.options.command
)
if #self.options.command < 1 then
if self.options.mode == TUNNEL_MODES.POOL then
opts.localport = lsyncd.get_free_port()
end
cmd = substitudeCommands(cmd, opts)
if #cmd < 1 then
log('Error',
'',
self.options
@ -3441,29 +3542,47 @@ Tunnel = (function()
-- FIXME: add line which tunnel was called
return false
end
local bin = self.options.command[1]
for _,v in ipairs(self.options.command) do
if type( v ) ~= 'string' then
error( 'tunnel command must be a list of strings', 2 )
end
end
local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2))
local bin = cmd[1]
-- for _,v in ipairs(cmd) do
-- if type( v ) ~= 'string' then
-- error( 'tunnel command must be a list of strings', 2 )
-- end
-- end
log(
'Info',
'Start tunnel command ',
cmd
)
local pid = lsyncd.exec(bin, table.unpack(cmd, 2))
--local pid = spawn(bin, table.unpack(self.options.command, 2))
if pid and pid > 0 then
self.processes[pid] = {
type = TUNNEL_CMD_TYPES.CMD,
started = now()
}
self.checksFailed = 0
self.processes[pid] = opts
self.retryCount = 0
self.alarm = now() + 1
else
self.alarm = now() + self.options.retryDelay
self.status = TUNNEL_STATUS.RETRY_TIMEOUT
if self.status == TUNNEL_STATUS.UP then
self:setStatus(TUNNEL_STATUS.UP_RETRY_TIMEOUT)
else
self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT)
end
end
end
function Tunnel:start()
if self.status == TUNNEL_STATUS.UP or
self.status == TUNNEL_STATUS.CONNECTING then
return
end
if self.options.mode == TUNNEL_MODES.COMMAND or
self.options.mode == TUNNEL_MODES.POOL then
self:setStatus(TUNNEL_STATUS.CONNECTING)
self:invoke(now())
else
error('unknown tunnel mode:' .. self.options.mode)
self.status = TUNNEL_STATUS.DOWN
self:setStatus(TUNNEL_STATUS.DISABLED)
end
end
@ -3481,73 +3600,67 @@ Tunnel = (function()
end
log('Debug',
"collect tunnel event. pid: ", pid," exitcode: ", exitcode)
local run, starting, dead = self:countProcs()
-- cases in which the tunnel command is handled
if proc.type == TUNNEL_CMD_TYPES.CMD then
if self.options.onShot then
log(
'Info',
'Tunnel setup complete.',
self.options.name
)
self.status = TUNNEL_STATUS.UP
else
if self.status == TUNNEL_STATUS.CONNECTING then
log(
'Warning',
'Starting tunnel failed.',
self.options.name
)
self.status = TUNNEL_STATUS.RETRY_TIMEOUT
self.alarm = now() + 5
self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT)
self.alarm = now() + self.options.retryDelay
else
log(
'Info',
'Tunnel died. Restarting',
'Tunnel died. Will Restarting',
self.options.name
)
self.status = TUNNEL_STATUS.DOWN
self.alarm = true
self:start()
if run == 0 then
self:setStatus(TUNNEL_STATUS.DOWN)
end
self.alarm = true
end
-- cases in which the check function has executed a program
elseif proc.type == TUNNEL_CMD_TYPES.CHK then
local found = false
local good = false
if type(self.options.checkExitCodes) == 'table' then
for _,i in iwalk(self.options.checkExitCodes) do
if exitcode == i then
found = true
good = true
end
end
else
if self.options.checkExitCodes == exitcode then
found = true
good = true
end
end
if found then
if self.ready == false then
if good then
if self.isReady() == false then
log(
'Info',
'Tunnel setup complete ',
self.options.name
self.options.name,
' Tunnel setup complete '
)
end
self.ready = true
self:setStatus(TUNNEL_STATUS.UP)
self.checksFailed = 0
else
if self.ready == true then
if self.ready then
log(
'Info',
'Check failed.',
'Tunnel',
self.options.name
' Check failed.'
)
self.checksFailed = self.checksFailed + 1
if self.checksFailed > self.options.checkMaxFailed then
self:kill()
end
end
self.ready = false
self:setStatus(TUNNEL_STATUS.DOWN)
end
end
self.processes[pid] = nil
@ -3564,13 +3677,50 @@ Tunnel = (function()
lsyncd.kill(pid, 9)
end
end
self.status = TUNNEL_STATUS.DISABLED
self:setStatus(TUNNEL_STATUS.DISABLED)
end
function Tunnel:isReady ()
return self.status == TUNNEL_STATUS.UP
end
--
-- Fills/changes the opts table with additional values
-- for the transfer to be started
--
function Tunnel:getSubstitutionData(event, opts)
local useProc, useProcLast = nil, nil
if self.options.mode == TUNNEL_MODES.POOL then
if self.options.distribution == TUNNEL_DISTRIBUTION.ROUNDROBIN then
local i = 0
for pid, proc in self.processes:walk() do
if proc.ready == true then
useProcLast = proc
if (i % self.processes:size()) == self.rrCounter then
useProc = proc
self.rrCounter = self.rrCounter + 1
end
end
end
if useProc == nil then
self.rrCounter = 0
useProc = useProcLast
end
else
log('Tunnel', 'Unknown distribution mode: ', self.options.distribution)
os.exit(1)
end
end
if useProc then
for k,v in pairs(self.TUNNEL_SUBSTITIONS) do
if useProc[v] ~= nil then
opts[v] = useProc[v]
end
end
end
return opts
end
--
-- Writes a status report about this tunnel
--
@ -3581,7 +3731,7 @@ Tunnel = (function()
for pid, prc in self.processes:walk( )
do
f:write(" pid=", pid, " type=", prc.type, " started=", ''..prc.started, '\n')
f:write(" pid=", pid, " type=", prc.type, " started="..prc.started, '\n')
end
f:write( '\n' )
@ -4017,7 +4167,7 @@ function splitQuotedString
return rv
end
function replaceCommand(cmd, data)
function substitudeCommands(cmd, data)
assert(type(data) == "table")
local getData = function(arg)
local rv = data[arg]

View File

@ -14,11 +14,11 @@ local testData = {
localPort = 1234,
localHost = "localhorst"
}
assert(replaceCommand("echo ssh ${localHost}:${localPort}", testData) ==
assert(substitudeCommands("echo ssh ${localHost}:${localPort}", testData) ==
"echo ssh localhorst:1234")
assert(isTableEqual(
replaceCommand({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData),
substitudeCommands({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData),
{"-p", "2localhorst2", "-i '1234'"}
))