From 274b2b04161246dbfd1eec010bc6ee5aa71088f4 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 11 Mar 2022 07:11:39 +0100 Subject: [PATCH] [WIP] Add tunnel support --- default.lua | 1 + lsyncd.c | 38 +++ lsyncd.lua | 686 +++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 615 insertions(+), 110 deletions(-) diff --git a/default.lua b/default.lua index 6a25474..e180fd7 100644 --- a/default.lua +++ b/default.lua @@ -52,6 +52,7 @@ default.checkgauge = { prepare = true, source = true, target = true, + tunnel = true, } -- diff --git a/lsyncd.c b/lsyncd.c index 4106333..f6b55ab 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -443,6 +443,20 @@ printlogf0(lua_State *L, } +/* + | Print a traceback of the error + */ +static int traceback (lua_State *L) { + lua_getglobal(L, "debug"); + lua_getfield(L, -1, "traceback"); + lua_pushvalue(L, 1); + lua_pushinteger(L, 2); + lua_call(L, 2, 1); + printlogf( L, "traceback", "%s", lua_tostring(L, -1) ); + return 1; +} + + /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* ( Simple memory management ) *~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/ @@ -1083,6 +1097,28 @@ l_now(lua_State *L) return 1; } +/* +| Sends a signal to proceess pid +| +| Params on Lua stack: +| 1: pid +| 2: signal +| +| Returns on Lua stack: +| return value of kill +*/ +static int +l_kill( lua_State *L ) +{ + pid_t pid = luaL_checkinteger( L, 1 ); + int sig = luaL_checkinteger( L, 2 ); + + int rv = kill(pid, sig ); + + lua_pushinteger( L, rv ); + + return 1; +} /* | Executes a subprocess. Does not wait for it to return. @@ -1844,6 +1880,7 @@ static const luaL_Reg lsyncdlib[] = { "exec", l_exec }, { "log", l_log }, { "now", l_now }, + { "kill", l_kill }, { "nonobserve_fd", l_nonobserve_fd }, { "observe_fd", l_observe_fd }, { "readdir", l_readdir }, @@ -2796,6 +2833,7 @@ main1( int argc, char *argv[] ) lsyncd_config_file, lua_tostring( L, -1 ) ); + traceback(L); exit( -1 ); } diff --git a/lsyncd.lua b/lsyncd.lua index f0b2df4..5949f05 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -28,16 +28,6 @@ end lsyncd_version = '2.2.3' --- --- Hides the core interface from user scripts. --- -local _l = lsyncd -lsyncd = nil - -local lsyncd = _l -_l = nil - - -- -- Shortcuts (which user is supposed to be able to use them as well) -- @@ -47,6 +37,120 @@ now = lsyncd.now readdir = lsyncd.readdir +inheritKV = nil + +-- +-- Recurvely inherits a source table to a destionation table +-- copying all keys from source. +-- +-- All entries with integer keys are inherited as additional +-- sources for non-verbatim tables +-- +inherit = function +( + cd, -- table copy destination + cs, -- table copy source + verbatim, -- forced verbatim ( for e.g. 'exitcodes' ) + ignored -- table of keys not to copy +) + -- First copies all entries with non-integer keys. + -- + -- Tables are merged; already present keys are not + -- overwritten + -- + -- For verbatim tables integer keys are treated like + -- non-integer keys + for k, v in pairs( cs ) + do + if type(ignored) == 'table' and table.contains(ignored, k) + then + -- do nothing + -- print("ignore x", k) + elseif + ( + type( k ) ~= 'number' + or verbatim + or cs._verbatim == true + ) + and + ( + type( cs ) ~= CountArray + and type( cs._merge ) ~= 'table' + or cs._merge[ k ] == true + ) + then + inheritKV( cd, k, v ) + end + end + + -- recursevely inherits all integer keyed tables + -- ( for non-verbatim tables ) + if cs._verbatim ~= true + then + for k, v in ipairs( cs ) + do + if type( v ) == 'table' + then + inherit( cd, v ) + else + cd[ #cd + 1 ] = v + end + end + + end +end + + +table.contains = function +( + t, -- array to search in. Only the numeric values are tested + needle -- value to search for +) + for _, v in ipairs(t) do + if needle == v then + return true + end + end + return false +end + +-- lsyncd.inherit = inherit +-- print("inherit ") + + +-- +-- Helper to inherit. Inherits one key. +-- +inheritKV = + function( + cd, -- table copy destination + k, -- key + v -- value + ) + + -- don't merge inheritance controls + if k == '_merge' or k == '_verbatim' then return end + + local dtype = type( cd [ k ] ) + + if type( v ) == 'table' + then + if dtype == 'nil' + then + cd[ k ] = { } + inherit( cd[ k ], v, k == 'exitcodes' ) + elseif + dtype == 'table' and + v._merge ~= false + then + inherit( cd[ k ], v, k == 'exitcodes' ) + end + elseif dtype == 'nil' + then + cd[ k ] = v + end +end + -- -- Coping globals to ensure userscripts cannot change this. -- @@ -54,6 +158,8 @@ local log = log local terminate = terminate local now = now local readdir = readdir +local inherit = inherit +local inheritKV = inheritKV -- -- Predeclarations. @@ -201,6 +307,10 @@ local CountArray = ( function t, -- table being accessed k -- key used to access ) + if k == '_merge' or k == '_verbatim' + then + return nil + end if type( k ) ~= 'number' then error( 'Key "' .. k .. '" invalid for CountArray', 2 ) @@ -991,6 +1101,7 @@ local Combiner = ( function -- The new delay replaces the old one if it's a file. -- local function logReplace + ( d1, -- old delay d2 -- new delay @@ -2812,6 +2923,14 @@ local Sync = ( function return end + -- tunnel configured but not up + if self.config.tunnel and + self.config.tunnel:isReady() == false then + log('Tunnel', 'Tunnel for Sync ', self.config.name, ' not ready. Blocking events') + self.config.tunnel:blockSync(self) + return + end + if self.processes:size( ) >= self.config.maxProcesses then -- no new processes @@ -3001,6 +3120,7 @@ local Sync = ( function filters = nil, initDone = false, disabled = false, + tunnelBlock = nil, -- functions addBlanketDelay = addBlanketDelay, @@ -3095,6 +3215,422 @@ local Sync = ( function end )( ) +-- +-- Basic Tunnel provider. +-- +Tunnel = (function() + + Tunnel = {} + + local TUNNEL_CMD_TYPES = { + CMD = 1, + CHK = 2 + } + + local TUNNEL_STATUS = { + DOWN = 0, + CONNECTING = 1, + UP = 2, + RETRY_TIMEOUT = 3, + } + + local nextTunnelName = 1 + + Tunnel.defaults = { + mode = "command", + oneShoot = false, + command = nil, + checkCommand = nil, + checkExitCodes = {0}, + checkMaxFailed = 5, + reconnectDelay = 10 + } + -- export constants + Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES + Tunnel.TUNNEL_STATUS = TUNNEL_STATUS + + function Tunnel.new( + options + ) + local rv = { + processes = CountArray.new( ), + blocks = {}, + ready = false, + checksFailed = 0, + status = TUNNEL_STATUS.DOWN, + alarm = false + } + -- provides a default name if needed + if options.name ~= nil + then + options.name = 'Tunnel' .. nextTunnelName + end + + nextTunnelName = nextTunnelName + 1 + + inherit(options, Tunnel.defaults) + + rv.options = options + + inherit(rv, Tunnel) + + --setmetatable(rv, Tunnel) + -- self.__index = self + + return rv + end + + -- + -- Returns next alarm + -- + function Tunnel:getAlarm() + return self.alarm + end + + -- + -- Check if the tunnel is up + function Tunnel:check() + + end + + function Tunnel:isReady() + return self.status == TUNNEL_STATUS.UP + end + + -- + -- Check if the tunnel is up + function Tunnel:invoke(timestamp) + -- lsyncd.kill() + -- check if child processes are running + if self.status == TUNNEL_STATUS.CONNECTING then + local good = true + for pid, type in ipairs(self.processes) do + if type == TUNNEL_CMD_TYPES.CMD + and lsyncd.kill(pid, 0) ~= 0 then + -- required command does not exist + good = false + end + end + if good then + log( + 'Tunnel', + 'Setup of tunnel ', self.options.name, ' sucessfull' + ) + self.status = TUNNEL_STATUS.UP + self.alarm = false + self:unblockSyncs() + else + -- not ready, postpone next check + self.alarm = now() + 1 + end + end + + if self.status == TUNNEL_STATUS.DOWN then + self:start() + elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then + log( + 'Tunnel', + 'Retry setup ', self.options.name + ) + if self.alarm <= timestamp then + self:start() + end + end + end + + -- + -- Check if Sync is already blocked by Tunnel + function Tunnel:getBlockerForSync( + sync + ) + for _, eblock in ipairs(self.blocks) do + if eblock.sync == sync then + return eblock + end + end + return nil + end + + -- + -- Create a block on the sync until the tunnel reaches ready state + function Tunnel:blockSync( + sync + ) + local block = self:getBlockerForSync(sync) + + if block then + -- delay the block by another second + block:wait( now( ) + 1 ) + return + end + + local block = sync:addBlanketDelay() + sync.tunnelBlock = block + + table.insert (self.blocks, block) + -- set the new delay to be a block for existing delays + for _, eblock in sync.delays:qpairs() do + if eblock ~= block then + eblock:blockedBy(block) + end + end + -- delay tunnel check by 1 second + block:wait( now( ) + 1 ) + end + + -- + -- Create a block on the sync until the tunnel reaches ready state + function Tunnel:unblockSyncs() + for i,blk in ipairs(self.blocks) do + blk.sync:removeDelay(blk) + blk.sync.tunnelBlock = nil + end + self.blocks = {} + end + + function Tunnel:start() + if self.status == TUNNEL_STATUS.CONNECTING or + self.status == TUNNEL_STATUS.UP then + return + end + + if self.options.mode == "command" then + self.status = TUNNEL_STATUS.CONNECTING + self.alarm = false + log( + 'Info', + 'Start tunnel command ', + self.options.command + ) + if #self.options.command < 1 then + log('Error', + '', + self.options + ) + error( 'start tunnel of mode command with empty command', 2 ) + -- FIXME: add line which tunnel was called + return false + end + local bin = self.options.command[1] + for _,v in ipairs(self.options.command) do + if type( v ) ~= 'string' then + error( 'tunnel command must be a list of strings', 2 ) + end + end + local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2)) + --local pid = spawn(bin, table.unpack(self.options.command, 2)) + if pid and pid > 0 then + self.processes[pid] = TUNNEL_CMD_TYPES.CMD + self.checksFailed = 0 + self.alarm = now() + 1 + else + self.alarm = now() + self.options.reconnectDelay + self.status = TUNNEL_STATUS.RETRY_TIMEOUT + end + + else + error('unknown tunnel mode:' .. self.options.mode) + self.status = TUNNEL_STATUS.DOWN + end + end + + -- + -- collect pids of exited child processes. Restart the tunnel if necessary + --- + function Tunnel:collect ( + pid, + exitcode + ) + local ctype = self.processes[pid] + -- cases in which the tunnel command is handled + if ctype == TUNNEL_CMD_TYPES.CMD then + if self.options.onShot then + log( + 'Info', + 'Tunnel setup complete.', + self.options.name + ) + self.status = TUNNEL_STATUS.UP + else + log( + 'Info', + 'Tunnel died. Restarting', + self.options.name + ) + self.status = TUNNEL_STATUS.DOWN + self.alarm = now() + 1 + self:start() + end + -- cases in which the check function has executed a program + elseif ctype == TUNNEL_CMD_TYPES.CHK then + local found = false + if type(self.options.checkExitCodes) == 'table' then + + for _,i in iwalk(self.options.checkExitCodes) do + if exitcode == i then + found = true + end + end + else + if self.options.checkExitCodes == exitcode then + found = true + end + end + if found then + if self.ready == false then + log( + 'Info', + 'Tunnel setup complete ', + self.options.name + ) + end + self.ready = true + else + if self.ready == true then + log( + 'Info', + 'Check failed.', + self.options.name + ) + self.checksFailed = self.checksFailed + 1 + if self.checksFailed > self.options.checkMaxFailed then + self:kill() + end + end + self.ready = false + end + end + self.processes[pid] = nil + end + + -- + -- Stops all tunnel processes + -- + function Tunnel:kill () + for pid, typ in pairs(self.processes) do + if typ == TUNNEL_CMD_TYPES.CMD then + lsyncd.kill(pid, 9) + end + end + self.status = TUNNEL_STATUS.DOWN + end + + function Tunnel:isReady () + return self.status == TUNNEL_STATUS.UP + end + + return Tunnel +end)() -- Tunnel scope + +-- +-- Tunnels - a singleton +-- +-- Tunnels maintains all configured tunnels. +-- +local Tunnels = ( function + ( ) + -- + -- the list of all tunnels + -- + local tunnelList = Array.new( ) + + -- + -- Returns sync at listpos i + -- + local function get + ( i ) + return tunnelList[ i ]; + end + + -- + -- Adds a new tunnel. + -- + local function add + ( + tunnel + ) + table.insert( tunnelList, tunnel ) + + return tunnel + end + + -- + -- Allows a for-loop to walk through all syncs. + -- + local function iwalk + ( ) + return ipairs( tunnelList ) + end + + -- + -- Returns the number of syncs. + -- + local size = function + ( ) + return #tunnelList + end + + local nextCycle = false + -- + -- Cycle through all tunnels and call their invoke function + -- + local function invoke(timestamp) + if nextCycle and nextCycle > timestamp then + return + end + for _,tunnel in ipairs( tunnelList ) + do + print("invoke t", tunnel) + tunnel:invoke(timestamp) + end + nextCycle = now() + 5 + end + + -- + -- returns the next alarm + -- + local function getAlarm() + local rv = nextCycle + for _, tunnel in ipairs( tunnelList ) do + local ta = tunnel:getAlarm() + if ta ~= false + and ta < rv then + rv = ta + end + end + + return rv + end + + -- + -- Public interface + -- + return { + add = add, + get = get, + iwalk = iwalk, + size = size, + invoke = invoke, + getAlarm = getAlarm + } + end )( ) + + +-- +-- create a new tunnel from the passed options and registers the tunnel +tunnel = function (options) + log( + 'Debug', + 'create tunnel:', options + ) + local rv = Tunnel.new(options) + Tunnels.add(rv) + + return rv + +end + + -- -- Syncs - a singleton -- @@ -3144,100 +3680,6 @@ local Syncs = ( function return syncsList[ i ]; end - -- - -- Helper function for inherit - -- defined below - -- - local inheritKV - - -- - -- Recurvely inherits a source table to a destionation table - -- copying all keys from source. - -- - -- All entries with integer keys are inherited as additional - -- sources for non-verbatim tables - -- - local function inherit - ( - cd, -- table copy destination - cs, -- table copy source - verbatim -- forced verbatim ( for e.g. 'exitcodes' ) - ) - -- First copies all entries with non-integer keys. - -- - -- Tables are merged; already present keys are not - -- overwritten - -- - -- For verbatim tables integer keys are treated like - -- non-integer keys - for k, v in pairs( cs ) - do - if - ( - type( k ) ~= 'number' - or verbatim - or cs._verbatim == true - ) - and - ( - type( cs._merge ) ~= 'table' - or cs._merge[ k ] == true - ) - then - inheritKV( cd, k, v ) - end - end - - -- recursevely inherits all integer keyed tables - -- ( for non-verbatim tables ) - if cs._verbatim ~= true - then - for k, v in ipairs( cs ) - do - if type( v ) == 'table' - then - inherit( cd, v ) - else - cd[ #cd + 1 ] = v - end - end - - end - end - - -- - -- Helper to inherit. Inherits one key. - -- - inheritKV = - function( - cd, -- table copy destination - k, -- key - v -- value - ) - - -- don't merge inheritance controls - if k == '_merge' or k == '_verbatim' then return end - - local dtype = type( cd [ k ] ) - - if type( v ) == 'table' - then - if dtype == 'nil' - then - cd[ k ] = { } - inherit( cd[ k ], v, k == 'exitcodes' ) - elseif - dtype == 'table' and - v._merge ~= false - then - inherit( cd[ k ], v, k == 'exitcodes' ) - end - elseif dtype == 'nil' - then - cd[ k ] = v - end - end - -- -- Adds a new sync. @@ -3265,13 +3707,18 @@ local Syncs = ( function config = { } - inherit( config, uconfig ) + -- inherit the config but do not deep copy the tunnel object + -- the tunnel object is a reference to a object that might be shared + inherit( config, uconfig, nil, {"tunnel"} ) -- -- last and least defaults are inherited -- inherit( config, default ) + -- copy references + config.tunnel = uconfig.tunnel + local inheritSettings = { 'delay', 'maxDelays', @@ -4529,17 +4976,27 @@ function runner.collectProcess pid, -- process id exitcode -- exitcode ) - processCount = processCount - 1 + for _, s in Syncs.iwalk( ) + do + if s:collect( pid, exitcode ) then + processCount = processCount - 1 + break + end + end + + for _, s in Tunnels.iwalk( ) + do + if s:collect( pid, exitcode ) then + break + end + end if processCount < 0 then error( 'negative number of processes!' ) end - for _, s in Syncs.iwalk( ) - do - if s:collect( pid, exitcode ) then return end - end + end -- @@ -4585,6 +5042,13 @@ function runner.cycle( error( 'runner.cycle() called while not running!' ) end + -- check and start tunnels + if not uSettings.maxProcesses + or processCount < uSettings.maxProcesses + then + Tunnels.invoke( timestamp ) + end + if uSettings.onepass then local allDone = true @@ -5221,6 +5685,8 @@ function runner.getAlarm 'at global process limit.' ) end + -- checks for tunnel alarm + checkAlarm( Tunnels.getAlarm( ) ) -- checks if a statusfile write has been delayed checkAlarm( StatusFile.getAlarm( ) )