From d32957d5a25b6369fe6c01300fe6af2ecaf4c04c Mon Sep 17 00:00:00 2001 From: Lucas Gabriel Schneider Date: Sat, 6 Oct 2018 11:51:07 -0300 Subject: [PATCH 01/22] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2361daf..a922838 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Lsyncd watches a local directory trees event monitor interface (inotify or fseve Rsync+ssh is an advanced action configuration that uses a SSH to act file and directory moves directly on the target instead of re-transmitting the move destination over the wire. -Fine-grained customization can be achieved through the config file. Custom action configs can even be written from scratch in cascading layers ranging from shell scripts to code written in the [Lua language](http://www.lua.org/). This way simple, powerful and flexible configurations can be acheived. See [the manual](https://axkibe.github.io/lsyncd/) for details. +Fine-grained customization can be achieved through the config file. Custom action configs can even be written from scratch in cascading layers ranging from shell scripts to code written in the [Lua language](http://www.lua.org/). This way simple, powerful and flexible configurations can be achieved. See [the manual](https://axkibe.github.io/lsyncd/) for details. Lsyncd 2.2.1 requires rsync >= 3.1 on all source and target machines. From 188b691bea28e25e770673ead2a8650d369e4f4a Mon Sep 17 00:00:00 2001 From: Bernard Gray Date: Wed, 5 Dec 2018 17:05:18 +1100 Subject: [PATCH 02/22] add onepass option to exit after initial sync --- default.lua | 9 ++++++++- lsyncd.c | 5 +++++ lsyncd.h | 1 + lsyncd.lua | 16 ++++++++++++++++ 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/default.lua b/default.lua index d689260..8a43598 100644 --- a/default.lua +++ b/default.lua @@ -120,7 +120,14 @@ default.collect = function agent.target, ' finished.' ) - + if settings('onepass') + then + log( + 'Normal', + 'onepass config set, exiting' + ) + terminate( 0 ) + end return 'ok' elseif rc == 'again' then diff --git a/lsyncd.c b/lsyncd.c index 5ab21fd..9bde274 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -91,6 +91,7 @@ struct settings settings = { .log_facility = LOG_USER, .log_level = LOG_NOTICE, .nodaemon = false, + .onepass = false, }; @@ -1630,6 +1631,10 @@ l_configure( lua_State *L ) { settings.nodaemon = true; } + else if( !strcmp( command, "onepass" ) ) + { + settings.onepass = true; + } else if( !strcmp( command, "logfile" ) ) { const char * file = luaL_checkstring( L, 2 ); diff --git a/lsyncd.h b/lsyncd.h index 1961a5c..9a4ec67 100644 --- a/lsyncd.h +++ b/lsyncd.h @@ -57,6 +57,7 @@ extern struct settings { int log_facility; // The syslog facility int log_level; // -1 logs everything, 0 normal mode, LOG_ERROR errors only. bool nodaemon; // True if Lsyncd shall not daemonize. + bool onepass; // True if Lsyncd should exit after first sync pass char * pidfile; // If not NULL Lsyncd writes its pid into this file. } settings; diff --git a/lsyncd.lua b/lsyncd.lua index 3c9a75a..7622808 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -75,6 +75,7 @@ local settingsCheckgauge = logfile = true, pidfile = true, nodaemon = true, + onepass = true, statusFile = true, statusInterval = true, logfacility = true, @@ -4614,6 +4615,7 @@ OPTIONS: -log [Category] Turns on logging for a debug category -logfile FILE Writes log to FILE (DEFAULT: uses syslog) -nodaemon Does not detach and logs to stdout/stderr + -onepass Sync once and exit -pidfile FILE Writes Lsyncds PID into FILE -runner FILE Loads Lsyncds lua part from FILE -version Prints versions and exits @@ -4729,6 +4731,15 @@ function runner.configure( args, monitors ) end }, + onepass = + { + 0, + function + ( ) + clSettings.onepass = true + end + }, + pidfile = { 1, @@ -4968,6 +4979,11 @@ function runner.initialize( firstTime ) lsyncd.configure( 'nodaemon' ) end + if uSettings.onepass + then + lsyncd.configure( 'onepass' ) + end + if uSettings.logfile then lsyncd.configure( 'logfile', uSettings.logfile ) From a57b80f9c832ea21c59bde920ad135986ee308ac Mon Sep 17 00:00:00 2001 From: Bernard Gray Date: Thu, 6 Dec 2018 08:41:53 +1100 Subject: [PATCH 03/22] make indenting consistent --- default.lua | 16 ++++++++-------- lsyncd.lua | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/default.lua b/default.lua index 8a43598..6a25474 100644 --- a/default.lua +++ b/default.lua @@ -120,14 +120,14 @@ default.collect = function agent.target, ' finished.' ) - if settings('onepass') - then - log( - 'Normal', - 'onepass config set, exiting' - ) - terminate( 0 ) - end + if settings('onepass') + then + log( + 'Normal', + 'onepass config set, exiting' + ) + terminate( 0 ) + end return 'ok' elseif rc == 'again' then diff --git a/lsyncd.lua b/lsyncd.lua index 7622808..36ef4ce 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -4735,7 +4735,7 @@ function runner.configure( args, monitors ) { 0, function - ( ) + ( ) clSettings.onepass = true end }, From c4b546562271fadd4209f4cf1f39d352851bde2f Mon Sep 17 00:00:00 2001 From: Thomas Nixon Date: Tue, 17 Mar 2020 00:01:44 +0000 Subject: [PATCH 04/22] allow deleting excluded files with rsync This option can't be added to both init and action, because the rsync exclude mechanism is used to select the files to send, so would delete all non-changed files. It's only relevant during init anyway, as after that no excluded files will be made on the destination. --- default-rsync.lua | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/default-rsync.lua b/default-rsync.lua index 106c58d..f846b44 100644 --- a/default-rsync.lua +++ b/default-rsync.lua @@ -62,6 +62,7 @@ rsync.checkgauge = { copy_dirlinks = true, copy_links = true, cvs_exclude = true, + delete_excluded = true, dry_run = true, executability = true, existing = true, @@ -316,7 +317,7 @@ rsync.init = function local filters = inlet.hasFilters( ) and inlet.getFilters( ) - local delete = nil + local delete = {} local target = config.target @@ -336,6 +337,11 @@ rsync.init = function delete = { '--delete', '--ignore-errors' } end + if config.rsync.delete_excluded == true + then + table.insert( delete, '--delete-excluded' ) + end + if not filters and #excludes == 0 then -- starts rsync without any filters or excludes From 39f2f3a37338163d3cbf5bb332f9bedc80282b79 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 10 Dec 2021 15:05:30 +0100 Subject: [PATCH 05/22] Finish -onepass option to exit when all syncs ran sucessfully once --- lsyncd.c | 5 ----- lsyncd.lua | 32 +++++++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/lsyncd.c b/lsyncd.c index dddc9f1..7ca6ce8 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -91,7 +91,6 @@ struct settings settings = { .log_facility = LOG_USER, .log_level = LOG_NOTICE, .nodaemon = false, - .onepass = false, }; @@ -1631,10 +1630,6 @@ l_configure( lua_State *L ) { settings.nodaemon = true; } - else if( !strcmp( command, "onepass" ) ) - { - settings.onepass = true; - } else if( !strcmp( command, "logfile" ) ) { const char * file = luaL_checkstring( L, 2 ); diff --git a/lsyncd.lua b/lsyncd.lua index 478da3f..c123e09 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -2401,6 +2401,9 @@ local Sync = ( function ' = ', exitcode ) + -- sets the initDone after the first success + self.initDone = true + else -- sets the delay on wait again local alarm = self.config.delay @@ -2804,6 +2807,10 @@ local Sync = ( function timestamp, ' )' ) + if self.disabled + then + return + end if self.processes:size( ) >= self.config.maxProcesses then @@ -2992,6 +2999,8 @@ local Sync = ( function processes = CountArray.new( ), excludes = Excludes.new( ), filters = nil, + initDone = false, + disabled = false, -- functions addBlanketDelay = addBlanketDelay, @@ -4576,6 +4585,24 @@ function runner.cycle( error( 'runner.cycle() called while not running!' ) end + if uSettings.onepass + then + local allDone = true + for i, s in Syncs.iwalk( ) + do + if s.initDone == true + then + s.disabled = true + else + allDone = false + end + end + if allDone and processCount == 0 then + log( 'Info', 'onepass active and all syncs finished. Exiting successfully') + os.exit(0) + end + end + -- -- goes through all syncs and spawns more actions -- if possibly. But only let Syncs invoke actions if @@ -5045,11 +5072,6 @@ function runner.initialize( firstTime ) lsyncd.configure( 'nodaemon' ) end - if uSettings.onepass - then - lsyncd.configure( 'onepass' ) - end - if uSettings.logfile then lsyncd.configure( 'logfile', uSettings.logfile ) From a609f34971955450c90db246e992c511cc933d5e Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Tue, 14 Dec 2021 16:45:25 +0100 Subject: [PATCH 06/22] fix compilation with lua 5.4 fixes #621 --- lsyncd.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lsyncd.c b/lsyncd.c index 7ca6ce8..4106333 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -46,6 +46,11 @@ #include #include +#if defined(LUA_VERSION_NUM) && LUA_VERSION_NUM >= 504 +#define lua_objlen lua_rawlen +#endif + + /* | The Lua part of Lsyncd */ From f65cdd282d387852c7b3d416c032fccab9f67c07 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 12 Jan 2022 20:46:14 +0100 Subject: [PATCH 07/22] finish lua2.4 changes --- flake.lock | 8 +++--- flake.nix | 66 ++++++++++++++++++++++++++++++++++++++--------- tests/testlib.lua | 41 ++++++++++++++++++++++++++--- 3 files changed, 95 insertions(+), 20 deletions(-) diff --git a/flake.lock b/flake.lock index b06a406..68efe45 100644 --- a/flake.lock +++ b/flake.lock @@ -17,16 +17,16 @@ }, "nixpkgs": { "locked": { - "lastModified": 1637709854, - "narHash": "sha256-y98gkOBUEiPAmwRhZPzTQ0YayZKPS2loNgA0GcNewMM=", + "lastModified": 1639488789, + "narHash": "sha256-Ey12CBni1jlEGoW4eH4X0hugWs25MxHMcNH4N8VVX0U=", "owner": "nixos", "repo": "nixpkgs", - "rev": "9c43581935a23d56734bd02da0ba8e7fda21e747", + "rev": "ce635e9dca8f7e2bfab19a3667d7e697c019c68b", "type": "github" }, "original": { "owner": "nixos", - "ref": "release-21.05", + "ref": "release-21.11", "repo": "nixpkgs", "type": "github" } diff --git a/flake.nix b/flake.nix index b28c6c1..8d4b5da 100644 --- a/flake.nix +++ b/flake.nix @@ -1,30 +1,65 @@ { description = "Lsyncd (Live Syncing Daemon)"; - inputs.nixpkgs.url = "github:nixos/nixpkgs/release-21.05"; + inputs.nixpkgs.url = "github:nixos/nixpkgs/release-21.11"; inputs.flake-utils.url = "github:numtide/flake-utils"; outputs = { self, nixpkgs, flake-utils }: flake-utils.lib.eachDefaultSystem (system: let - pkgs = nixpkgs.legacyPackages.${system}; + pkgs = (import nixpkgs { + inherit system; + # Makes the config pure as well. See /top-level/impure.nix: + config = { + allowBroken = true; + };}); #.legacyPackages.${system}; defaultDeps = with pkgs; [ gcc cmake + gnumake glib rsync openssh + curl ]; version = builtins.elemAt (builtins.match ''.*set\(.LSYNCD_VERSION ([0-9\.]*).*'' (builtins.substring 0 500 (builtins.readFile ./CMakeLists.txt))) 0; + mylua5_4 = pkgs.lua5_4.override({ + packageOverrides = luaself: luaprev: { + luarocks = luaprev.luarocks-3_7; + }; + }); + luaposix35 = mylua: mylua.pkgs.buildLuarocksPackage { + pname = "luaposix"; + lua = mylua; + version = "35.1-1"; + knownRockspec = (pkgs.fetchurl { + url = "https://luarocks.org/luaposix-35.1-1.rockspec"; + sha256 = "1n6c7qyabj2y95jmbhf8fxbrp9i73kphmwalsam07f9w9h995xh1"; + }).outPath; + src = pkgs.fetchurl { + url = "http://github.com/luaposix/luaposix/archive/v35.1.zip"; + sha256 = "1c03chkzwr2p1wd0hs1bafl2890fqbrfc3qk0wxbd202gc6128zi"; + }; + + # + propagatedBuildInputs = [ mylua ]; + + meta = { + homepage = "http://github.com/luaposix/luaposix/"; + description = "Lua bindings for POSIX"; + license.fullName = "MIT/X11"; + }; + }; + buildTypes = { - lua5_1 = [pkgs.lua5_1 pkgs.lua51Packages.luaposix pkgs.lua51Packages.penlight]; - lua5_2 = [pkgs.lua5_2 pkgs.lua52Packages.luaposix pkgs.lua52Packages.penlight]; - lua5_3 = [pkgs.lua5_3 pkgs.lua53Packages.luaposix pkgs.lua53Packages.penlight]; - lua5_4 = [pkgs.lua5_4 pkgs.lua54Packages.luaposix pkgs.lua54Packages.penlight]; + lua5_1 = [pkgs.lua5_1 pkgs.lua51Packages.luaposix]; + lua5_2 = [pkgs.lua5_2 pkgs.lua52Packages.luaposix]; + lua5_3 = [pkgs.lua5_3 pkgs.lua53Packages.luaposix]; + lua5_4 = [pkgs.lua5_3 (luaposix35 mylua5_4)]; }; in let @@ -33,10 +68,12 @@ name = "lsyncd"; src = ./.; - - # nativeBuildInputs = [ pkgs.qt5.wrapQtAppsHook ]; + buildInputs = defaultDeps ++ luaPackages; - }); + }); + mkDev = packages: pkgs.mkShell { + propagatedBuildInputs = defaultDeps ++ packages; + }; in { packages = { @@ -47,10 +84,15 @@ lsyncd_lua5_4 = mkLsync buildTypes.lua5_4; }; + devShells = { + lsyncd = mkDev buildTypes.lua5_3; + lsyncd_lua5_1 = mkDev buildTypes.lua5_1; + lsyncd_lua5_2 = mkDev buildTypes.lua5_2; + lsyncd_lua5_3 = mkDev buildTypes.lua5_3; + lsyncd_lua5_4 = mkDev buildTypes.lua5_4; + }; + defaultPackage = self.packages.${system}.lsyncd; - # devShell = pkgs.mkShell { - # buildInputs = defaultDeps ++ buildTypes.lua5_3; - # }; } ); } \ No newline at end of file diff --git a/tests/testlib.lua b/tests/testlib.lua index 34fee4e..5a5ba69 100644 --- a/tests/testlib.lua +++ b/tests/testlib.lua @@ -1,8 +1,7 @@ -- common testing environment posix = require( 'posix' ) string = require( 'string' ) -path = require( 'pl.path' ) -stringx = require( 'pl.stringx' ) + local sys_stat = require "posix.sys.stat" -- escape codes to colorize output on terminal @@ -94,10 +93,39 @@ function writefile return true end +function splitpath(P) + local i = #P + local ch = P:sub(i,i) + while i > 0 and ch ~= "/" do + i = i - 1 + ch = P:sub(i,i) + end + if i == 0 then + return '',P + else + return P:sub(1,i-1), P:sub(i+1) + end +end + +function isabs(p) + return string.sub(p, 1, 2) == "/" +end + +function abspath(P,pwd) + P = P:gsub('[\\/]$','') + if not isabs(P) then + local rv = posix.unistd.getcwd() .. "/" .. P + return rv + end + return P +end + + function script_path() -- local str = debug.getinfo(2, "S").source:sub(2) -- return str:match("(.*/)") - return path.dirname(path.abspath(debug.getinfo(1).short_src)) + local dir, file = splitpath(abspath(debug.getinfo(1).short_src)) + return dir end function which(exec) @@ -159,6 +187,11 @@ function startSshd() return true end + +function strip(s) + return s:match "^%s*(.-)%s*$" +end + -- -- Stop test ssh server -- @@ -169,7 +202,7 @@ function stopSshd() then return false end - pid = stringx.strip(f:read("*a")) + pid = strip(f:read("*a")) posix.kill(tonumber(pid)) end From e2a27af6e7f91d8cc0f63087eb2e80dc287435dd Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 12 Jan 2022 20:46:41 +0100 Subject: [PATCH 08/22] use relative sh path --- lsyncd.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lsyncd.lua b/lsyncd.lua index c123e09..f0b2df4 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -5425,7 +5425,7 @@ function spawnShell command, -- the shell command ... -- additonal arguments ) - return spawn( agent, '/bin/sh', '-c', command, '/bin/sh', ... ) + return spawn( agent, 'sh', '-c', command, 'sh', ... ) end From efa8155b682b5062cd903ae7cf7c877c86289077 Mon Sep 17 00:00:00 2001 From: Marko Oldenburg Date: Mon, 14 Feb 2022 12:52:46 +0100 Subject: [PATCH 09/22] fix error - bad binary format (version mismatch) The problem is that the generated make files end up referencing a mix of lua versions This patch will fix it. --- cmake/FindLua.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/FindLua.cmake b/cmake/FindLua.cmake index a20af5b..0ed5b56 100644 --- a/cmake/FindLua.cmake +++ b/cmake/FindLua.cmake @@ -36,7 +36,7 @@ SET(_POSSIBLE_LUA_INCLUDE include include/lua) #SET(_POSSIBLE_LUA_LIBRARY lua) # Determine possible naming suffixes (there is no standard for this) -SET(_POSSIBLE_SUFFIXES "52" "5.2" "-5.2" "53" "5.3" "-5.3" "") +SET(_POSSIBLE_SUFFIXES "54" "5.4" "-5.4" "53" "5.3" "-5.3" "52" "5.2" "-5.2" "") # Set up possible search names and locations FOREACH(_SUFFIX IN LISTS _POSSIBLE_SUFFIXES) From b7d11f6b006c8fd49c43aeeda609c989f58c5d94 Mon Sep 17 00:00:00 2001 From: Marko Oldenburg Date: Thu, 24 Feb 2022 14:40:46 +0100 Subject: [PATCH 10/22] change header discription --- cmake/FindLua.cmake | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmake/FindLua.cmake b/cmake/FindLua.cmake index 0ed5b56..6628209 100644 --- a/cmake/FindLua.cmake +++ b/cmake/FindLua.cmake @@ -16,6 +16,7 @@ #============================================================================= # Copyright 2007-2009 Kitware, Inc. # Modified to support Lua 5.2 by LuaDist 2012 +# Modified to support Lua 5.4 by LuaDist 2022 # # Distributed under the OSI-approved BSD License (the "License"); # see accompanying file Copyright.txt for details. @@ -27,7 +28,7 @@ # (To distribute this file outside of CMake, substitute the full # License text for the above reference.) # -# This module will try to find the newest Lua version down to 5.2 +# This module will try to find the newest Lua version down to 5.4 # Always search for non-versioned lua first (recommended) SET(_POSSIBLE_LUA_INCLUDE include include/lua) From a410ddebb88bdc346476e80b3e50f5d2e35b4e41 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 4 Mar 2022 11:47:28 +0100 Subject: [PATCH 11/22] Install man page to correct subfolder fixes #655 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e30821a..8c92de6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,6 +118,6 @@ add_executable( lsyncd ${LSYNCD_SRC} ) target_link_libraries( lsyncd ${LUA_LIBRARIES} ) install( TARGETS lsyncd RUNTIME DESTINATION bin ) -install( FILES doc/manpage/lsyncd.1 DESTINATION man ) +install( FILES doc/manpage/lsyncd.1 DESTINATION ${CMAKE_INSTALL_MANDIR}/man1 COMPONENT man ) install( DIRECTORY examples DESTINATION doc ) From 2841ef85285fa57975de2d8ec054896d61ddabd5 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 11 Mar 2022 07:12:39 +0100 Subject: [PATCH 12/22] add .editorconfig --- .editorconfig | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..7708c2b --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +# 4 tab indentation + +indent_style = tab +indent_size = 4 + +[*.nix] +indent_style = space +indent_size = 2 From acff33211c6637c9752f0716cfe911ea7bac775f Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 11 Mar 2022 07:11:39 +0100 Subject: [PATCH 13/22] [WIP] Add tunnel support --- default.lua | 1 + lsyncd.c | 38 +++ lsyncd.lua | 686 +++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 615 insertions(+), 110 deletions(-) diff --git a/default.lua b/default.lua index 6a25474..e180fd7 100644 --- a/default.lua +++ b/default.lua @@ -52,6 +52,7 @@ default.checkgauge = { prepare = true, source = true, target = true, + tunnel = true, } -- diff --git a/lsyncd.c b/lsyncd.c index 4106333..f6b55ab 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -443,6 +443,20 @@ printlogf0(lua_State *L, } +/* + | Print a traceback of the error + */ +static int traceback (lua_State *L) { + lua_getglobal(L, "debug"); + lua_getfield(L, -1, "traceback"); + lua_pushvalue(L, 1); + lua_pushinteger(L, 2); + lua_call(L, 2, 1); + printlogf( L, "traceback", "%s", lua_tostring(L, -1) ); + return 1; +} + + /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* ( Simple memory management ) *~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/ @@ -1083,6 +1097,28 @@ l_now(lua_State *L) return 1; } +/* +| Sends a signal to proceess pid +| +| Params on Lua stack: +| 1: pid +| 2: signal +| +| Returns on Lua stack: +| return value of kill +*/ +static int +l_kill( lua_State *L ) +{ + pid_t pid = luaL_checkinteger( L, 1 ); + int sig = luaL_checkinteger( L, 2 ); + + int rv = kill(pid, sig ); + + lua_pushinteger( L, rv ); + + return 1; +} /* | Executes a subprocess. Does not wait for it to return. @@ -1844,6 +1880,7 @@ static const luaL_Reg lsyncdlib[] = { "exec", l_exec }, { "log", l_log }, { "now", l_now }, + { "kill", l_kill }, { "nonobserve_fd", l_nonobserve_fd }, { "observe_fd", l_observe_fd }, { "readdir", l_readdir }, @@ -2796,6 +2833,7 @@ main1( int argc, char *argv[] ) lsyncd_config_file, lua_tostring( L, -1 ) ); + traceback(L); exit( -1 ); } diff --git a/lsyncd.lua b/lsyncd.lua index f0b2df4..5949f05 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -28,16 +28,6 @@ end lsyncd_version = '2.2.3' --- --- Hides the core interface from user scripts. --- -local _l = lsyncd -lsyncd = nil - -local lsyncd = _l -_l = nil - - -- -- Shortcuts (which user is supposed to be able to use them as well) -- @@ -47,6 +37,120 @@ now = lsyncd.now readdir = lsyncd.readdir +inheritKV = nil + +-- +-- Recurvely inherits a source table to a destionation table +-- copying all keys from source. +-- +-- All entries with integer keys are inherited as additional +-- sources for non-verbatim tables +-- +inherit = function +( + cd, -- table copy destination + cs, -- table copy source + verbatim, -- forced verbatim ( for e.g. 'exitcodes' ) + ignored -- table of keys not to copy +) + -- First copies all entries with non-integer keys. + -- + -- Tables are merged; already present keys are not + -- overwritten + -- + -- For verbatim tables integer keys are treated like + -- non-integer keys + for k, v in pairs( cs ) + do + if type(ignored) == 'table' and table.contains(ignored, k) + then + -- do nothing + -- print("ignore x", k) + elseif + ( + type( k ) ~= 'number' + or verbatim + or cs._verbatim == true + ) + and + ( + type( cs ) ~= CountArray + and type( cs._merge ) ~= 'table' + or cs._merge[ k ] == true + ) + then + inheritKV( cd, k, v ) + end + end + + -- recursevely inherits all integer keyed tables + -- ( for non-verbatim tables ) + if cs._verbatim ~= true + then + for k, v in ipairs( cs ) + do + if type( v ) == 'table' + then + inherit( cd, v ) + else + cd[ #cd + 1 ] = v + end + end + + end +end + + +table.contains = function +( + t, -- array to search in. Only the numeric values are tested + needle -- value to search for +) + for _, v in ipairs(t) do + if needle == v then + return true + end + end + return false +end + +-- lsyncd.inherit = inherit +-- print("inherit ") + + +-- +-- Helper to inherit. Inherits one key. +-- +inheritKV = + function( + cd, -- table copy destination + k, -- key + v -- value + ) + + -- don't merge inheritance controls + if k == '_merge' or k == '_verbatim' then return end + + local dtype = type( cd [ k ] ) + + if type( v ) == 'table' + then + if dtype == 'nil' + then + cd[ k ] = { } + inherit( cd[ k ], v, k == 'exitcodes' ) + elseif + dtype == 'table' and + v._merge ~= false + then + inherit( cd[ k ], v, k == 'exitcodes' ) + end + elseif dtype == 'nil' + then + cd[ k ] = v + end +end + -- -- Coping globals to ensure userscripts cannot change this. -- @@ -54,6 +158,8 @@ local log = log local terminate = terminate local now = now local readdir = readdir +local inherit = inherit +local inheritKV = inheritKV -- -- Predeclarations. @@ -201,6 +307,10 @@ local CountArray = ( function t, -- table being accessed k -- key used to access ) + if k == '_merge' or k == '_verbatim' + then + return nil + end if type( k ) ~= 'number' then error( 'Key "' .. k .. '" invalid for CountArray', 2 ) @@ -991,6 +1101,7 @@ local Combiner = ( function -- The new delay replaces the old one if it's a file. -- local function logReplace + ( d1, -- old delay d2 -- new delay @@ -2812,6 +2923,14 @@ local Sync = ( function return end + -- tunnel configured but not up + if self.config.tunnel and + self.config.tunnel:isReady() == false then + log('Tunnel', 'Tunnel for Sync ', self.config.name, ' not ready. Blocking events') + self.config.tunnel:blockSync(self) + return + end + if self.processes:size( ) >= self.config.maxProcesses then -- no new processes @@ -3001,6 +3120,7 @@ local Sync = ( function filters = nil, initDone = false, disabled = false, + tunnelBlock = nil, -- functions addBlanketDelay = addBlanketDelay, @@ -3095,6 +3215,422 @@ local Sync = ( function end )( ) +-- +-- Basic Tunnel provider. +-- +Tunnel = (function() + + Tunnel = {} + + local TUNNEL_CMD_TYPES = { + CMD = 1, + CHK = 2 + } + + local TUNNEL_STATUS = { + DOWN = 0, + CONNECTING = 1, + UP = 2, + RETRY_TIMEOUT = 3, + } + + local nextTunnelName = 1 + + Tunnel.defaults = { + mode = "command", + oneShoot = false, + command = nil, + checkCommand = nil, + checkExitCodes = {0}, + checkMaxFailed = 5, + reconnectDelay = 10 + } + -- export constants + Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES + Tunnel.TUNNEL_STATUS = TUNNEL_STATUS + + function Tunnel.new( + options + ) + local rv = { + processes = CountArray.new( ), + blocks = {}, + ready = false, + checksFailed = 0, + status = TUNNEL_STATUS.DOWN, + alarm = false + } + -- provides a default name if needed + if options.name ~= nil + then + options.name = 'Tunnel' .. nextTunnelName + end + + nextTunnelName = nextTunnelName + 1 + + inherit(options, Tunnel.defaults) + + rv.options = options + + inherit(rv, Tunnel) + + --setmetatable(rv, Tunnel) + -- self.__index = self + + return rv + end + + -- + -- Returns next alarm + -- + function Tunnel:getAlarm() + return self.alarm + end + + -- + -- Check if the tunnel is up + function Tunnel:check() + + end + + function Tunnel:isReady() + return self.status == TUNNEL_STATUS.UP + end + + -- + -- Check if the tunnel is up + function Tunnel:invoke(timestamp) + -- lsyncd.kill() + -- check if child processes are running + if self.status == TUNNEL_STATUS.CONNECTING then + local good = true + for pid, type in ipairs(self.processes) do + if type == TUNNEL_CMD_TYPES.CMD + and lsyncd.kill(pid, 0) ~= 0 then + -- required command does not exist + good = false + end + end + if good then + log( + 'Tunnel', + 'Setup of tunnel ', self.options.name, ' sucessfull' + ) + self.status = TUNNEL_STATUS.UP + self.alarm = false + self:unblockSyncs() + else + -- not ready, postpone next check + self.alarm = now() + 1 + end + end + + if self.status == TUNNEL_STATUS.DOWN then + self:start() + elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then + log( + 'Tunnel', + 'Retry setup ', self.options.name + ) + if self.alarm <= timestamp then + self:start() + end + end + end + + -- + -- Check if Sync is already blocked by Tunnel + function Tunnel:getBlockerForSync( + sync + ) + for _, eblock in ipairs(self.blocks) do + if eblock.sync == sync then + return eblock + end + end + return nil + end + + -- + -- Create a block on the sync until the tunnel reaches ready state + function Tunnel:blockSync( + sync + ) + local block = self:getBlockerForSync(sync) + + if block then + -- delay the block by another second + block:wait( now( ) + 1 ) + return + end + + local block = sync:addBlanketDelay() + sync.tunnelBlock = block + + table.insert (self.blocks, block) + -- set the new delay to be a block for existing delays + for _, eblock in sync.delays:qpairs() do + if eblock ~= block then + eblock:blockedBy(block) + end + end + -- delay tunnel check by 1 second + block:wait( now( ) + 1 ) + end + + -- + -- Create a block on the sync until the tunnel reaches ready state + function Tunnel:unblockSyncs() + for i,blk in ipairs(self.blocks) do + blk.sync:removeDelay(blk) + blk.sync.tunnelBlock = nil + end + self.blocks = {} + end + + function Tunnel:start() + if self.status == TUNNEL_STATUS.CONNECTING or + self.status == TUNNEL_STATUS.UP then + return + end + + if self.options.mode == "command" then + self.status = TUNNEL_STATUS.CONNECTING + self.alarm = false + log( + 'Info', + 'Start tunnel command ', + self.options.command + ) + if #self.options.command < 1 then + log('Error', + '', + self.options + ) + error( 'start tunnel of mode command with empty command', 2 ) + -- FIXME: add line which tunnel was called + return false + end + local bin = self.options.command[1] + for _,v in ipairs(self.options.command) do + if type( v ) ~= 'string' then + error( 'tunnel command must be a list of strings', 2 ) + end + end + local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2)) + --local pid = spawn(bin, table.unpack(self.options.command, 2)) + if pid and pid > 0 then + self.processes[pid] = TUNNEL_CMD_TYPES.CMD + self.checksFailed = 0 + self.alarm = now() + 1 + else + self.alarm = now() + self.options.reconnectDelay + self.status = TUNNEL_STATUS.RETRY_TIMEOUT + end + + else + error('unknown tunnel mode:' .. self.options.mode) + self.status = TUNNEL_STATUS.DOWN + end + end + + -- + -- collect pids of exited child processes. Restart the tunnel if necessary + --- + function Tunnel:collect ( + pid, + exitcode + ) + local ctype = self.processes[pid] + -- cases in which the tunnel command is handled + if ctype == TUNNEL_CMD_TYPES.CMD then + if self.options.onShot then + log( + 'Info', + 'Tunnel setup complete.', + self.options.name + ) + self.status = TUNNEL_STATUS.UP + else + log( + 'Info', + 'Tunnel died. Restarting', + self.options.name + ) + self.status = TUNNEL_STATUS.DOWN + self.alarm = now() + 1 + self:start() + end + -- cases in which the check function has executed a program + elseif ctype == TUNNEL_CMD_TYPES.CHK then + local found = false + if type(self.options.checkExitCodes) == 'table' then + + for _,i in iwalk(self.options.checkExitCodes) do + if exitcode == i then + found = true + end + end + else + if self.options.checkExitCodes == exitcode then + found = true + end + end + if found then + if self.ready == false then + log( + 'Info', + 'Tunnel setup complete ', + self.options.name + ) + end + self.ready = true + else + if self.ready == true then + log( + 'Info', + 'Check failed.', + self.options.name + ) + self.checksFailed = self.checksFailed + 1 + if self.checksFailed > self.options.checkMaxFailed then + self:kill() + end + end + self.ready = false + end + end + self.processes[pid] = nil + end + + -- + -- Stops all tunnel processes + -- + function Tunnel:kill () + for pid, typ in pairs(self.processes) do + if typ == TUNNEL_CMD_TYPES.CMD then + lsyncd.kill(pid, 9) + end + end + self.status = TUNNEL_STATUS.DOWN + end + + function Tunnel:isReady () + return self.status == TUNNEL_STATUS.UP + end + + return Tunnel +end)() -- Tunnel scope + +-- +-- Tunnels - a singleton +-- +-- Tunnels maintains all configured tunnels. +-- +local Tunnels = ( function + ( ) + -- + -- the list of all tunnels + -- + local tunnelList = Array.new( ) + + -- + -- Returns sync at listpos i + -- + local function get + ( i ) + return tunnelList[ i ]; + end + + -- + -- Adds a new tunnel. + -- + local function add + ( + tunnel + ) + table.insert( tunnelList, tunnel ) + + return tunnel + end + + -- + -- Allows a for-loop to walk through all syncs. + -- + local function iwalk + ( ) + return ipairs( tunnelList ) + end + + -- + -- Returns the number of syncs. + -- + local size = function + ( ) + return #tunnelList + end + + local nextCycle = false + -- + -- Cycle through all tunnels and call their invoke function + -- + local function invoke(timestamp) + if nextCycle and nextCycle > timestamp then + return + end + for _,tunnel in ipairs( tunnelList ) + do + print("invoke t", tunnel) + tunnel:invoke(timestamp) + end + nextCycle = now() + 5 + end + + -- + -- returns the next alarm + -- + local function getAlarm() + local rv = nextCycle + for _, tunnel in ipairs( tunnelList ) do + local ta = tunnel:getAlarm() + if ta ~= false + and ta < rv then + rv = ta + end + end + + return rv + end + + -- + -- Public interface + -- + return { + add = add, + get = get, + iwalk = iwalk, + size = size, + invoke = invoke, + getAlarm = getAlarm + } + end )( ) + + +-- +-- create a new tunnel from the passed options and registers the tunnel +tunnel = function (options) + log( + 'Debug', + 'create tunnel:', options + ) + local rv = Tunnel.new(options) + Tunnels.add(rv) + + return rv + +end + + -- -- Syncs - a singleton -- @@ -3144,100 +3680,6 @@ local Syncs = ( function return syncsList[ i ]; end - -- - -- Helper function for inherit - -- defined below - -- - local inheritKV - - -- - -- Recurvely inherits a source table to a destionation table - -- copying all keys from source. - -- - -- All entries with integer keys are inherited as additional - -- sources for non-verbatim tables - -- - local function inherit - ( - cd, -- table copy destination - cs, -- table copy source - verbatim -- forced verbatim ( for e.g. 'exitcodes' ) - ) - -- First copies all entries with non-integer keys. - -- - -- Tables are merged; already present keys are not - -- overwritten - -- - -- For verbatim tables integer keys are treated like - -- non-integer keys - for k, v in pairs( cs ) - do - if - ( - type( k ) ~= 'number' - or verbatim - or cs._verbatim == true - ) - and - ( - type( cs._merge ) ~= 'table' - or cs._merge[ k ] == true - ) - then - inheritKV( cd, k, v ) - end - end - - -- recursevely inherits all integer keyed tables - -- ( for non-verbatim tables ) - if cs._verbatim ~= true - then - for k, v in ipairs( cs ) - do - if type( v ) == 'table' - then - inherit( cd, v ) - else - cd[ #cd + 1 ] = v - end - end - - end - end - - -- - -- Helper to inherit. Inherits one key. - -- - inheritKV = - function( - cd, -- table copy destination - k, -- key - v -- value - ) - - -- don't merge inheritance controls - if k == '_merge' or k == '_verbatim' then return end - - local dtype = type( cd [ k ] ) - - if type( v ) == 'table' - then - if dtype == 'nil' - then - cd[ k ] = { } - inherit( cd[ k ], v, k == 'exitcodes' ) - elseif - dtype == 'table' and - v._merge ~= false - then - inherit( cd[ k ], v, k == 'exitcodes' ) - end - elseif dtype == 'nil' - then - cd[ k ] = v - end - end - -- -- Adds a new sync. @@ -3265,13 +3707,18 @@ local Syncs = ( function config = { } - inherit( config, uconfig ) + -- inherit the config but do not deep copy the tunnel object + -- the tunnel object is a reference to a object that might be shared + inherit( config, uconfig, nil, {"tunnel"} ) -- -- last and least defaults are inherited -- inherit( config, default ) + -- copy references + config.tunnel = uconfig.tunnel + local inheritSettings = { 'delay', 'maxDelays', @@ -4529,17 +4976,27 @@ function runner.collectProcess pid, -- process id exitcode -- exitcode ) - processCount = processCount - 1 + for _, s in Syncs.iwalk( ) + do + if s:collect( pid, exitcode ) then + processCount = processCount - 1 + break + end + end + + for _, s in Tunnels.iwalk( ) + do + if s:collect( pid, exitcode ) then + break + end + end if processCount < 0 then error( 'negative number of processes!' ) end - for _, s in Syncs.iwalk( ) - do - if s:collect( pid, exitcode ) then return end - end + end -- @@ -4585,6 +5042,13 @@ function runner.cycle( error( 'runner.cycle() called while not running!' ) end + -- check and start tunnels + if not uSettings.maxProcesses + or processCount < uSettings.maxProcesses + then + Tunnels.invoke( timestamp ) + end + if uSettings.onepass then local allDone = true @@ -5221,6 +5685,8 @@ function runner.getAlarm 'at global process limit.' ) end + -- checks for tunnel alarm + checkAlarm( Tunnels.getAlarm( ) ) -- checks if a statusfile write has been delayed checkAlarm( StatusFile.getAlarm( ) ) From dcf611d4f86622873e7ea8658c1da3d7e7a1a5b8 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Fri, 11 Mar 2022 07:17:44 +0100 Subject: [PATCH 14/22] bump version --- CMakeLists.txt | 2 +- lsyncd.lua | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c92de6..2bc6d93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ # preamble project( Lsyncd ) cmake_minimum_required( VERSION 3.10 ) -set( LSYNCD_VERSION 2.2.3 ) +set( LSYNCD_VERSION 2.3.0-beta1 ) set( CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/" ) diff --git a/lsyncd.lua b/lsyncd.lua index 5949f05..c433774 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -25,7 +25,7 @@ then lsyncd.terminate( -1 ) end -lsyncd_version = '2.2.3' +lsyncd_version = '2.3.0-beta1' -- From cce6e5423b9f869ab53ea12df8c6daeec1defd54 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 16 Mar 2022 01:12:42 +0100 Subject: [PATCH 15/22] Updates to tunnel logic. Delay before tunnel is considered up. Working delay if restart fails. Disabled state. Kill all tunnel processes on graceful exit --- lsyncd.lua | 145 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 48 deletions(-) diff --git a/lsyncd.lua b/lsyncd.lua index c433774..d46553b 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -2419,11 +2419,11 @@ local Sync = ( function end end end - - + + -- -- Returns true if the relative path is excluded or filtered - -- + -- local function testFilter ( self, -- the Sync @@ -2477,7 +2477,9 @@ local Sync = ( function local delay = self.processes[ pid ] -- not a child of this sync? - if not delay then return end + if not delay then + return false + end if delay.status then @@ -2562,6 +2564,8 @@ local Sync = ( function end self.processes[ pid ] = nil + -- we handled this process + return true end -- @@ -3229,9 +3233,10 @@ Tunnel = (function() local TUNNEL_STATUS = { DOWN = 0, - CONNECTING = 1, - UP = 2, - RETRY_TIMEOUT = 3, + DISABLED = 1, + CONNECTING = 2, + UP = 3, + RETRY_TIMEOUT = 4, } local nextTunnelName = 1 @@ -3243,7 +3248,8 @@ Tunnel = (function() checkCommand = nil, checkExitCodes = {0}, checkMaxFailed = 5, - reconnectDelay = 10 + retryDelay = 10, + readyDelay = 5 } -- export constants Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES @@ -3261,7 +3267,7 @@ Tunnel = (function() alarm = false } -- provides a default name if needed - if options.name ~= nil + if options.name == nil then options.name = 'Tunnel' .. nextTunnelName end @@ -3303,14 +3309,20 @@ Tunnel = (function() -- lsyncd.kill() -- check if child processes are running if self.status == TUNNEL_STATUS.CONNECTING then - local good = true - for pid, type in ipairs(self.processes) do - if type == TUNNEL_CMD_TYPES.CMD - and lsyncd.kill(pid, 0) ~= 0 then - -- required command does not exist - good = false + -- we can only be good if processes exist + local good = self.processes:size() > 0 or self.options.onShoot == true + + for pid, pd in self.processes:walk() do + if pd.type == TUNNEL_CMD_TYPES.CMD then + -- process needs to run for at least some time + if (pd.started + self.options.readyDelay) > timestamp + or lsyncd.kill(pid, 0) ~= 0 then + -- required command does not exist› + good = false + end end end + if good then log( 'Tunnel', @@ -3328,13 +3340,15 @@ Tunnel = (function() if self.status == TUNNEL_STATUS.DOWN then self:start() elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then - log( - 'Tunnel', - 'Retry setup ', self.options.name - ) if self.alarm <= timestamp then + log( + 'Tunnel', + 'Retry setup ', self.options.name + ) self:start() end + elseif self.status == TUNNEL_STATUS.DISABLED then + self.alarm = false end end @@ -3375,7 +3389,7 @@ Tunnel = (function() end end -- delay tunnel check by 1 second - block:wait( now( ) + 1 ) + block:wait( now( ) + 1 ) end -- @@ -3393,7 +3407,7 @@ Tunnel = (function() self.status == TUNNEL_STATUS.UP then return end - + if self.options.mode == "command" then self.status = TUNNEL_STATUS.CONNECTING self.alarm = false @@ -3420,11 +3434,14 @@ Tunnel = (function() local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2)) --local pid = spawn(bin, table.unpack(self.options.command, 2)) if pid and pid > 0 then - self.processes[pid] = TUNNEL_CMD_TYPES.CMD + self.processes[pid] = { + type = TUNNEL_CMD_TYPES.CMD, + started = now() + } self.checksFailed = 0 self.alarm = now() + 1 else - self.alarm = now() + self.options.reconnectDelay + self.alarm = now() + self.options.retryDelay self.status = TUNNEL_STATUS.RETRY_TIMEOUT end @@ -3441,9 +3458,15 @@ Tunnel = (function() pid, exitcode ) - local ctype = self.processes[pid] + local proc = self.processes[pid] + + if proc == nil then + return false + end + log('Debug', + "collect tunnel event. pid: ", pid," exitcode: ", exitcode) -- cases in which the tunnel command is handled - if ctype == TUNNEL_CMD_TYPES.CMD then + if proc.type == TUNNEL_CMD_TYPES.CMD then if self.options.onShot then log( 'Info', @@ -3452,17 +3475,28 @@ Tunnel = (function() ) self.status = TUNNEL_STATUS.UP else - log( - 'Info', - 'Tunnel died. Restarting', + if self.status == TUNNEL_STATUS.CONNECTING then + log( + 'Warning', + 'Starting tunnel failed.', self.options.name - ) - self.status = TUNNEL_STATUS.DOWN - self.alarm = now() + 1 - self:start() + ) + self.status = TUNNEL_STATUS.RETRY_TIMEOUT + self.alarm = now() + 5 + else + log( + 'Info', + 'Tunnel died. Restarting', + self.options.name + ) + + self.status = TUNNEL_STATUS.DOWN + self.alarm = true + self:start() + end end -- cases in which the check function has executed a program - elseif ctype == TUNNEL_CMD_TYPES.CHK then + elseif proc.type == TUNNEL_CMD_TYPES.CHK then local found = false if type(self.options.checkExitCodes) == 'table' then @@ -3507,12 +3541,14 @@ Tunnel = (function() -- Stops all tunnel processes -- function Tunnel:kill () - for pid, typ in pairs(self.processes) do - if typ == TUNNEL_CMD_TYPES.CMD then + log('Tunnel', 'Shutdown tunnel ', self.options.name) + for pid, pr in self.processes:walk() do + if pr.type == TUNNEL_CMD_TYPES.CMD then + log('Tunnel','Kill process ', pid) lsyncd.kill(pid, 9) end end - self.status = TUNNEL_STATUS.DOWN + self.status = TUNNEL_STATUS.DISABLED end function Tunnel:isReady () @@ -3550,10 +3586,10 @@ local Tunnels = ( function tunnel ) table.insert( tunnelList, tunnel ) - + return tunnel end - + -- -- Allows a for-loop to walk through all syncs. -- @@ -3561,7 +3597,7 @@ local Tunnels = ( function ( ) return ipairs( tunnelList ) end - + -- -- Returns the number of syncs. -- @@ -3575,12 +3611,8 @@ local Tunnels = ( function -- Cycle through all tunnels and call their invoke function -- local function invoke(timestamp) - if nextCycle and nextCycle > timestamp then - return - end for _,tunnel in ipairs( tunnelList ) do - print("invoke t", tunnel) tunnel:invoke(timestamp) end nextCycle = now() + 5 @@ -3601,7 +3633,22 @@ local Tunnels = ( function return rv end - + + -- + -- closes all tunnels + -- + local function killAll() + local rv = true + for _, tunnel in ipairs( tunnelList ) do + local ta = tunnel:kill() + if ta ~= true then + rv = false + end + end + + return rv + end + -- -- Public interface -- @@ -3611,7 +3658,8 @@ local Tunnels = ( function iwalk = iwalk, size = size, invoke = invoke, - getAlarm = getAlarm + getAlarm = getAlarm, + killAll = killAll } end )( ) @@ -3625,11 +3673,11 @@ tunnel = function (options) ) local rv = Tunnel.new(options) Tunnels.add(rv) - + return rv end - + -- -- Syncs - a singleton @@ -5033,6 +5081,7 @@ function runner.cycle( return true else + Tunnels.killAll() return false end end @@ -5567,7 +5616,7 @@ function runner.initialize( firstTime ) -- makes sure the user gave Lsyncd anything to do if Syncs.size() == 0 then - log( 'Error', 'Nothing to watch!' ) + log( 'Error', 'Nothing to watch!' ) os.exit( -1 ) end From 60e6505473f184464c6d76aab5f6a2d5a3a1f252 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 16 Mar 2022 17:38:03 +0100 Subject: [PATCH 16/22] Add function to substitude commands with placeholders --- lsyncd.lua | 24 ++++++++++++++++++++++++ tests/utils_test.lua | 13 +++++++++++++ 2 files changed, 37 insertions(+) diff --git a/lsyncd.lua b/lsyncd.lua index d46553b..3a1adcf 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -3983,6 +3983,30 @@ function splitQuotedString return rv end +function replaceCommand(cmd, data) + assert(type(data) == "table") + local getData = function(arg) + print(arg, data) + local rv = data[arg] + if rv ~= nil then + return rv + else + return "" + end + end + if type(cmd) == "string" then + return string.gsub(cmd, "%${(%w+)}", getData) + elseif type(cmd) == "table" then + local rv = {} + for i, v in ipairs(cmd) do + rv[i] = string.gsub(v, "%${(%w+)}", getData) + end + return rv + else + log("Error", "Unsupported type in replacCommand") + end +end + -- -- Interface to inotify. -- diff --git a/tests/utils_test.lua b/tests/utils_test.lua index c7e6942..c73441a 100644 --- a/tests/utils_test.lua +++ b/tests/utils_test.lua @@ -9,4 +9,17 @@ assert(isTableEqual( {"-p", "22", "-i", "/home/test/bla blu/id_rsa"} )) +-- test string replacement +local testData = { + localPort = 1234, + localHost = "localhorst" +} +assert(replaceCommand("echo ssh ${localHost}:${localPort}", testData) == + "echo ssh localhorst:1234") + +assert(isTableEqual( + replaceCommand({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), + {"-p", "2localhorst2", "-i '1234'"} +)) + os.exit(0) \ No newline at end of file From f722ec14f85bb29490ee82538ddd7c48f8cf15b3 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Thu, 17 Mar 2022 02:25:27 +0100 Subject: [PATCH 17/22] Report tunnel status in status file --- lsyncd.c | 30 ++++++++++++++++++++++++++++++ lsyncd.lua | 48 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/lsyncd.c b/lsyncd.c index f6b55ab..f2b23dd 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -1923,6 +1923,33 @@ l_jiffies_add( lua_State *L ) } } +/* +| Adds a number in seconds to a jiffy timestamp. +*/ +static int +l_jiffies_concat( lua_State *L ) +{ + char buf[1024]; + clock_t *p1 = ( clock_t * ) lua_touserdata( L, 1 ); + clock_t *p2 = ( clock_t * ) lua_touserdata( L, 2 ); + + if( p1 && p2 ) + { + logstring( "Error", "Cannot add two timestamps!" ); + exit( -1 ); + } + + { + if (p1) { + snprintf( buf, sizeof(buf), "%Lf", (long double)(*p1)); + lua_pushfstring(L, "%s%s", &buf, luaL_checkstring( L, 2)); + } else { + snprintf( buf, sizeof(buf), "%Lf", (long double)(*p2)); + lua_pushfstring(L, "%s%s", luaL_checkstring( L, 1), &buf); + } + return 1; + } +} /* | Subracts two jiffy timestamps resulting in a number in seconds @@ -2029,6 +2056,9 @@ register_lsyncd( lua_State *L ) lua_pushcfunction( L, l_jiffies_eq ); lua_setfield( L, mt, "__eq" ); + lua_pushcfunction( L, l_jiffies_concat ); + lua_setfield( L, mt, "__concat" ); + lua_pop( L, 1 ); // pop(mt) #ifdef WITH_INOTIFY diff --git a/lsyncd.lua b/lsyncd.lua index 3a1adcf..996d796 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -3232,11 +3232,12 @@ Tunnel = (function() } local TUNNEL_STATUS = { - DOWN = 0, - DISABLED = 1, - CONNECTING = 2, - UP = 3, - RETRY_TIMEOUT = 4, + UNKNOWN = 0, + DOWN = 1, + DISABLED = 2, + CONNECTING = 3, + UP = 4, + RETRY_TIMEOUT = 5, } local nextTunnelName = 1 @@ -3286,6 +3287,16 @@ Tunnel = (function() return rv end + -- Returns the status of tunnel as text + function Tunnel:statusText() + for n, i in pairs(TUNNEL_STATUS) do + if self.status == i then + return n + end + end + return TUNNEL_STATUS.UNKNOWN + end + -- -- Returns next alarm -- @@ -3555,7 +3566,24 @@ Tunnel = (function() return self.status == TUNNEL_STATUS.UP end + -- + -- Writes a status report about this tunnel + -- + function Tunnel:statusReport(f) + f:write( 'Tunnel: name=', self.options.name, ' status=', self:statusText(), '\n' ) + + f:write( 'Running processes: ', self.processes:size( ), '\n') + + for pid, prc in self.processes:walk( ) + do + f:write(" pid=", pid, " type=", prc.type, " started=", ''..prc.started, '\n') + end + + f:write( '\n' ) + end + return Tunnel + end)() -- Tunnel scope -- @@ -3659,7 +3687,8 @@ local Tunnels = ( function size = size, invoke = invoke, getAlarm = getAlarm, - killAll = killAll + killAll = killAll, + statusReport = statusReport } end )( ) @@ -4878,6 +4907,13 @@ local StatusFile = ( function f:write( '\n' ) end + for i, t in Tunnels.iwalk( ) + do + t:statusReport( f ) + + f:write( '\n' ) + end + Inotify.statusReport( f ) f:close( ) From cf7cfe1bdaf71cd8ae86bf5fa1969aa4f8174a0d Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Thu, 17 Mar 2022 02:26:00 +0100 Subject: [PATCH 18/22] Add function to return a free port --- lsyncd.c | 57 ++++++++++++++++++++++++++++++++++++++++++++ tests/utils_test.lua | 2 ++ 2 files changed, 59 insertions(+) diff --git a/lsyncd.c b/lsyncd.c index f2b23dd..c3c3420 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -19,6 +19,8 @@ #define SYSLOG_NAMES 1 +#include +#include #include #include #include @@ -1120,6 +1122,60 @@ l_kill( lua_State *L ) return 1; } +/* +| Returns a free port of host +| +| Params on Lua stack: +| (not yet) 1: hostname or ip for bind +| +| Returns on Lua stack: +| return integer of free port +| +*/ +static int +l_free_port(lua_State *L) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock < 0) { + printf("error opening socket\n"); + goto error; + } + + struct sockaddr_in serv_addr; + memset((char *) &serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = 0; + if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + if(errno == EADDRINUSE) { + printf("the port is not available. already to other process\n"); + goto error; + } else { + printf("could not bind to process (%d) %s\n", errno, strerror(errno)); + goto error; + } + } + + socklen_t len = sizeof(serv_addr); + if (getsockname(sock, (struct sockaddr *)&serv_addr, &len) == -1) { + goto error; + } + + lua_pushinteger(L, ntohs(serv_addr.sin_port)); + + if (close (sock) < 0 ) { + printf("did not close: %s\n", strerror(errno)); + lua_pop ( L, 1 ); + goto error; + } + + return 1; +error: + lua_pushnil(L); + return 1; +} + + + /* | Executes a subprocess. Does not wait for it to return. | @@ -1881,6 +1937,7 @@ static const luaL_Reg lsyncdlib[] = { "log", l_log }, { "now", l_now }, { "kill", l_kill }, + { "get_free_port", l_free_port }, { "nonobserve_fd", l_nonobserve_fd }, { "observe_fd", l_observe_fd }, { "readdir", l_readdir }, diff --git a/tests/utils_test.lua b/tests/utils_test.lua index c73441a..f6f94c4 100644 --- a/tests/utils_test.lua +++ b/tests/utils_test.lua @@ -22,4 +22,6 @@ assert(isTableEqual( {"-p", "2localhorst2", "-i '1234'"} )) +assert(type(lsyncd.get_free_port()) == "number") + os.exit(0) \ No newline at end of file From d708dca0f2ea3e94b5a45d1a3380d285fe952611 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Thu, 17 Mar 2022 02:27:02 +0100 Subject: [PATCH 19/22] Call user provideable check function --- lsyncd.lua | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lsyncd.lua b/lsyncd.lua index 996d796..d0f7403 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -3305,9 +3305,9 @@ Tunnel = (function() end -- - -- Check if the tunnel is up + -- User supplied function to check if tunnel is up function Tunnel:check() - + return true end function Tunnel:isReady() @@ -3318,6 +3318,11 @@ Tunnel = (function() -- Check if the tunnel is up function Tunnel:invoke(timestamp) -- lsyncd.kill() + if self:check() == false then + -- check failed, consider tunnel broken + self.status = TUNNEL_STATUS.DOWN + end + -- check if child processes are running if self.status == TUNNEL_STATUS.CONNECTING then -- we can only be good if processes exist From b712aea0c7f3f07f1d7ce3fe8a30ba79962590a5 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Thu, 17 Mar 2022 02:29:23 +0100 Subject: [PATCH 20/22] rename traceback function --- lsyncd.c | 4 ++-- lsyncd.lua | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lsyncd.c b/lsyncd.c index c3c3420..a84aed9 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -448,7 +448,7 @@ printlogf0(lua_State *L, /* | Print a traceback of the error */ -static int traceback (lua_State *L) { +static int l_traceback (lua_State *L) { lua_getglobal(L, "debug"); lua_getfield(L, -1, "traceback"); lua_pushvalue(L, 1); @@ -2920,7 +2920,7 @@ main1( int argc, char *argv[] ) lsyncd_config_file, lua_tostring( L, -1 ) ); - traceback(L); + l_traceback(L); exit( -1 ); } diff --git a/lsyncd.lua b/lsyncd.lua index d0f7403..32dd250 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -4020,7 +4020,6 @@ end function replaceCommand(cmd, data) assert(type(data) == "table") local getData = function(arg) - print(arg, data) local rv = data[arg] if rv ~= nil then return rv From 07ec98174aafb13be224df7854470319454fffa6 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 23 Mar 2022 23:41:02 +0100 Subject: [PATCH 21/22] Call runner cleanup method when exiting through error codes --- lsyncd.c | 85 ++++++++++++++++++++++++++++++++++-------------------- lsyncd.lua | 13 +++++++++ 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/lsyncd.c b/lsyncd.c index a84aed9..8fac8ac 100644 --- a/lsyncd.c +++ b/lsyncd.c @@ -142,6 +142,22 @@ int pidfile_fd = 0; static long clocks_per_sec; +/* +| Dummy variable of which it's address is used as +| the cores index in the lua registry to +| the lua runners function table in the lua registry. +*/ +static int runner; + + +/* +| Dummy variable of which it's address is used as +| the cores index n the lua registry to +| the lua runners error handler. +*/ +static int callError; + + /** * signal handler */ @@ -449,6 +465,7 @@ printlogf0(lua_State *L, | Print a traceback of the error */ static int l_traceback (lua_State *L) { + // runner.callError lua_getglobal(L, "debug"); lua_getfield(L, -1, "traceback"); lua_pushvalue(L, 1); @@ -458,6 +475,25 @@ static int l_traceback (lua_State *L) { return 1; } +/* + | Call runners terminate function and exit with given exit code + */ +static void safeexit (lua_State *L, int exitcode) { + // load_runner_func(L, "teardown"); + // pushes the function + lua_pushlightuserdata( L, (void *) &runner ); + lua_gettable( L, LUA_REGISTRYINDEX ); + lua_pushstring( L, "teardown" ); + lua_gettable( L, -2 ); + lua_remove( L, -2 ); + lua_pushinteger(L, exitcode); + lua_call(L, 2, 1); + if (lua_isinteger(L, -1)) { + exitcode = luaL_checkinteger(L, -1); + } + exit(exitcode); +} + /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~* ( Simple memory management ) @@ -626,23 +662,6 @@ pipe_tidy( struct observance * observance ) ( Helper Routines ) *~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/ - -/* -| Dummy variable of which it's address is used as -| the cores index in the lua registry to -| the lua runners function table in the lua registry. -*/ -static int runner; - - -/* -| Dummy variable of which it's address is used as -| the cores index n the lua registry to -| the lua runners error handler. -*/ -static int callError; - - /* | Sets the close-on-exit flag of a file descriptor. */ @@ -718,7 +737,7 @@ write_pidfile pidfile ); - exit( -1 ); + safeexit(L, -1 ); } int rc = lockf( pidfile_fd, F_TLOCK, 0 ); @@ -731,7 +750,7 @@ write_pidfile pidfile ); - exit( -1 ); + safeexit(L, -1 ); } snprintf( buf, sizeof( buf ), "%i\n", getpid( ) ); @@ -939,7 +958,7 @@ user_obs_ready( // calls the user function if( lua_pcall( L, 1, 0, -3 ) ) { - exit( -1 ); + safeexit(L, -1 ); } lua_pop( L, 2 ); @@ -975,7 +994,7 @@ user_obs_writey( // calls the user function if( lua_pcall( L, 1, 0, -3 ) ) { - exit(-1); + safeexit(L, -1); } lua_pop( L, 2 ); @@ -1300,7 +1319,7 @@ l_exec( lua_State *L ) "in spawn(), expected a string after pipe '<'" ); - exit( -1 ); + safeexit(L, -1 ); } pipe_text = lua_tolstring( L, 3, &pipe_len ); @@ -1312,7 +1331,7 @@ l_exec( lua_State *L ) { logstring( "Error", "cannot create a pipe!" ); - exit( -1 ); + safeexit(L, -1 ); } // always closes the write end for child processes @@ -1960,7 +1979,7 @@ l_jiffies_add( lua_State *L ) if( p1 && p2 ) { logstring( "Error", "Cannot add two timestamps!" ); - exit( -1 ); + safeexit(L, -1 ); } { @@ -1993,7 +2012,7 @@ l_jiffies_concat( lua_State *L ) if( p1 && p2 ) { logstring( "Error", "Cannot add two timestamps!" ); - exit( -1 ); + safeexit(L, -1 ); } { @@ -2291,7 +2310,7 @@ masterloop(lua_State *L) if( lua_pcall( L, 0, 1, -2 ) ) { - exit( -1 ); + safeexit(L, -1 ); } if( lua_type( L, -1 ) == LUA_TBOOLEAN) @@ -2477,7 +2496,9 @@ masterloop(lua_State *L) lua_pushinteger( L, WEXITSTATUS( status ) ); if ( lua_pcall( L, 2, 0, -4 ) ) - { exit(-1); } + { + safeexit(L, -1); + } lua_pop( L, 1 ); } @@ -2489,7 +2510,7 @@ masterloop(lua_State *L) if( lua_pcall( L, 0, 0, -2 ) ) { - exit( -1 ); + safeexit( L, -1 ); } lua_pop( L, 1 ); @@ -2506,7 +2527,7 @@ masterloop(lua_State *L) if( lua_pcall( L, 1, 0, -3 ) ) { - exit( -1 ); + safeexit(L, -1 ); } lua_pop( L, 1 ); @@ -2522,7 +2543,7 @@ masterloop(lua_State *L) if( lua_pcall( L, 1, 1, -3 ) ) { - exit( -1 ); + safeexit(L, -1 ); } if( !lua_toboolean( L, -1 ) ) @@ -2540,7 +2561,7 @@ masterloop(lua_State *L) "internal, stack is dirty." ); l_stackdump( L ); - exit( -1 ); + safeexit(L, -1 ); } } } @@ -2922,7 +2943,7 @@ main1( int argc, char *argv[] ) ); l_traceback(L); - exit( -1 ); + safeexit(L, -1 ); } } diff --git a/lsyncd.lua b/lsyncd.lua index 32dd250..8101d34 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -5880,6 +5880,19 @@ function runner.term end +-- +-- Called by core on a term signal. +-- +function runner.teardown + ( + exitCode -- exitcode that will be returned + ) + -- ensure we will all stray tunnels when we hard exit + Tunnels.killAll() + + return exitCode +end + --============================================================================ -- Lsyncd runner's user interface --============================================================================ From bfc604a591b698b52f8d41540efa946e5c1a3c7a Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 23 Mar 2022 23:44:19 +0100 Subject: [PATCH 22/22] Implement tunnel pool mode. In this mode, multiple tunnel processes are started and connection a load balanced on the pool of connections. Example config: ... sync { default.rsync, tunnel = tunnel { command = {"ssh", "-N", "-L", "localhost:${localport}:localhost:873", "user@testmachine"}, mode = "pool", parallel = 2, }, target = "rsync://localhost:${localport}/test", ... } --- default-rsync.lua | 8 +- lsyncd.lua | 388 ++++++++++++++++++++++++++++++------------- tests/utils_test.lua | 4 +- 3 files changed, 278 insertions(+), 122 deletions(-) diff --git a/default-rsync.lua b/default-rsync.lua index 3eedb7f..e422b72 100644 --- a/default-rsync.lua +++ b/default-rsync.lua @@ -129,6 +129,9 @@ rsync.action = function -- gets all events ready for syncing local elist = inlet.getEvents( eventNotInitBlank ) + local substitudes = inlet.getSubstitutionData(elist, {}) + local target = substitudeCommands(config.target, substitudes) + -- gets the list of paths for the event list -- deletes create multi match patterns local paths = elist.getPaths( ) @@ -237,7 +240,7 @@ rsync.action = function '--include-from=-', '--exclude=*', config.source, - config.target + target ) end @@ -332,6 +335,9 @@ rsync.init = function target = config.host .. ':' .. config.targetdir end + local substitudes = inlet.getSubstitutionData(event, {}) + target = substitudeCommands(target, substitudes) + if config.delete == true or config.delete == 'startup' then diff --git a/lsyncd.lua b/lsyncd.lua index 8101d34..5f1493a 100644 --- a/lsyncd.lua +++ b/lsyncd.lua @@ -1905,6 +1905,20 @@ local InletFactory = ( function -- TODO give a readonly handler only. return sync.config end, + + -- + -- Returns the sync for this Inlet + -- + getSync = function( sync ) + return sync + end, + + -- + -- Substitutes parameters in arguments + -- + getSubstitutionData = function( sync, event, data) + return sync.getSubstitutionData(sync, event, data) + end, } -- @@ -3106,6 +3120,16 @@ local Sync = ( function f:write( '\n' ) end + -- + -- Returns substitude data for event + -- + local function getSubstitutionData(self, event, data) + if self.config.tunnel then + data = self.config.tunnel:getSubstitutionData(event, data) + end + return data + end + -- -- Creates a new Sync. -- @@ -3141,6 +3165,7 @@ local Sync = ( function removeDelay = removeDelay, rmExclude = rmExclude, statusReport = statusReport, + getSubstitutionData = getSubstitutionData, } s.inlet = InletFactory.newInlet( s ) @@ -3238,23 +3263,43 @@ Tunnel = (function() CONNECTING = 3, UP = 4, RETRY_TIMEOUT = 5, + UP_RETRY_TIMEOUT = 6, + } + + local TUNNEL_MODES = { + COMMAND = "command", + POOL = "pool", + } + + local TUNNEL_DISTRIBUTION = { + ROUNDROBIN = "rr", + } + + local TUNNEL_SUBSTITIONS = { + "localhost", + "localport" } local nextTunnelName = 1 Tunnel.defaults = { - mode = "command", - oneShoot = false, + mode = TUNNEL_MODES.COMMAND, + parallel = 1, + distribution = TUNNEL_DISTRIBUTION.ROUNDROBIN, command = nil, checkCommand = nil, checkExitCodes = {0}, checkMaxFailed = 5, retryDelay = 10, - readyDelay = 5 + readyDelay = 5, + localhost = 'localhost', } -- export constants Tunnel.TUNNEL_CMD_TYPES = TUNNEL_CMD_TYPES Tunnel.TUNNEL_STATUS = TUNNEL_STATUS + Tunnel.TUNNEL_MODES = TUNNEL_MODES + Tunnel.TUNNEL_DISTRIBUTION = TUNNEL_DISTRIBUTION + Tunnel.TUNNEL_SUBSTITIONS = TUNNEL_SUBSTITIONS function Tunnel.new( options @@ -3263,9 +3308,10 @@ Tunnel = (function() processes = CountArray.new( ), blocks = {}, ready = false, - checksFailed = 0, + retryCount = 0, status = TUNNEL_STATUS.DOWN, - alarm = false + alarm = false, + rrCounter = 0, } -- provides a default name if needed if options.name == nil @@ -3287,16 +3333,20 @@ Tunnel = (function() return rv end - -- Returns the status of tunnel as text - function Tunnel:statusText() + function Tunnel.statusToText(status) for n, i in pairs(TUNNEL_STATUS) do - if self.status == i then + if status == i then return n end end return TUNNEL_STATUS.UNKNOWN end + -- Returns the status of tunnel as text + function Tunnel:statusText() + return Tunnel.statusToText(self.status) + end + -- -- Returns next alarm -- @@ -3311,7 +3361,42 @@ Tunnel = (function() end function Tunnel:isReady() - return self.status == TUNNEL_STATUS.UP + return self.status == TUNNEL_STATUS.UP or + self.status == TUNNEL_STATUS.UP_RETRY_TIMEOUT + end + + function Tunnel:setStatus(status) + log('Tunnel',self.options.name,': status change: ', + self:statusText(), " -> ", Tunnel.statusToText(status)) + self.status = status + end + + -- + -- Returns the number of processes currently running + -- + function Tunnel:countProcs(timestamp) + local run = 0 + local starting = 0 + local dead = 0 + if timestamp == nil then + timestamp = now() + end + + for pid, pd in self.processes:walk() do + if pd.type == TUNNEL_CMD_TYPES.CMD then + -- process needs to run for at least some time + if lsyncd.kill(pid, 0) ~= 0 then + dead = dead + 1 + elseif (pd.started + self.options.readyDelay) > timestamp then + starting = starting + 1 + else + pd.ready = true + run = run + 1 + end + end + end + + return run, starting, dead end -- @@ -3320,51 +3405,64 @@ Tunnel = (function() -- lsyncd.kill() if self:check() == false then -- check failed, consider tunnel broken - self.status = TUNNEL_STATUS.DOWN + self:setStatus(TUNNEL_STATUS.DOWN) end - -- check if child processes are running - if self.status == TUNNEL_STATUS.CONNECTING then - -- we can only be good if processes exist - local good = self.processes:size() > 0 or self.options.onShoot == true - - for pid, pd in self.processes:walk() do - if pd.type == TUNNEL_CMD_TYPES.CMD then - -- process needs to run for at least some time - if (pd.started + self.options.readyDelay) > timestamp - or lsyncd.kill(pid, 0) ~= 0 then - -- required command does not exist› - good = false - end - end - end - - if good then - log( - 'Tunnel', - 'Setup of tunnel ', self.options.name, ' sucessfull' - ) - self.status = TUNNEL_STATUS.UP - self.alarm = false - self:unblockSyncs() - else - -- not ready, postpone next check - self.alarm = now() + 1 - end - end - - if self.status == TUNNEL_STATUS.DOWN then - self:start() - elseif self.status == TUNNEL_STATUS.RETRY_TIMEOUT then + if self.status == TUNNEL_STATUS.RETRY_TIMEOUT then if self.alarm <= timestamp then log( 'Tunnel', 'Retry setup ', self.options.name ) self:start() + return + else + -- timeout not yet reached + self.alarm = now() + 1 + return end + elseif self.status == TUNNEL_STATUS.DOWN then + self:start() + return elseif self.status == TUNNEL_STATUS.DISABLED then self.alarm = false + return + end + + local parallel = self.options.parallel + local run, starting, dead = self:countProcs(timestamp) + + -- check if enough child processes are running + if self.status == TUNNEL_STATUS.CONNECTING then + if run > 0 then + log( + 'Tunnel', + 'Setup of tunnel ', self.options.name, ' sucessfull' + ) + self:setStatus(TUNNEL_STATUS.UP) + self:unblockSyncs() + end + elseif self.status == TUNNEL_STATUS.UP and run == 0 then + -- no good process running, degrade + log( + 'Tunnel', + 'Tunnel ', self.options.name, ' changed to CONNECTING' + ) + self:setStatus(TUNNEL_STATUS.CONNECTING) + end + + local spawned = 0 + -- start more processes if necesarry + while run + starting + spawned < self.options.parallel do + self:spawn() + spawned = spawned + 1 + end + + -- trigger next delay + if starting + spawned == 0 then + self.alarm = false + else + self.alarm = now() + 1 end end @@ -3418,52 +3516,73 @@ Tunnel = (function() self.blocks = {} end + -- + -- Spawn a single tunnel program + -- + function Tunnel:spawn() + local opts = { + type = TUNNEL_CMD_TYPES.CMD, + started = now(), + localhost = self.options.localhost, + ready = false, + } + local cmd = self.options.command + + if self.options.mode == TUNNEL_MODES.POOL then + opts.localport = lsyncd.get_free_port() + end + cmd = substitudeCommands(cmd, opts) + + if #cmd < 1 then + log('Error', + '', + self.options + ) + error( 'start tunnel of mode command with empty command', 2 ) + -- FIXME: add line which tunnel was called + return false + end + local bin = cmd[1] + -- for _,v in ipairs(cmd) do + -- if type( v ) ~= 'string' then + -- error( 'tunnel command must be a list of strings', 2 ) + -- end + -- end + log( + 'Info', + 'Start tunnel command ', + cmd + ) + local pid = lsyncd.exec(bin, table.unpack(cmd, 2)) + --local pid = spawn(bin, table.unpack(self.options.command, 2)) + if pid and pid > 0 then + self.processes[pid] = opts + self.retryCount = 0 + self.alarm = now() + 1 + else + self.alarm = now() + self.options.retryDelay + if self.status == TUNNEL_STATUS.UP then + self:setStatus(TUNNEL_STATUS.UP_RETRY_TIMEOUT) + else + self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT) + end + end + end + function Tunnel:start() - if self.status == TUNNEL_STATUS.CONNECTING or - self.status == TUNNEL_STATUS.UP then + if self.status == TUNNEL_STATUS.UP or + self.status == TUNNEL_STATUS.CONNECTING then return end - if self.options.mode == "command" then - self.status = TUNNEL_STATUS.CONNECTING - self.alarm = false - log( - 'Info', - 'Start tunnel command ', - self.options.command - ) - if #self.options.command < 1 then - log('Error', - '', - self.options - ) - error( 'start tunnel of mode command with empty command', 2 ) - -- FIXME: add line which tunnel was called - return false - end - local bin = self.options.command[1] - for _,v in ipairs(self.options.command) do - if type( v ) ~= 'string' then - error( 'tunnel command must be a list of strings', 2 ) - end - end - local pid = lsyncd.exec(bin, table.unpack(self.options.command, 2)) - --local pid = spawn(bin, table.unpack(self.options.command, 2)) - if pid and pid > 0 then - self.processes[pid] = { - type = TUNNEL_CMD_TYPES.CMD, - started = now() - } - self.checksFailed = 0 - self.alarm = now() + 1 - else - self.alarm = now() + self.options.retryDelay - self.status = TUNNEL_STATUS.RETRY_TIMEOUT - end + if self.options.mode == TUNNEL_MODES.COMMAND or + self.options.mode == TUNNEL_MODES.POOL then + self:setStatus(TUNNEL_STATUS.CONNECTING) + self:invoke(now()) else error('unknown tunnel mode:' .. self.options.mode) - self.status = TUNNEL_STATUS.DOWN + self:setStatus(TUNNEL_STATUS.DISABLED) end end @@ -3481,73 +3600,67 @@ Tunnel = (function() end log('Debug', "collect tunnel event. pid: ", pid," exitcode: ", exitcode) + local run, starting, dead = self:countProcs() -- cases in which the tunnel command is handled if proc.type == TUNNEL_CMD_TYPES.CMD then - if self.options.onShot then + if self.status == TUNNEL_STATUS.CONNECTING then + log( + 'Warning', + 'Starting tunnel failed.', + self.options.name + ) + self:setStatus(TUNNEL_STATUS.RETRY_TIMEOUT) + self.alarm = now() + self.options.retryDelay + else log( 'Info', - 'Tunnel setup complete.', + 'Tunnel died. Will Restarting', self.options.name ) - self.status = TUNNEL_STATUS.UP - else - if self.status == TUNNEL_STATUS.CONNECTING then - log( - 'Warning', - 'Starting tunnel failed.', - self.options.name - ) - self.status = TUNNEL_STATUS.RETRY_TIMEOUT - self.alarm = now() + 5 - else - log( - 'Info', - 'Tunnel died. Restarting', - self.options.name - ) - self.status = TUNNEL_STATUS.DOWN - self.alarm = true - self:start() + if run == 0 then + self:setStatus(TUNNEL_STATUS.DOWN) end + self.alarm = true end -- cases in which the check function has executed a program elseif proc.type == TUNNEL_CMD_TYPES.CHK then - local found = false + local good = false if type(self.options.checkExitCodes) == 'table' then for _,i in iwalk(self.options.checkExitCodes) do if exitcode == i then - found = true + good = true end end else if self.options.checkExitCodes == exitcode then - found = true + good = true end end - if found then - if self.ready == false then + if good then + if self.isReady() == false then log( 'Info', - 'Tunnel setup complete ', - self.options.name + self.options.name, + ' Tunnel setup complete ' ) end - self.ready = true + self:setStatus(TUNNEL_STATUS.UP) + self.checksFailed = 0 else - if self.ready == true then + if self.ready then log( - 'Info', - 'Check failed.', + 'Tunnel', self.options.name + ' Check failed.' ) self.checksFailed = self.checksFailed + 1 if self.checksFailed > self.options.checkMaxFailed then self:kill() end end - self.ready = false + self:setStatus(TUNNEL_STATUS.DOWN) end end self.processes[pid] = nil @@ -3564,13 +3677,50 @@ Tunnel = (function() lsyncd.kill(pid, 9) end end - self.status = TUNNEL_STATUS.DISABLED + self:setStatus(TUNNEL_STATUS.DISABLED) end function Tunnel:isReady () return self.status == TUNNEL_STATUS.UP end + -- + -- Fills/changes the opts table with additional values + -- for the transfer to be started + -- + function Tunnel:getSubstitutionData(event, opts) + local useProc, useProcLast = nil, nil + if self.options.mode == TUNNEL_MODES.POOL then + if self.options.distribution == TUNNEL_DISTRIBUTION.ROUNDROBIN then + local i = 0 + for pid, proc in self.processes:walk() do + if proc.ready == true then + useProcLast = proc + if (i % self.processes:size()) == self.rrCounter then + useProc = proc + self.rrCounter = self.rrCounter + 1 + end + end + end + if useProc == nil then + self.rrCounter = 0 + useProc = useProcLast + end + else + log('Tunnel', 'Unknown distribution mode: ', self.options.distribution) + os.exit(1) + end + end + if useProc then + for k,v in pairs(self.TUNNEL_SUBSTITIONS) do + if useProc[v] ~= nil then + opts[v] = useProc[v] + end + end + end + return opts + end + -- -- Writes a status report about this tunnel -- @@ -3581,7 +3731,7 @@ Tunnel = (function() for pid, prc in self.processes:walk( ) do - f:write(" pid=", pid, " type=", prc.type, " started=", ''..prc.started, '\n') + f:write(" pid=", pid, " type=", prc.type, " started="..prc.started, '\n') end f:write( '\n' ) @@ -4017,7 +4167,7 @@ function splitQuotedString return rv end -function replaceCommand(cmd, data) +function substitudeCommands(cmd, data) assert(type(data) == "table") local getData = function(arg) local rv = data[arg] @@ -5892,7 +6042,7 @@ function runner.teardown return exitCode end - + --============================================================================ -- Lsyncd runner's user interface --============================================================================ diff --git a/tests/utils_test.lua b/tests/utils_test.lua index f6f94c4..c793c3f 100644 --- a/tests/utils_test.lua +++ b/tests/utils_test.lua @@ -14,11 +14,11 @@ local testData = { localPort = 1234, localHost = "localhorst" } -assert(replaceCommand("echo ssh ${localHost}:${localPort}", testData) == +assert(substitudeCommands("echo ssh ${localHost}:${localPort}", testData) == "echo ssh localhorst:1234") assert(isTableEqual( - replaceCommand({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), + substitudeCommands({"-p${doesNotExist}", "2${localHost}2", "-i '${localPort}'"}, testData), {"-p", "2localhorst2", "-i '1234'"} ))