[WIP] Add tunnel support

This commit is contained in:
Daniel Poelzleithner 2022-03-11 07:11:39 +01:00
parent 24ef43f5fd
commit 274b2b0416
3 changed files with 615 additions and 110 deletions

View File

@ -52,6 +52,7 @@ default.checkgauge = {
prepare = true,
source = true,
target = true,
tunnel = true,
}
--

View File

@ -443,6 +443,20 @@ printlogf0(lua_State *L,
}
/*
| Print a traceback of the error
*/
static int traceback (lua_State *L) {
lua_getglobal(L, "debug");
lua_getfield(L, -1, "traceback");
lua_pushvalue(L, 1);
lua_pushinteger(L, 2);
lua_call(L, 2, 1);
printlogf( L, "traceback", "%s", lua_tostring(L, -1) );
return 1;
}
/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*
( Simple memory management )
*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
@ -1083,6 +1097,28 @@ l_now(lua_State *L)
return 1;
}
/*
| Sends a signal to proceess pid
|
| Params on Lua stack:
| 1: pid
| 2: signal
|
| Returns on Lua stack:
| return value of kill
*/
static int
l_kill( lua_State *L )
{
pid_t pid = luaL_checkinteger( L, 1 );
int sig = luaL_checkinteger( L, 2 );
int rv = kill(pid, sig );
lua_pushinteger( L, rv );
return 1;
}
/*
| Executes a subprocess. Does not wait for it to return.
@ -1844,6 +1880,7 @@ static const luaL_Reg lsyncdlib[] =
{ "exec", l_exec },
{ "log", l_log },
{ "now", l_now },
{ "kill", l_kill },
{ "nonobserve_fd", l_nonobserve_fd },
{ "observe_fd", l_observe_fd },
{ "readdir", l_readdir },
@ -2796,6 +2833,7 @@ main1( int argc, char *argv[] )
lsyncd_config_file,
lua_tostring( L, -1 )
);
traceback(L);
exit( -1 );
}

View File

@ -28,16 +28,6 @@ end
lsyncd_version = '2.2.3'
--
-- Hides the core interface from user scripts.
--
local _l = lsyncd
lsyncd = nil
local lsyncd = _l
_l = nil
--
-- Shortcuts (which user is supposed to be able to use them as well)
--
@ -47,6 +37,120 @@ now = lsyncd.now
readdir = lsyncd.readdir
inheritKV = nil
--
-- Recurvely inherits a source table to a destionation table
-- copying all keys from source.
--
-- All entries with integer keys are inherited as additional
-- sources for non-verbatim tables
--
inherit = function
(
cd, -- table copy destination
cs, -- table copy source
verbatim, -- forced verbatim ( for e.g. 'exitcodes' )
ignored -- table of keys not to copy
)
-- First copies all entries with non-integer keys.
--
-- Tables are merged; already present keys are not
-- overwritten
--
-- For verbatim tables integer keys are treated like
-- non-integer keys
for k, v in pairs( cs )
do
if type(ignored) == 'table' and table.contains(ignored, k)
then
-- do nothing
-- print("ignore x", k)
elseif
(
type( k ) ~= 'number'
or verbatim
or cs._verbatim == true
)
and
(
type( cs ) ~= CountArray
and type( cs._merge ) ~= 'table'
or cs._merge[ k ] == true
)
then
inheritKV( cd, k, v )
end
end
-- recursevely inherits all integer keyed tables
-- ( for non-verbatim tables )
if cs._verbatim ~= true
then
for k, v in ipairs( cs )
do
if type( v ) == 'table'
then
inherit( cd, v )
else
cd[ #cd + 1 ] = v
end
end
end
end
table.contains = function
(
t, -- array to search in. Only the numeric values are tested
needle -- value to search for
)
for _, v in ipairs(t) do
if needle == v then
return true
end
end
return false
end
-- lsyncd.inherit = inherit
-- print("inherit ")
--
-- Helper to inherit. Inherits one key.
--
inheritKV =
function(
cd, -- table copy destination
k, -- key
v -- value
)
-- don't merge inheritance controls
if k == '_merge' or k == '_verbatim' then return end
local dtype = type( cd [ k ] )
if type( v ) == 'table'
then
if dtype == 'nil'
then
cd[ k ] = { }
inherit( cd[ k ], v, k == 'exitcodes' )
elseif
dtype == 'table' and
v._merge ~= false
then
inherit( cd[ k ], v, k == 'exitcodes' )
end
elseif dtype == 'nil'
then
cd[ k ] = v
end
end
--
-- Coping globals to ensure userscripts cannot change this.
--
@ -54,6 +158,8 @@ local log = log
local terminate = terminate
local now = now
local readdir = readdir
local inherit = inherit
local inheritKV = inheritKV
--
-- Predeclarations.
@ -201,6 +307,10 @@ local CountArray = ( function
t, -- table being accessed
k -- key used to access
)
if k == '_merge' or k == '_verbatim'
then
return nil
end
if type( k ) ~= 'number'
then
error( 'Key "' .. k .. '" invalid for CountArray', 2 )
@ -991,6 +1101,7 @@ local Combiner = ( function
-- The new delay replaces the old one if it's a file.
--
local function logReplace
(
d1, -- old delay
d2 -- new delay
@ -2812,6 +2923,14 @@ local Sync = ( function
return
end
-- tunnel configured but not up
if self.config.tunnel and
self.config.tunnel:isReady() == false then
log('Tunnel', 'Tunnel for Sync ', self.config.name, ' not ready. Blocking events')
self.config.tunnel:blockSync(self)
return
end
if self.processes:size( ) >= self.config.maxProcesses
then
-- no new processes
@ -3001,6 +3120,7 @@ local Sync = ( function
filters = nil,
initDone = false,
disabled = false,
tunnelBlock = nil,
-- functions
addBlanketDelay = addBlanketDelay,
@ -3095,6 +3215,422 @@ local Sync = ( function
end )( )
--
-- Basic Tunnel provider.
--
Tunnel = (function()
Tunnel = {}
local TUNNEL_CMD_TYPES = {
CMD = 1,
CHK = 2
}
local TUNNEL_STATUS = {
DOWN = 0,
CONNECTING = 1,
UP = 2,
RETRY_TIMEOUT = 3,
}
local nextTunnelName = 1
Tunnel.defaults = {
mode = "command",
oneShoot = false,
command = nil,
checkCommand = nil,
checkExitCodes = {0},
checkMaxFailed = 5,
reconnectDelay = 10
}
-- export constants
Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES
Tunnel.TUNNEL_STATUS = TUNNEL_STATUS
function Tunnel.new(
options
)
local rv = {
processes = CountArray.new( ),
blocks = {},
ready = false,
checksFailed = 0,
status = TUNNEL_STATUS.DOWN,
alarm = false
}
-- provides a default name if needed
if options.name ~= nil
then
options.name = 'Tunnel' .. nextTunnelName
end
nextTunnelName = nextTunnelName + 1
inherit(options, Tunnel.defaults)
rv.options = options
inherit(rv, Tunnel)
--setmetatable(rv, Tunnel)
-- self.__index = self
return rv
end
--
-- Returns next alarm
--
function Tunnel:getAlarm()
return self.alarm
end
--
-- Check if the tunnel is up
function Tunnel:check()
end
function Tunnel:isReady()
return self.status == TUNNEL_STATUS.UP
end
--
-- Check if the tunnel is up
function Tunnel:invoke(timestamp)
-- lsyncd.kill()
-- check if child processes are running
if self.status == TUNNEL_STATUS.CONNECTING then
local good = true
for pid, type in ipairs(self.processes) do
if type == TUNNEL_CMD_TYPES.CMD
and lsyncd.kill(pid, 0) ~= 0 then
-- required command does not exist
good = false
end
end
if good then
log(
'Tunnel',
'Setup of tunnel ', self.options.name, ' sucessfull'
)
self.status = TUNNEL_STATUS.UP
self.alarm = false
self:unblockSyncs()
else
-- not ready, postpone next check
self.alarm = now() + 1
end
end
if self.status == TUNNEL_STATUS.DOWN then
self:start()
elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then
log(
'Tunnel',
'Retry setup ', self.options.name
)
if self.alarm <= timestamp then
self:start()
end
end
end
--
-- Check if Sync is already blocked by Tunnel
function Tunnel:getBlockerForSync(
sync
)
for _, eblock in ipairs(self.blocks) do
if eblock.sync == sync then
return eblock
end
end
return nil
end
--
-- Create a block on the sync until the tunnel reaches ready state
function Tunnel:blockSync(
sync
)
local block = self:getBlockerForSync(sync)
if block then
-- delay the block by another second
block:wait( now( ) + 1 )
return
end
local block = sync:addBlanketDelay()
sync.tunnelBlock = block
table.insert (self.blocks, block)
-- set the new delay to be a block for existing delays
for _, eblock in sync.delays:qpairs() do
if eblock ~= block then
eblock:blockedBy(block)
end
end
-- delay tunnel check by 1 second
block:wait( now( ) + 1 )
end
--
-- Create a block on the sync until the tunnel reaches ready state
function Tunnel:unblockSyncs()
for i,blk in ipairs(self.blocks) do
blk.sync:removeDelay(blk)
blk.sync.tunnelBlock = nil
end
self.blocks = {}
end
function Tunnel:start()
if self.status == TUNNEL_STATUS.CONNECTING or
self.status == TUNNEL_STATUS.UP then
return
end
if self.options.mode == "command" then
self.status = TUNNEL_STATUS.CONNECTING
self.alarm = false
log(
'Info',
'Start tunnel command ',
self.options.command
)
if #self.options.command < 1 then
log('Error',
'',
self.options
)
error( 'start tunnel of mode command with empty command', 2 )
-- FIXME: add line which tunnel was called
return false
end
local bin = self.options.command[1]
for _,v in ipairs(self.options.command) do
if type( v ) ~= 'string' then
error( 'tunnel command must be a list of strings', 2 )
end
end
local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2))
--local pid = spawn(bin, table.unpack(self.options.command, 2))
if pid and pid > 0 then
self.processes[pid] = TUNNEL_CMD_TYPES.CMD
self.checksFailed = 0
self.alarm = now() + 1
else
self.alarm = now() + self.options.reconnectDelay
self.status = TUNNEL_STATUS.RETRY_TIMEOUT
end
else
error('unknown tunnel mode:' .. self.options.mode)
self.status = TUNNEL_STATUS.DOWN
end
end
--
-- collect pids of exited child processes. Restart the tunnel if necessary
---
function Tunnel:collect (
pid,
exitcode
)
local ctype = self.processes[pid]
-- cases in which the tunnel command is handled
if ctype == TUNNEL_CMD_TYPES.CMD then
if self.options.onShot then
log(
'Info',
'Tunnel setup complete.',
self.options.name
)
self.status = TUNNEL_STATUS.UP
else
log(
'Info',
'Tunnel died. Restarting',
self.options.name
)
self.status = TUNNEL_STATUS.DOWN
self.alarm = now() + 1
self:start()
end
-- cases in which the check function has executed a program
elseif ctype == TUNNEL_CMD_TYPES.CHK then
local found = false
if type(self.options.checkExitCodes) == 'table' then
for _,i in iwalk(self.options.checkExitCodes) do
if exitcode == i then
found = true
end
end
else
if self.options.checkExitCodes == exitcode then
found = true
end
end
if found then
if self.ready == false then
log(
'Info',
'Tunnel setup complete ',
self.options.name
)
end
self.ready = true
else
if self.ready == true then
log(
'Info',
'Check failed.',
self.options.name
)
self.checksFailed = self.checksFailed + 1
if self.checksFailed > self.options.checkMaxFailed then
self:kill()
end
end
self.ready = false
end
end
self.processes[pid] = nil
end
--
-- Stops all tunnel processes
--
function Tunnel:kill ()
for pid, typ in pairs(self.processes) do
if typ == TUNNEL_CMD_TYPES.CMD then
lsyncd.kill(pid, 9)
end
end
self.status = TUNNEL_STATUS.DOWN
end
function Tunnel:isReady ()
return self.status == TUNNEL_STATUS.UP
end
return Tunnel
end)() -- Tunnel scope
--
-- Tunnels - a singleton
--
-- Tunnels maintains all configured tunnels.
--
local Tunnels = ( function
( )
--
-- the list of all tunnels
--
local tunnelList = Array.new( )
--
-- Returns sync at listpos i
--
local function get
( i )
return tunnelList[ i ];
end
--
-- Adds a new tunnel.
--
local function add
(
tunnel
)
table.insert( tunnelList, tunnel )
return tunnel
end
--
-- Allows a for-loop to walk through all syncs.
--
local function iwalk
( )
return ipairs( tunnelList )
end
--
-- Returns the number of syncs.
--
local size = function
( )
return #tunnelList
end
local nextCycle = false
--
-- Cycle through all tunnels and call their invoke function
--
local function invoke(timestamp)
if nextCycle and nextCycle > timestamp then
return
end
for _,tunnel in ipairs( tunnelList )
do
print("invoke t", tunnel)
tunnel:invoke(timestamp)
end
nextCycle = now() + 5
end
--
-- returns the next alarm
--
local function getAlarm()
local rv = nextCycle
for _, tunnel in ipairs( tunnelList ) do
local ta = tunnel:getAlarm()
if ta ~= false
and ta < rv then
rv = ta
end
end
return rv
end
--
-- Public interface
--
return {
add = add,
get = get,
iwalk = iwalk,
size = size,
invoke = invoke,
getAlarm = getAlarm
}
end )( )
--
-- create a new tunnel from the passed options and registers the tunnel
tunnel = function (options)
log(
'Debug',
'create tunnel:', options
)
local rv = Tunnel.new(options)
Tunnels.add(rv)
return rv
end
--
-- Syncs - a singleton
--
@ -3144,100 +3680,6 @@ local Syncs = ( function
return syncsList[ i ];
end
--
-- Helper function for inherit
-- defined below
--
local inheritKV
--
-- Recurvely inherits a source table to a destionation table
-- copying all keys from source.
--
-- All entries with integer keys are inherited as additional
-- sources for non-verbatim tables
--
local function inherit
(
cd, -- table copy destination
cs, -- table copy source
verbatim -- forced verbatim ( for e.g. 'exitcodes' )
)
-- First copies all entries with non-integer keys.
--
-- Tables are merged; already present keys are not
-- overwritten
--
-- For verbatim tables integer keys are treated like
-- non-integer keys
for k, v in pairs( cs )
do
if
(
type( k ) ~= 'number'
or verbatim
or cs._verbatim == true
)
and
(
type( cs._merge ) ~= 'table'
or cs._merge[ k ] == true
)
then
inheritKV( cd, k, v )
end
end
-- recursevely inherits all integer keyed tables
-- ( for non-verbatim tables )
if cs._verbatim ~= true
then
for k, v in ipairs( cs )
do
if type( v ) == 'table'
then
inherit( cd, v )
else
cd[ #cd + 1 ] = v
end
end
end
end
--
-- Helper to inherit. Inherits one key.
--
inheritKV =
function(
cd, -- table copy destination
k, -- key
v -- value
)
-- don't merge inheritance controls
if k == '_merge' or k == '_verbatim' then return end
local dtype = type( cd [ k ] )
if type( v ) == 'table'
then
if dtype == 'nil'
then
cd[ k ] = { }
inherit( cd[ k ], v, k == 'exitcodes' )
elseif
dtype == 'table' and
v._merge ~= false
then
inherit( cd[ k ], v, k == 'exitcodes' )
end
elseif dtype == 'nil'
then
cd[ k ] = v
end
end
--
-- Adds a new sync.
@ -3265,13 +3707,18 @@ local Syncs = ( function
config = { }
inherit( config, uconfig )
-- inherit the config but do not deep copy the tunnel object
-- the tunnel object is a reference to a object that might be shared
inherit( config, uconfig, nil, {"tunnel"} )
--
-- last and least defaults are inherited
--
inherit( config, default )
-- copy references
config.tunnel = uconfig.tunnel
local inheritSettings = {
'delay',
'maxDelays',
@ -4529,17 +4976,27 @@ function runner.collectProcess
pid, -- process id
exitcode -- exitcode
)
processCount = processCount - 1
for _, s in Syncs.iwalk( )
do
if s:collect( pid, exitcode ) then
processCount = processCount - 1
break
end
end
for _, s in Tunnels.iwalk( )
do
if s:collect( pid, exitcode ) then
break
end
end
if processCount < 0
then
error( 'negative number of processes!' )
end
for _, s in Syncs.iwalk( )
do
if s:collect( pid, exitcode ) then return end
end
end
--
@ -4585,6 +5042,13 @@ function runner.cycle(
error( 'runner.cycle() called while not running!' )
end
-- check and start tunnels
if not uSettings.maxProcesses
or processCount < uSettings.maxProcesses
then
Tunnels.invoke( timestamp )
end
if uSettings.onepass
then
local allDone = true
@ -5221,6 +5685,8 @@ function runner.getAlarm
'at global process limit.'
)
end
-- checks for tunnel alarm
checkAlarm( Tunnels.getAlarm( ) )
-- checks if a statusfile write has been delayed
checkAlarm( StatusFile.getAlarm( ) )