diff --git a/cmd/stdisco/main.go b/cmd/stdisco/main.go index ffea6711e..7b820fcc4 100644 --- a/cmd/stdisco/main.go +++ b/cmd/stdisco/main.go @@ -7,6 +7,7 @@ package main import ( + "context" "crypto/rand" "encoding/binary" "flag" @@ -47,14 +48,16 @@ func main() { log.Println("My ID:", myID) } - runbeacon(beacon.NewMulticast(mc), fake) - runbeacon(beacon.NewBroadcast(bc), fake) + ctx := context.Background() + + runbeacon(ctx, beacon.NewMulticast(mc), fake) + runbeacon(ctx, beacon.NewBroadcast(bc), fake) select {} } -func runbeacon(bc beacon.Interface, fake bool) { - go bc.Serve() +func runbeacon(ctx context.Context, bc beacon.Interface, fake bool) { + go bc.Serve(ctx) go recv(bc) if fake { go send(bc) diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go index db1b446d8..f3fcedb95 100644 --- a/cmd/stdiscosrv/apisrv.go +++ b/cmd/stdiscosrv/apisrv.go @@ -66,12 +66,12 @@ func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, } } -func (s *apiSrv) Serve() { +func (s *apiSrv) Serve(ctx context.Context) error { if s.useHTTP { listener, err := net.Listen("tcp", s.addr) if err != nil { log.Println("Listen:", err) - return + return err } s.listener = listener } else { @@ -93,7 +93,7 @@ func (s *apiSrv) Serve() { tlsListener, err := tls.Listen("tcp", s.addr, tlsCfg) if err != nil { log.Println("Listen:", err) - return + return err } s.listener = tlsListener } @@ -107,9 +107,11 @@ func (s *apiSrv) Serve() { MaxHeaderBytes: httpMaxHeaderBytes, } - if err := srv.Serve(s.listener); err != nil { + err := srv.Serve(s.listener) + if err != nil { log.Println("Serve:", err) } + return err } var topCtx = context.Background() diff --git a/cmd/stdiscosrv/database.go b/cmd/stdiscosrv/database.go index 653681822..f38ec4c27 100644 --- a/cmd/stdiscosrv/database.go +++ b/cmd/stdiscosrv/database.go @@ -10,6 +10,7 @@ package main import ( + "context" "log" "sort" "time" @@ -37,7 +38,6 @@ type database interface { type levelDBStore struct { db *leveldb.DB inbox chan func() - stop chan struct{} clock clock marshalBuf []byte } @@ -50,7 +50,6 @@ func newLevelDBStore(dir string) (*levelDBStore, error) { return &levelDBStore{ db: db, inbox: make(chan func(), 16), - stop: make(chan struct{}), clock: defaultClock{}, }, nil } @@ -155,7 +154,7 @@ func (s *levelDBStore) get(key string) (DatabaseRecord, error) { return rec, nil } -func (s *levelDBStore) Serve() { +func (s *levelDBStore) Serve(ctx context.Context) error { t := time.NewTimer(0) defer t.Stop() defer s.db.Close() @@ -183,7 +182,7 @@ loop: // the next. t.Reset(databaseStatisticsInterval) - case <-s.stop: + case <-ctx.Done(): // We're done. close(statisticsTrigger) break loop @@ -192,6 +191,8 @@ loop: // Also wait for statisticsServe to return <-statisticsDone + + return nil } func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) { @@ -255,10 +256,6 @@ func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- stru } } -func (s *levelDBStore) Stop() { - close(s.stop) -} - // merge returns the merged result of the two database records a and b. The // result is the union of the two address sets, with the newer expiry time // chosen for any duplicates. diff --git a/cmd/stdiscosrv/database_test.go b/cmd/stdiscosrv/database_test.go index be7340810..781ac0a2a 100644 --- a/cmd/stdiscosrv/database_test.go +++ b/cmd/stdiscosrv/database_test.go @@ -7,6 +7,7 @@ package main import ( + "context" "fmt" "os" "testing" @@ -20,8 +21,9 @@ func TestDatabaseGetSet(t *testing.T) { if err != nil { t.Fatal(err) } - go db.Serve() - defer db.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go db.Serve(ctx) + defer cancel() // Check missing record diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go index 74479951f..f1e96e065 100644 --- a/cmd/stdiscosrv/main.go +++ b/cmd/stdiscosrv/main.go @@ -7,6 +7,7 @@ package main import ( + "context" "crypto/tls" "flag" "log" @@ -21,7 +22,7 @@ import ( "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) const ( @@ -183,5 +184,5 @@ func main() { } // Engage! - main.Serve() + main.Serve(context.Background()) } diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go index 462033af6..f84e97ecc 100644 --- a/cmd/stdiscosrv/replication.go +++ b/cmd/stdiscosrv/replication.go @@ -7,6 +7,7 @@ package main import ( + "context" "crypto/tls" "encoding/binary" "fmt" @@ -32,7 +33,6 @@ type replicationSender struct { cert tls.Certificate // our certificate allowedIDs []protocol.DeviceID outbox chan ReplicationRecord - stop chan struct{} } func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protocol.DeviceID) *replicationSender { @@ -41,11 +41,10 @@ func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protoco cert: cert, allowedIDs: allowedIDs, outbox: make(chan ReplicationRecord, replicationOutboxSize), - stop: make(chan struct{}), } } -func (s *replicationSender) Serve() { +func (s *replicationSender) Serve(ctx context.Context) error { // Sleep a little at startup. Peers often restart at the same time, and // this avoid the service failing and entering backoff state // unnecessarily, while also reducing the reconnect rate to something @@ -62,7 +61,7 @@ func (s *replicationSender) Serve() { conn, err := tls.Dial("tcp", s.dst, tlsCfg) if err != nil { log.Println("Replication connect:", err) - return + return err } defer func() { conn.SetWriteDeadline(time.Now().Add(time.Second)) @@ -73,13 +72,13 @@ func (s *replicationSender) Serve() { remoteID, err := deviceID(conn) if err != nil { log.Println("Replication connect:", err) - return + return err } // Verify it's in the set of allowed device IDs. if !deviceIDIn(remoteID, s.allowedIDs) { log.Println("Replication connect: unexpected device ID:", remoteID) - return + return err } heartBeatTicker := time.NewTicker(replicationHeartbeatInterval) @@ -122,20 +121,16 @@ func (s *replicationSender) Serve() { replicationSendsTotal.WithLabelValues("error").Inc() log.Println("Replication write:", err) // Yes, we are loosing the replication event here. - return + return err } replicationSendsTotal.WithLabelValues("success").Inc() - case <-s.stop: - return + case <-ctx.Done(): + return nil } } } -func (s *replicationSender) Stop() { - close(s.stop) -} - func (s *replicationSender) String() string { return fmt.Sprintf("replicationSender(%q)", s.dst) } @@ -172,7 +167,6 @@ type replicationListener struct { cert tls.Certificate allowedIDs []protocol.DeviceID db database - stop chan struct{} } func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []protocol.DeviceID, db database) *replicationListener { @@ -181,11 +175,10 @@ func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []prot cert: cert, allowedIDs: allowedIDs, db: db, - stop: make(chan struct{}), } } -func (l *replicationListener) Serve() { +func (l *replicationListener) Serve(ctx context.Context) error { tlsCfg := &tls.Config{ Certificates: []tls.Certificate{l.cert}, ClientAuth: tls.RequestClientCert, @@ -196,14 +189,14 @@ func (l *replicationListener) Serve() { lst, err := tls.Listen("tcp", l.addr, tlsCfg) if err != nil { log.Println("Replication listen:", err) - return + return err } defer lst.Close() for { select { - case <-l.stop: - return + case <-ctx.Done(): + return nil default: } @@ -211,7 +204,7 @@ func (l *replicationListener) Serve() { conn, err := lst.Accept() if err != nil { log.Println("Replication accept:", err) - return + return err } // Figure out the other side device ID @@ -231,19 +224,15 @@ func (l *replicationListener) Serve() { continue } - go l.handle(conn) + go l.handle(ctx, conn) } } -func (l *replicationListener) Stop() { - close(l.stop) -} - func (l *replicationListener) String() string { return fmt.Sprintf("replicationListener(%q)", l.addr) } -func (l *replicationListener) handle(conn net.Conn) { +func (l *replicationListener) handle(ctx context.Context, conn net.Conn) { defer func() { conn.SetWriteDeadline(time.Now().Add(time.Second)) conn.Close() @@ -253,7 +242,7 @@ func (l *replicationListener) handle(conn net.Conn) { for { select { - case <-l.stop: + case <-ctx.Done(): return default: } diff --git a/cmd/strelaysrv/main.go b/cmd/strelaysrv/main.go index 0d59c24fe..964f4b0cc 100644 --- a/cmd/strelaysrv/main.go +++ b/cmd/strelaysrv/main.go @@ -3,6 +3,7 @@ package main import ( + "context" "crypto/tls" "flag" "fmt" @@ -194,7 +195,9 @@ func main() { mapping := mapping{natSvc.NewMapping(nat.TCP, addr.IP, addr.Port)} if natEnabled { - go natSvc.Serve() + ctx, cancel := context.WithCancel(context.Background()) + go natSvc.Serve(ctx) + defer cancel() found := make(chan struct{}) mapping.OnChanged(func(_ *nat.Mapping, _, _ []nat.Address) { select { diff --git a/cmd/strelaysrv/testutil/main.go b/cmd/strelaysrv/testutil/main.go index 609848763..fdbb971f9 100644 --- a/cmd/strelaysrv/testutil/main.go +++ b/cmd/strelaysrv/testutil/main.go @@ -20,7 +20,8 @@ import ( ) func main() { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() log.SetOutput(os.Stdout) log.SetFlags(log.LstdFlags | log.Lshortfile) @@ -62,7 +63,7 @@ func main() { } log.Println("Created client") - go relay.Serve() + go relay.Serve(ctx) recv := make(chan protocol.SessionInvitation) diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 8db13ff8d..02e58fc77 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -8,6 +8,7 @@ package main import ( "bytes" + "context" "crypto/tls" "flag" "fmt" @@ -41,6 +42,7 @@ import ( "github.com/syncthing/syncthing/lib/syncthing" "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/upgrade" + "github.com/syncthing/syncthing/lib/util" "github.com/pkg/errors" ) @@ -321,7 +323,7 @@ func main() { } if err != nil { l.Warnln("Command line options:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } if options.logFile == "default" || options.logFile == "" { @@ -358,7 +360,7 @@ func main() { ) if err != nil { l.Warnln("Error reading device ID:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } fmt.Println(protocol.NewDeviceID(cert.Certificate[0])) @@ -368,7 +370,7 @@ func main() { if options.browserOnly { if err := openGUI(protocol.EmptyDeviceID); err != nil { l.Warnln("Failed to open web UI:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } return } @@ -376,7 +378,7 @@ func main() { if options.generateDir != "" { if err := generate(options.generateDir); err != nil { l.Warnln("Failed to generate config and keys:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } return } @@ -384,14 +386,14 @@ func main() { // Ensure that our home directory exists. if err := ensureDir(locations.GetBaseDir(locations.ConfigBaseDir), 0700); err != nil { l.Warnln("Failure on home directory:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } if options.upgradeTo != "" { err := upgrade.ToURL(options.upgradeTo) if err != nil { l.Warnln("Error while Upgrading:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } l.Infoln("Upgraded from", options.upgradeTo) return @@ -422,13 +424,13 @@ func main() { os.Exit(exitCodeForUpgrade(err)) } l.Infof("Upgraded to %q", release.Tag) - os.Exit(syncthing.ExitUpgrade.AsInt()) + os.Exit(util.ExitUpgrade.AsInt()) } if options.resetDatabase { if err := resetDB(); err != nil { l.Warnln("Resetting database:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } l.Infoln("Successfully reset database - it will be rebuilt after next start.") return @@ -601,13 +603,14 @@ func syncthingMain(runtimeOptions RuntimeOptions) { } evLogger := events.NewLogger() - go evLogger.Serve() - defer evLogger.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go evLogger.Serve(ctx) + defer cancel() cfg, err := syncthing.LoadConfigAtStartup(locations.Get(locations.ConfigFile), cert, evLogger, runtimeOptions.allowNewerConfig, noDefaultFolder) if err != nil { l.Warnln("Failed to initialize config:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } // Candidate builds should auto upgrade. Make sure the option is set, @@ -653,7 +656,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { } } else { l.Infof("Upgraded to %q, exiting now.", release.Tag) - os.Exit(syncthing.ExitUpgrade.AsInt()) + os.Exit(util.ExitUpgrade.AsInt()) } } @@ -694,18 +697,18 @@ func syncthingMain(runtimeOptions RuntimeOptions) { f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid())) if err != nil { l.Warnln("Creating profile:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } if err := pprof.StartCPUProfile(f); err != nil { l.Warnln("Starting profile:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } } go standbyMonitor(app, cfg) if err := app.Start(); err != nil { - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } cleanConfigDirectory() @@ -718,6 +721,10 @@ func syncthingMain(runtimeOptions RuntimeOptions) { status := app.Wait() + if status == util.ExitError { + l.Warnln("Syncthing stopped with error:", app.Error()) + } + if runtimeOptions.cpuProfile { pprof.StopCPUProfile() } @@ -733,7 +740,7 @@ func setupSignalHandling(app *syncthing.App) { signal.Notify(restartSign, sigHup) go func() { <-restartSign - app.Stop(syncthing.ExitRestart) + app.Stop(util.ExitRestart) }() // Exit with "success" code (no restart) on INT/TERM @@ -742,7 +749,7 @@ func setupSignalHandling(app *syncthing.App) { signal.Notify(stopSign, os.Interrupt, sigTerm) go func() { <-stopSign - app.Stop(syncthing.ExitSuccess) + app.Stop(util.ExitSuccess) }() } @@ -779,7 +786,7 @@ func auditWriter(auditFile string) io.Writer { fd, err = os.OpenFile(auditFile, auditFlags, 0600) if err != nil { l.Warnln("Audit:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } auditDest = auditFile } @@ -829,7 +836,7 @@ func standbyMonitor(app *syncthing.App, cfg config.Wrapper) { // things a moment to stabilize. time.Sleep(restartDelay) - app.Stop(syncthing.ExitRestart) + app.Stop(util.ExitRestart) return } now = time.Now() @@ -899,7 +906,7 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger) sub.Unsubscribe() l.Warnf("Automatically upgraded to version %q. Restarting in 1 minute.", rel.Tag) time.Sleep(time.Minute) - app.Stop(syncthing.ExitUpgrade) + app.Stop(util.ExitUpgrade) return } } @@ -987,13 +994,13 @@ func setPauseState(cfg config.Wrapper, paused bool) { } if _, err := cfg.Replace(raw); err != nil { l.Warnln("Cannot adjust paused state:", err) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } } func exitCodeForUpgrade(err error) int { if _, ok := err.(*errNoUpgrade); ok { - return syncthing.ExitNoUpgradeAvailable.AsInt() + return util.ExitNoUpgradeAvailable.AsInt() } - return syncthing.ExitError.AsInt() + return util.ExitError.AsInt() } diff --git a/cmd/syncthing/monitor.go b/cmd/syncthing/monitor.go index 238b4b65e..93e8f6f07 100644 --- a/cmd/syncthing/monitor.go +++ b/cmd/syncthing/monitor.go @@ -26,7 +26,7 @@ import ( "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/syncthing" + "github.com/syncthing/syncthing/lib/util" ) var ( @@ -99,7 +99,7 @@ func monitorMain(runtimeOptions RuntimeOptions) { if t := time.Since(restarts[0]); t < loopThreshold { l.Warnf("%d restarts in %v; not retrying further", countRestarts, t) - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } copy(restarts[0:], restarts[1:]) @@ -169,7 +169,7 @@ func monitorMain(runtimeOptions RuntimeOptions) { if err == nil { // Successful exit indicates an intentional shutdown - os.Exit(syncthing.ExitSuccess.AsInt()) + os.Exit(util.ExitSuccess.AsInt()) } if exiterr, ok := err.(*exec.ExitError); ok { @@ -177,7 +177,7 @@ func monitorMain(runtimeOptions RuntimeOptions) { if stopped || runtimeOptions.noRestart { os.Exit(exitCode) } - if exitCode == syncthing.ExitUpgrade.AsInt() { + if exitCode == util.ExitUpgrade.AsInt() { // Restart the monitor process to release the .old // binary as part of the upgrade process. l.Infoln("Restarting monitor...") @@ -189,7 +189,7 @@ func monitorMain(runtimeOptions RuntimeOptions) { } if runtimeOptions.noRestart { - os.Exit(syncthing.ExitError.AsInt()) + os.Exit(util.ExitError.AsInt()) } l.Infoln("Syncthing exited:", err) diff --git a/go.mod b/go.mod index 758602a77..6b7e85aa4 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/shirou/gopsutil v3.20.10+incompatible github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151 github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7 - github.com/thejerf/suture v4.0.0+incompatible + github.com/thejerf/suture/v4 v4.0.0 github.com/urfave/cli v1.22.4 github.com/vitrun/qart v0.0.0-20160531060029-bf64b92db6b0 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 diff --git a/go.sum b/go.sum index c911bf8c6..29a7ae606 100644 --- a/go.sum +++ b/go.sum @@ -17,11 +17,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= @@ -85,7 +83,6 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U= github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/siphash v1.2.2 h1:9DFz8tQwl9pTVt5iok/9zKyzA1Q6bRGiF3HPiEEVr9I= github.com/dchest/siphash v1.2.2/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4= @@ -114,7 +111,6 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= @@ -131,7 +127,6 @@ github.com/go-ldap/ldap/v3 v3.2.4/go.mod h1:iYS1MdmrmceOJ1QOTnRXrIs7i3kloqtmGQjR github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -153,7 +148,6 @@ github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200j github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= -github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -174,7 +168,6 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= @@ -247,11 +240,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= @@ -264,7 +255,6 @@ github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0Q github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/qpack v0.2.0/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc= -github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc= github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs= github.com/marten-seemann/qtls-go1-15 v0.1.0 h1:i/YPXVxz8q9umso/5y474CNcHmTpA+5DH+mFPjx6PZg= github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= @@ -309,7 +299,6 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -317,11 +306,9 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= @@ -351,7 +338,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -435,7 +421,6 @@ github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -453,17 +438,14 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/syncthing/notify v0.0.0-20201101120444-a28a0bd0f5ee h1:Q2dajND8VmNqXOi+N3IQQP77VkuXMA7tvPzXosDS1vA= -github.com/syncthing/notify v0.0.0-20201101120444-a28a0bd0f5ee/go.mod h1:Sn4ChoS7e4FxjCN1XHPVBT43AgnRLbuaB8pEc1Zcdjg= github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151 h1:aKnLuEFWn/7u42UR82PxsPOMkoBAhq+06oRtUnK3Z1o= github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151/go.mod h1:Sn4ChoS7e4FxjCN1XHPVBT43AgnRLbuaB8pEc1Zcdjg= github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7 h1:udtnv1cokhJYqnUfCMCppJ71bFN9VKfG1BQ6UsYZnx8= github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= -github.com/thejerf/suture v4.0.0+incompatible h1:luAwgEo87y1X30wEYa64N4SKMrsAm9qXRwNxnLVuuwg= -github.com/thejerf/suture v4.0.0+incompatible/go.mod h1:ibKwrVj+Uzf3XZdAiNWUouPaAbSoemxOHLmJmwheEMc= +github.com/thejerf/suture/v4 v4.0.0 h1:GX3X+1Qaewtj9flL2wgoTBfLA5NcmrCY39TJRpPbUrI= +github.com/thejerf/suture/v4 v4.0.0/go.mod h1:g0e8vwskm9tI0jRjxrnA6lSr0q6OfPdWJVX7G5bVWRs= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -617,7 +599,6 @@ golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapK golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= @@ -657,7 +638,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -665,7 +645,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= @@ -673,9 +652,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/lib/api/api.go b/lib/api/api.go index 797c2c626..727675465 100644 --- a/lib/api/api.go +++ b/lib/api/api.go @@ -33,7 +33,7 @@ import ( "github.com/julienschmidt/httprouter" metrics "github.com/rcrowley/go-metrics" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/vitrun/qart/qr" "github.com/syncthing/syncthing/lib/build" @@ -82,7 +82,6 @@ type service struct { connectionsService connections.Service fss model.FolderSummaryService urService *ur.Service - contr Controller noUpgrade bool tlsDefaultCommonName string configChanged chan struct{} // signals intentional listener close due to config change @@ -90,25 +89,20 @@ type service struct { startedOnce chan struct{} // the service has started successfully at least once startupErr error listenerAddr net.Addr + exitChan chan *util.FatalErr guiErrors logger.Recorder systemLog logger.Recorder } -type Controller interface { - ExitUpgrading() - Restart() - Shutdown() -} - type Service interface { suture.Service config.Committer WaitForStart() error } -func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service { - s := &service{ +func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, noUpgrade bool) Service { + return &service{ id: id, cfg: cfg, statics: newStaticsServer(cfg.GUI().Theme, assetDir, cfg.Options().FeatureFlag(featureFlagUntrusted)), @@ -125,14 +119,12 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam urService: urService, guiErrors: errors, systemLog: systemLog, - contr: contr, noUpgrade: noUpgrade, tlsDefaultCommonName: tlsDefaultCommonName, configChanged: make(chan struct{}), startedOnce: make(chan struct{}), + exitChan: make(chan *util.FatalErr, 1), } - s.Service = util.AsService(s.serve, s.String()) - return s } func (s *service) WaitForStart() error { @@ -211,7 +203,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) { fmt.Fprintf(w, "%s\n", bs) } -func (s *service) serve(ctx context.Context) { +func (s *service) Serve(ctx context.Context) error { listener, err := s.getListener(s.cfg.GUI()) if err != nil { select { @@ -227,13 +219,13 @@ func (s *service) serve(ctx context.Context) { s.startupErr = err close(s.startedOnce) } - return + return err } if listener == nil { // Not much we can do here other than exit quickly. The supervisor // will log an error at some point. - return + return nil } s.listenerAddr = listener.Addr() @@ -410,6 +402,7 @@ func (s *service) serve(ctx context.Context) { // Wait for stop, restart or error signals + err = nil select { case <-ctx.Done(): // Shutting down permanently @@ -417,11 +410,14 @@ func (s *service) serve(ctx context.Context) { case <-s.configChanged: // Soft restart due to configuration change l.Debugln("restarting (config changed)") - case <-serveError: + case err = <-s.exitChan: + case err = <-serveError: // Restart due to listen/serve failure l.Warnln("GUI/API:", err, "(restarting)") } srv.Close() + + return err } // Complete implements suture.IsCompletable, which signifies to the supervisor @@ -470,6 +466,14 @@ func (s *service) CommitConfiguration(from, to config.Configuration) bool { return true } +func (s *service) fatal(err *util.FatalErr) { + // s.exitChan is 1-buffered and whoever is first gets handled. + select { + case s.exitChan <- err: + default: + } +} + func debugMiddleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { t0 := time.Now() @@ -874,7 +878,11 @@ func (s *service) getDebugFile(w http.ResponseWriter, r *http.Request) { func (s *service) postSystemRestart(w http.ResponseWriter, r *http.Request) { s.flushResponse(`{"ok": "restarting"}`, w) - go s.contr.Restart() + + s.fatal(&util.FatalErr{ + Err: errors.New("restart initiated by rest API"), + Status: util.ExitRestart, + }) } func (s *service) postSystemReset(w http.ResponseWriter, r *http.Request) { @@ -900,12 +908,18 @@ func (s *service) postSystemReset(w http.ResponseWriter, r *http.Request) { s.flushResponse(`{"ok": "resetting folder `+folder+`"}`, w) } - go s.contr.Restart() + s.fatal(&util.FatalErr{ + Err: errors.New("restart after db reset initiated by rest API"), + Status: util.ExitRestart, + }) } func (s *service) postSystemShutdown(w http.ResponseWriter, r *http.Request) { s.flushResponse(`{"ok": "shutting down"}`, w) - go s.contr.Shutdown() + s.fatal(&util.FatalErr{ + Err: errors.New("shutdown initiated by rest API"), + Status: util.ExitSuccess, + }) } func (s *service) flushResponse(resp string, w http.ResponseWriter) { @@ -1340,7 +1354,10 @@ func (s *service) postSystemUpgrade(w http.ResponseWriter, r *http.Request) { } s.flushResponse(`{"ok": "restarting"}`, w) - s.contr.ExitUpgrading() + s.fatal(&util.FatalErr{ + Err: errors.New("exit after upgrade initiated by rest API"), + Status: util.ExitUpgrade, + }) } } diff --git a/lib/api/api_test.go b/lib/api/api_test.go index de7e3a804..f81c12c7f 100644 --- a/lib/api/api_test.go +++ b/lib/api/api_test.go @@ -9,6 +9,7 @@ package api import ( "bytes" "compress/gzip" + "context" "encoding/json" "fmt" "io" @@ -34,7 +35,8 @@ import ( "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/ur" - "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/util" + "github.com/thejerf/suture/v4" ) var ( @@ -113,15 +115,14 @@ func TestStopAfterBrokenConfig(t *testing.T) { } w := config.Wrap("/dev/null", cfg, events.NoopLogger) - srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, false).(*service) + srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, events.NoopLogger, nil, nil, nil, nil, nil, nil, false).(*service) defer os.Remove(token) srv.started = make(chan string) - sup := suture.New("test", suture.Spec{ - PassThroughPanics: true, - }) + sup := suture.New("test", util.Spec()) sup.Add(srv) - sup.ServeBackground() + ctx, cancel := context.WithCancel(context.Background()) + sup.ServeBackground(ctx) <-srv.started @@ -139,9 +140,7 @@ func TestStopAfterBrokenConfig(t *testing.T) { t.Fatal("Verify config should have failed") } - // Nonetheless, it should be fine to Stop() it without panic. - - sup.Stop() + cancel() } func TestAssetsDir(t *testing.T) { @@ -250,11 +249,11 @@ func TestAPIServiceRequests(t *testing.T) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() cases := []httpTestCase{ // /rest/db @@ -519,11 +518,11 @@ func TestHTTPLogin(t *testing.T) { cfg := new(mockedConfig) cfg.gui.User = "üser" cfg.gui.Password = "$2a$10$IdIZTxTg/dCNuNEGlmLynOjqg4B1FvDKuIV5e0BB3pnWVHNb8.GSq" // bcrypt of "räksmörgås" in UTF-8 - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() // Verify rejection when not using authorization @@ -581,7 +580,7 @@ func TestHTTPLogin(t *testing.T) { } } -func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) { +func startHTTP(cfg config.Wrapper) (string, context.CancelFunc, error) { m := new(mockedModel) assetDir := "../../gui" eventSub := new(mockedEventSub) @@ -594,7 +593,7 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) { // Instantiate the API service urService := ur.New(cfg, m, connections, false) - svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, events.NoopLogger, discoverer, connections, urService, &mockedFolderSummaryService{}, errorLog, systemLog, nil, false).(*service) + svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, events.NoopLogger, discoverer, connections, urService, &mockedFolderSummaryService{}, errorLog, systemLog, false).(*service) defer os.Remove(token) svc.started = addrChan @@ -603,14 +602,15 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) { PassThroughPanics: true, }) supervisor.Add(svc) - supervisor.ServeBackground() + ctx, cancel := context.WithCancel(context.Background()) + supervisor.ServeBackground(ctx) // Make sure the API service is listening, and get the URL to use. addr := <-addrChan tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - supervisor.Stop() - return "", nil, fmt.Errorf("weird address from API service: %w", err) + cancel() + return "", cancel, fmt.Errorf("weird address from API service: %w", err) } host, _, _ := net.SplitHostPort(cfg.GUI().RawAddress) @@ -619,7 +619,7 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) { } baseURL := fmt.Sprintf("http://%s", net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))) - return baseURL, supervisor, nil + return baseURL, cancel, nil } func TestCSRFRequired(t *testing.T) { @@ -628,11 +628,11 @@ func TestCSRFRequired(t *testing.T) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal("Unexpected error from getting base URL:", err) } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Minute, @@ -704,11 +704,11 @@ func TestRandomString(t *testing.T) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Second, } @@ -797,11 +797,11 @@ func testConfigPost(data io.Reader) (*http.Response, error) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { return nil, err } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Second, } @@ -818,11 +818,11 @@ func TestHostCheck(t *testing.T) { cfg := new(mockedConfig) cfg.gui.RawAddress = "127.0.0.1:0" - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() // A normal HTTP get to the localhost-bound service should succeed @@ -879,11 +879,11 @@ func TestHostCheck(t *testing.T) { cfg = new(mockedConfig) cfg.gui.RawAddress = "127.0.0.1:0" cfg.gui.InsecureSkipHostCheck = true - baseURL, sup, err = startHTTP(cfg) + baseURL, cancel, err = startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() // A request with a suspicious Host header should be allowed @@ -903,11 +903,11 @@ func TestHostCheck(t *testing.T) { cfg = new(mockedConfig) cfg.gui.RawAddress = "0.0.0.0:0" cfg.gui.InsecureSkipHostCheck = true - baseURL, sup, err = startHTTP(cfg) + baseURL, cancel, err = startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() // A request with a suspicious Host header should be allowed @@ -931,11 +931,11 @@ func TestHostCheck(t *testing.T) { cfg = new(mockedConfig) cfg.gui.RawAddress = "[::1]:0" - baseURL, sup, err = startHTTP(cfg) + baseURL, cancel, err = startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() // A normal HTTP get to the localhost-bound service should succeed @@ -1026,11 +1026,11 @@ func TestAccessControlAllowOriginHeader(t *testing.T) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Second, } @@ -1057,11 +1057,11 @@ func TestOptionsRequest(t *testing.T) { const testAPIKey = "foobarbaz" cfg := new(mockedConfig) cfg.gui.APIKey = testAPIKey - baseURL, sup, err := startHTTP(cfg) + baseURL, cancel, err := startHTTP(cfg) if err != nil { t.Fatal(err) } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Second, } @@ -1093,7 +1093,7 @@ func TestEventMasks(t *testing.T) { cfg := new(mockedConfig) defSub := new(mockedEventSub) diskSub := new(mockedEventSub) - svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, false).(*service) + svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, events.NoopLogger, nil, nil, nil, nil, nil, nil, false).(*service) defer os.Remove(token) if mask := svc.getEventMask(""); mask != DefaultEventMask { @@ -1253,11 +1253,11 @@ func TestConfigChanges(t *testing.T) { defer os.Remove(tmpFile.Name()) w := config.Wrap(tmpFile.Name(), cfg, events.NoopLogger) tmpFile.Close() - baseURL, sup, err := startHTTP(w) + baseURL, cancel, err := startHTTP(w) if err != nil { t.Fatal("Unexpected error from getting base URL:", err) } - defer sup.Stop() + defer cancel() cli := &http.Client{ Timeout: time.Second, diff --git a/lib/api/mocked_connections_test.go b/lib/api/mocked_connections_test.go index 11883b775..d86459754 100644 --- a/lib/api/mocked_connections_test.go +++ b/lib/api/mocked_connections_test.go @@ -7,6 +7,8 @@ package api import ( + "context" + "github.com/syncthing/syncthing/lib/connections" ) @@ -24,9 +26,7 @@ func (m *mockedConnections) NATType() string { return "" } -func (m *mockedConnections) Serve() {} - -func (m *mockedConnections) Stop() {} +func (m *mockedConnections) Serve(ctx context.Context) error { return nil } func (m *mockedConnections) ExternalAddresses() []string { return nil } diff --git a/lib/api/mocked_discovery_test.go b/lib/api/mocked_discovery_test.go index e560a58aa..9b03dde43 100644 --- a/lib/api/mocked_discovery_test.go +++ b/lib/api/mocked_discovery_test.go @@ -17,13 +17,10 @@ type mockedCachingMux struct{} // from suture.Service -func (m *mockedCachingMux) Serve() { +func (m *mockedCachingMux) Serve(ctx context.Context) error { select {} } -func (m *mockedCachingMux) Stop() { -} - // from events.Finder func (m *mockedCachingMux) Lookup(ctx context.Context, deviceID protocol.DeviceID) (direct []string, err error) { diff --git a/lib/api/mocked_model_test.go b/lib/api/mocked_model_test.go index e9cf92001..491f4498a 100644 --- a/lib/api/mocked_model_test.go +++ b/lib/api/mocked_model_test.go @@ -7,6 +7,7 @@ package api import ( + "context" "net" "time" @@ -124,8 +125,7 @@ func (m *mockedModel) WatchError(folder string) error { return nil } -func (m *mockedModel) Serve() {} -func (m *mockedModel) Stop() {} +func (m *mockedModel) Serve(ctx context.Context) error { return nil } func (m *mockedModel) Index(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) error { return nil @@ -167,9 +167,7 @@ func (m *mockedModel) DBSnapshot(_ string) (*db.Snapshot, error) { type mockedFolderSummaryService struct{} -func (m *mockedFolderSummaryService) Serve() {} - -func (m *mockedFolderSummaryService) Stop() {} +func (m *mockedFolderSummaryService) Serve(context.Context) error { return nil } func (m *mockedFolderSummaryService) Summary(folder string) (map[string]interface{}, error) { return map[string]interface{}{"mocked": true}, nil diff --git a/lib/beacon/beacon.go b/lib/beacon/beacon.go index d7cb5e04f..ed7545c36 100644 --- a/lib/beacon/beacon.go +++ b/lib/beacon/beacon.go @@ -12,7 +12,7 @@ import ( "net" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/util" ) @@ -44,24 +44,25 @@ type cast struct { // caller needs to set reader and writer with the addReader and addWriter // methods to get a functional implementation of Interface. func newCast(name string) *cast { - return &cast{ - Supervisor: suture.New(name, suture.Spec{ - // Don't retry too frenetically: an error to open a socket or - // whatever is usually something that is either permanent or takes - // a while to get solved... - FailureThreshold: 2, - FailureBackoff: 60 * time.Second, - // Only log restarts in debug mode. - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }), - name: name, - inbox: make(chan []byte), - outbox: make(chan recv, 16), - stopped: make(chan struct{}), + spec := util.Spec() + // Don't retry too frenetically: an error to open a socket or + // whatever is usually something that is either permanent or takes + // a while to get solved... + spec.FailureThreshold = 2 + spec.FailureBackoff = 60 * time.Second + // Only log restarts in debug mode. + spec.EventHook = func(e suture.Event) { + l.Debugln(e) } + c := &cast{ + Supervisor: suture.New(name, spec), + name: name, + inbox: make(chan []byte), + outbox: make(chan recv, 16), + stopped: make(chan struct{}), + } + util.OnSupervisorDone(c.Supervisor, func() { close(c.stopped) }) + return c } func (c *cast) addReader(svc func(context.Context) error) { @@ -75,17 +76,7 @@ func (c *cast) addWriter(svc func(ctx context.Context) error) { } func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError { - return util.AsServiceWithError(func(ctx context.Context) error { - l.Debugln("Starting", c.name, suffix) - err := svc(ctx) - l.Debugf("Stopped %v %v: %v", c.name, suffix, err) - return err - }, fmt.Sprintf("%s/%s", c, suffix)) -} - -func (c *cast) Stop() { - c.Supervisor.Stop() - close(c.stopped) + return util.AsService(svc, fmt.Sprintf("%s/%s", c, suffix)) } func (c *cast) String() string { diff --git a/lib/beacon/broadcast.go b/lib/beacon/broadcast.go index fbd0061de..940d01c2c 100644 --- a/lib/beacon/broadcast.go +++ b/lib/beacon/broadcast.go @@ -41,7 +41,7 @@ func writeBroadcasts(ctx context.Context, inbox <-chan []byte, port int) error { select { case bs = <-inbox: case <-doneCtx.Done(): - return nil + return doneCtx.Err() } intfs, err := net.Interfaces() @@ -138,7 +138,7 @@ func readBroadcasts(ctx context.Context, outbox chan<- recv, port int) error { select { case outbox <- recv{c, addr}: case <-doneCtx.Done(): - return nil + return doneCtx.Err() default: l.Debugln("dropping message") } diff --git a/lib/beacon/multicast.go b/lib/beacon/multicast.go index d3644053f..6e8501d00 100644 --- a/lib/beacon/multicast.go +++ b/lib/beacon/multicast.go @@ -56,7 +56,7 @@ func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) erro select { case bs = <-inbox: case <-doneCtx.Done(): - return nil + return doneCtx.Err() } intfs, err := net.Interfaces() @@ -87,7 +87,7 @@ func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) erro select { case <-doneCtx.Done(): - return nil + return doneCtx.Err() default: } } @@ -144,7 +144,7 @@ func readMulticasts(ctx context.Context, outbox chan<- recv, addr string) error for { select { case <-doneCtx.Done(): - return nil + return doneCtx.Err() default: } n, _, addr, err := pconn.ReadFrom(bs) diff --git a/lib/connections/quic_listen.go b/lib/connections/quic_listen.go index a5ff9298f..8a320de8f 100644 --- a/lib/connections/quic_listen.go +++ b/lib/connections/quic_listen.go @@ -91,8 +91,7 @@ func (t *quicListener) serve(ctx context.Context) error { svc, conn := stun.New(t.cfg, t, packetConn) defer func() { _ = conn.Close() }() - go svc.Serve() - defer svc.Stop() + go svc.Serve(ctx) registry.Register(t.uri.Scheme, conn) defer registry.Unregister(t.uri.Scheme, conn) @@ -115,7 +114,7 @@ func (t *quicListener) serve(ctx context.Context) error { for { select { case <-ctx.Done(): - return nil + return ctx.Err() default: } @@ -206,7 +205,7 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls. conns: conns, factory: f, } - l.ServiceWithError = util.AsServiceWithError(l.serve, l.String()) + l.ServiceWithError = util.AsService(l.serve, l.String()) l.nat.Store(stun.NATUnknown) return l } diff --git a/lib/connections/relay_listen.go b/lib/connections/relay_listen.go index 2ffb61cbe..09e2a852a 100644 --- a/lib/connections/relay_listen.go +++ b/lib/connections/relay_listen.go @@ -53,8 +53,7 @@ func (t *relayListener) serve(ctx context.Context) error { t.mut.Lock() t.client = clnt - go clnt.Serve() - defer clnt.Stop() + go clnt.Serve(ctx) t.mut.Unlock() // Start with nil, so that we send a addresses changed notification as soon as we connect somewhere. @@ -120,7 +119,7 @@ func (t *relayListener) serve(ctx context.Context) error { } case <-ctx.Done(): - return nil + return ctx.Err() } } } @@ -185,7 +184,7 @@ func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls conns: conns, factory: f, } - t.ServiceWithError = util.AsServiceWithError(t.serve, t.String()) + t.ServiceWithError = util.AsService(t.serve, t.String()) return t } diff --git a/lib/connections/service.go b/lib/connections/service.go index 48c495c4d..4a347be57 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -31,7 +31,7 @@ import ( _ "github.com/syncthing/syncthing/lib/upnp" "github.com/pkg/errors" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "golang.org/x/time/rate" ) @@ -132,13 +132,12 @@ type service struct { } func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string, evLogger events.Logger) Service { + spec := util.Spec() + spec.EventHook = func(e suture.Event) { + l.Infoln(e) + } service := &service{ - Supervisor: suture.New("connections.Service", suture.Spec{ - Log: func(line string) { - l.Infoln(line) - }, - PassThroughPanics: true, - }), + Supervisor: suture.New("connections.Service", spec), connectionStatusHandler: newConnectionStatusHandler(), cfg: cfg, @@ -162,8 +161,8 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t // due to config are done by removing and adding services, so are // not subject to these limitations. listenerSupervisor: suture.New("c.S.listenerSupervisor", suture.Spec{ - Log: func(line string) { - l.Infoln(line) + EventHook: func(e suture.Event) { + l.Infoln(e) }, FailureThreshold: 2, FailureBackoff: 600 * time.Second, @@ -189,21 +188,20 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t service.Add(service.listenerSupervisor) service.Add(service.natService) + util.OnSupervisorDone(service.Supervisor, func() { + service.cfg.Unsubscribe(service.limiter) + service.cfg.Unsubscribe(service) + }) + return service } -func (s *service) Stop() { - s.cfg.Unsubscribe(s.limiter) - s.cfg.Unsubscribe(s) - s.Supervisor.Stop() -} - -func (s *service) handle(ctx context.Context) { +func (s *service) handle(ctx context.Context) error { var c internalConn for { select { case <-ctx.Done(): - return + return ctx.Err() case c = <-s.conns: } @@ -338,9 +336,10 @@ func (s *service) handle(ctx context.Context) { s.model.AddConnection(modelConn, hello) continue } + return nil } -func (s *service) connect(ctx context.Context) { +func (s *service) connect(ctx context.Context) error { nextDial := make(map[string]time.Time) // Used as delay for the first few connection attempts, increases @@ -371,7 +370,7 @@ func (s *service) connect(ctx context.Context) { for _, deviceCfg := range cfg.Devices { select { case <-ctx.Done(): - return + return ctx.Err() default: } @@ -503,9 +502,10 @@ func (s *service) connect(ctx context.Context) { select { case <-time.After(sleep): case <-ctx.Done(): - return + return ctx.Err() } } + return nil } func (s *service) isLANHost(host string) bool { diff --git a/lib/connections/structs.go b/lib/connections/structs.go index d270d13bf..3918a95fc 100644 --- a/lib/connections/structs.go +++ b/lib/connections/structs.go @@ -18,6 +18,8 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/protocol" + + "github.com/thejerf/suture/v4" ) // Connection is what we expose to the outside. It is a protocol.Connection @@ -181,8 +183,7 @@ type ListenerAddresses struct { } type genericListener interface { - Serve() - Stop() + suture.Service URI() *url.URL // A given address can potentially be mutated by the listener. // For example we bind to tcp://0.0.0.0, but that for example might return diff --git a/lib/connections/tcp_listen.go b/lib/connections/tcp_listen.go index fbe1e3cc8..912708563 100644 --- a/lib/connections/tcp_listen.go +++ b/lib/connections/tcp_listen.go @@ -207,7 +207,7 @@ func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.C natService: natService, factory: f, } - l.ServiceWithError = util.AsServiceWithError(l.serve, l.String()) + l.ServiceWithError = util.AsService(l.serve, l.String()) return l } diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index c0c1b620c..8074564aa 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -25,7 +25,7 @@ import ( "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/util" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) const ( @@ -68,14 +68,13 @@ type Lowlevel struct { } func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel { + spec := util.Spec() + // Only log restarts in debug mode. + spec.EventHook = func(e suture.Event) { + l.Debugln(e) + } db := &Lowlevel{ - Supervisor: suture.New("db.Lowlevel", suture.Spec{ - // Only log restarts in debug mode. - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }), + Supervisor: suture.New("db.Lowlevel", spec), Backend: backend, folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}), deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}), @@ -586,7 +585,7 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error { return t.Commit() } -func (db *Lowlevel) gcRunner(ctx context.Context) { +func (db *Lowlevel) gcRunner(ctx context.Context) error { // Calculate the time for the next GC run. Even if we should run GC // directly, give the system a while to get up and running and do other // stuff first. (We might have migrations and stuff which would be @@ -602,7 +601,7 @@ func (db *Lowlevel) gcRunner(ctx context.Context) { for { select { case <-ctx.Done(): - return + return ctx.Err() case <-t.C: if err := db.gcIndirect(ctx); err != nil { l.Warnln("Database indirection GC failed:", err) diff --git a/lib/discover/cache.go b/lib/discover/cache.go index b22255020..29dfdedf1 100644 --- a/lib/discover/cache.go +++ b/lib/discover/cache.go @@ -10,7 +10,7 @@ import ( stdsync "sync" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/protocol" ) diff --git a/lib/discover/cache_test.go b/lib/discover/cache_test.go index d6dc4a081..dd79d3ec8 100644 --- a/lib/discover/cache_test.go +++ b/lib/discover/cache_test.go @@ -18,6 +18,14 @@ import ( "github.com/syncthing/syncthing/lib/protocol" ) +func setupCache() *manager { + cfg := config.New(protocol.LocalDeviceID) + cfg.Options.LocalAnnEnabled = false + cfg.Options.GlobalAnnEnabled = false + + return NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) +} + func TestCacheUnique(t *testing.T) { addresses0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"} addresses1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"} @@ -33,13 +41,7 @@ func TestCacheUnique(t *testing.T) { "tcp://192.0.2.44:22000", } - cfg := config.New(protocol.LocalDeviceID) - cfg.Options.LocalAnnEnabled = false - cfg.Options.GlobalAnnEnabled = false - - c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) - c.ServeBackground() - defer c.Stop() + c := setupCache() // Add a fake discovery service and verify we get its answers through the // cache. @@ -93,13 +95,7 @@ func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry { } func TestCacheSlowLookup(t *testing.T) { - cfg := config.New(protocol.LocalDeviceID) - cfg.Options.LocalAnnEnabled = false - cfg.Options.GlobalAnnEnabled = false - - c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) - c.ServeBackground() - defer c.Stop() + c := setupCache() // Add a slow discovery service. diff --git a/lib/discover/discover.go b/lib/discover/discover.go index d866aa16a..95564fe63 100644 --- a/lib/discover/discover.go +++ b/lib/discover/discover.go @@ -11,7 +11,7 @@ import ( "time" "github.com/syncthing/syncthing/lib/protocol" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) // A Finder provides lookup services of some kind. diff --git a/lib/discover/global.go b/lib/discover/global.go index 57749c5ac..cca74ff21 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -21,16 +21,12 @@ import ( stdsync "sync" "time" - "github.com/thejerf/suture" - "github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/util" ) type globalClient struct { - suture.Service server string addrList AddressLister announceClient httpClient @@ -133,7 +129,6 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLo noLookup: opts.noLookup, evLogger: evLogger, } - cl.Service = util.AsService(cl.serve, cl.String()) if !opts.noAnnounce { // If we are supposed to annonce, it's an error until we've done so. cl.setError(errors.New("not announced")) @@ -193,12 +188,12 @@ func (c *globalClient) String() string { return "global@" + c.server } -func (c *globalClient) serve(ctx context.Context) { +func (c *globalClient) Serve(ctx context.Context) error { if c.noAnnounce { // We're configured to not do announcements, only lookups. To maintain // the same interface, we just pause here if Serve() is run. <-ctx.Done() - return + return ctx.Err() } timer := time.NewTimer(5 * time.Second) @@ -231,7 +226,7 @@ func (c *globalClient) serve(ctx context.Context) { c.sendAnnouncement(ctx, timer) case <-ctx.Done(): - return + return ctx.Err() } } } diff --git a/lib/discover/global_test.go b/lib/discover/global_test.go index 58940d439..8ebbc7c4d 100644 --- a/lib/discover/global_test.go +++ b/lib/discover/global_test.go @@ -200,8 +200,9 @@ func TestGlobalAnnounce(t *testing.T) { t.Fatal(err) } - go disco.Serve() - defer disco.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go disco.Serve(ctx) + defer cancel() // The discovery thing should attempt an announcement immediately. We wait // for it to succeed, a while. @@ -223,8 +224,9 @@ func testLookup(url string) ([]string, error) { if err != nil { return nil, err } - go disco.Serve() - defer disco.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go disco.Serve(ctx) + defer cancel() return disco.Lookup(context.Background(), protocol.LocalDeviceID) } diff --git a/lib/discover/local.go b/lib/discover/local.go index a4d79e059..3769931b3 100644 --- a/lib/discover/local.go +++ b/lib/discover/local.go @@ -25,7 +25,7 @@ import ( "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/util" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) type localClient struct { @@ -52,9 +52,7 @@ const ( func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) { c := &localClient{ - Supervisor: suture.New("local", suture.Spec{ - PassThroughPanics: true, - }), + Supervisor: suture.New("local", util.Spec()), myID: id, addrList: addrList, evLogger: evLogger, @@ -137,7 +135,7 @@ func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, boo return msg, true } -func (c *localClient) sendLocalAnnouncements(ctx context.Context) { +func (c *localClient) sendLocalAnnouncements(ctx context.Context) error { var msg []byte var ok bool instanceID := rand.Int63() @@ -150,18 +148,18 @@ func (c *localClient) sendLocalAnnouncements(ctx context.Context) { case <-c.localBcastTick: case <-c.forcedBcastTick: case <-ctx.Done(): - return + return ctx.Err() } } } -func (c *localClient) recvAnnouncements(ctx context.Context) { +func (c *localClient) recvAnnouncements(ctx context.Context) error { b := c.beacon warnedAbout := make(map[string]bool) for { select { case <-ctx.Done(): - return + return ctx.Err() default: } diff --git a/lib/discover/local_test.go b/lib/discover/local_test.go index b6bf3c32f..ce5872c2c 100644 --- a/lib/discover/local_test.go +++ b/lib/discover/local_test.go @@ -8,6 +8,7 @@ package discover import ( "bytes" + "context" "net" "testing" @@ -20,8 +21,9 @@ func TestLocalInstanceID(t *testing.T) { if err != nil { t.Fatal(err) } - go c.Serve() - defer c.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go c.Serve(ctx) + defer cancel() lc := c.(*localClient) diff --git a/lib/discover/manager.go b/lib/discover/manager.go index 1e2b34b0f..c01c74840 100644 --- a/lib/discover/manager.go +++ b/lib/discover/manager.go @@ -13,7 +13,7 @@ import ( "sort" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" @@ -46,10 +46,8 @@ type manager struct { } func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager { - return &manager{ - Supervisor: suture.New("discover.Manager", suture.Spec{ - PassThroughPanics: true, - }), + m := &manager{ + Supervisor: suture.New("discover.Manager", util.Spec()), myID: myID, cfg: cfg, cert: cert, @@ -59,13 +57,16 @@ func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate finders: make(map[string]cachedFinder), mut: sync.NewRWMutex(), } + m.Add(util.AsService(m.serve, m.String())) + return m } -func (m *manager) Serve() { +func (m *manager) serve(ctx context.Context) error { m.cfg.Subscribe(m) - defer m.cfg.Unsubscribe(m) m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy()) - m.Supervisor.Serve() + <-ctx.Done() + m.cfg.Unsubscribe(m) + return nil } func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) { diff --git a/lib/events/events.go b/lib/events/events.go index d672701e2..63a9cd6e4 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -15,10 +15,9 @@ import ( "runtime" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/util" ) type EventType int64 @@ -219,7 +218,6 @@ type Logger interface { } type logger struct { - suture.Service subs []*subscription nextSubscriptionIDs []int nextGlobalID int @@ -267,7 +265,6 @@ func NewLogger() Logger { funcs: make(chan func(context.Context)), toUnsubscribe: make(chan *subscription), } - l.Service = util.AsService(l.serve, l.String()) // Make sure the timer is in the stopped state and hasn't fired anything // into the channel. if !l.timeout.Stop() { @@ -276,7 +273,7 @@ func NewLogger() Logger { return l } -func (l *logger) serve(ctx context.Context) { +func (l *logger) Serve(ctx context.Context) error { loop: for { select { @@ -302,6 +299,8 @@ loop: for _, s := range l.subs { close(s.events) } + + return nil } func (l *logger) Log(t EventType, data interface{}) { @@ -535,7 +534,7 @@ type noopLogger struct{} var NoopLogger Logger = &noopLogger{} -func (*noopLogger) Serve() {} +func (*noopLogger) Serve(ctx context.Context) error { return nil } func (*noopLogger) Stop() {} diff --git a/lib/events/events_test.go b/lib/events/events_test.go index 49f64e9a4..12e935158 100644 --- a/lib/events/events_test.go +++ b/lib/events/events_test.go @@ -7,6 +7,7 @@ package events import ( + "context" "encoding/json" "fmt" "sync" @@ -27,10 +28,16 @@ func TestNewLogger(t *testing.T) { } } -func TestSubscriber(t *testing.T) { +func setupLogger() (Logger, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) l := NewLogger() - defer l.Stop() - go l.Serve() + go l.Serve(ctx) + return l, cancel +} + +func TestSubscriber(t *testing.T) { + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(0) defer s.Unsubscribe() @@ -40,9 +47,8 @@ func TestSubscriber(t *testing.T) { } func TestTimeout(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(0) defer s.Unsubscribe() @@ -53,9 +59,8 @@ func TestTimeout(t *testing.T) { } func TestEventBeforeSubscribe(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() l.Log(DeviceConnected, "foo") s := l.Subscribe(0) @@ -68,9 +73,8 @@ func TestEventBeforeSubscribe(t *testing.T) { } func TestEventAfterSubscribe(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() @@ -95,9 +99,8 @@ func TestEventAfterSubscribe(t *testing.T) { } func TestEventAfterSubscribeIgnoreMask(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(DeviceDisconnected) defer s.Unsubscribe() @@ -110,9 +113,8 @@ func TestEventAfterSubscribeIgnoreMask(t *testing.T) { } func TestBufferOverflow(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() @@ -135,9 +137,8 @@ func TestBufferOverflow(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) l.Log(DeviceConnected, "foo") @@ -157,9 +158,8 @@ func TestUnsubscribe(t *testing.T) { } func TestGlobalIDs(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() @@ -189,9 +189,8 @@ func TestGlobalIDs(t *testing.T) { } func TestSubscriptionIDs(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(DeviceConnected) defer s.Unsubscribe() @@ -231,9 +230,8 @@ func TestSubscriptionIDs(t *testing.T) { } func TestBufferedSub(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() @@ -262,9 +260,8 @@ func TestBufferedSub(t *testing.T) { } func BenchmarkBufferedSub(b *testing.B) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() @@ -318,9 +315,8 @@ func BenchmarkBufferedSub(b *testing.B) { } func TestSinceUsesSubscriptionId(t *testing.T) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(DeviceConnected) defer s.Unsubscribe() @@ -375,9 +371,8 @@ func TestUnsubscribeContention(t *testing.T) { senders = 1000 ) - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() // Start listeners. These will poll until the stop channel is closed, // then exit and unsubscribe. @@ -444,9 +439,8 @@ func TestUnsubscribeContention(t *testing.T) { } func BenchmarkLogEvent(b *testing.B) { - l := NewLogger() - defer l.Stop() - go l.Serve() + l, cancel := setupLogger() + defer cancel() s := l.Subscribe(AllEvents) defer s.Unsubscribe() diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go index 809b0eae8..0ab3c7417 100644 --- a/lib/model/fakeconns_test.go +++ b/lib/model/fakeconns_test.go @@ -31,7 +31,7 @@ type fakeConnection struct { files []protocol.FileInfo fileData map[string][]byte folder string - model *model + model *testModel indexFn func(context.Context, string, []protocol.FileInfo) requestFn func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) closeFn func(error) @@ -201,7 +201,7 @@ func (f *fakeConnection) sendIndexUpdate() { f.model.IndexUpdate(f.id, f.folder, f.files) } -func addFakeConn(m *model, dev protocol.DeviceID) *fakeConnection { +func addFakeConn(m *testModel, dev protocol.DeviceID) *fakeConnection { fc := &fakeConnection{id: dev, model: m} m.AddConnection(fc, protocol.Hello{}) diff --git a/lib/model/folder.go b/lib/model/folder.go index 7c9874a1b..3d7278d59 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -31,12 +31,9 @@ import ( "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/watchaggregator" - - "github.com/thejerf/suture" ) type folder struct { - suture.Service stateTracker config.FolderConfiguration *stats.FolderStatisticsReference @@ -135,7 +132,7 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf return f } -func (f *folder) serve(ctx context.Context) { +func (f *folder) Serve(ctx context.Context) error { atomic.AddInt32(&f.model.foldersRunning, 1) defer atomic.AddInt32(&f.model.foldersRunning, -1) @@ -168,7 +165,7 @@ func (f *folder) serve(ctx context.Context) { select { case <-f.ctx.Done(): close(f.done) - return + return nil case <-f.pullScheduled: f.pull() diff --git a/lib/model/folder_recvonly_test.go b/lib/model/folder_recvonly_test.go index ece6e070b..57016937f 100644 --- a/lib/model/folder_recvonly_test.go +++ b/lib/model/folder_recvonly_test.go @@ -441,7 +441,7 @@ func setupKnownFiles(t *testing.T, ffs fs.Filesystem, data []byte) []protocol.Fi return knownFiles } -func setupROFolder(t *testing.T) (*model, *receiveOnlyFolder) { +func setupROFolder(t *testing.T) (*testModel, *receiveOnlyFolder) { t.Helper() w := createTmpWrapper(defaultCfg) @@ -455,6 +455,7 @@ func setupROFolder(t *testing.T) (*model, *receiveOnlyFolder) { m := newModel(w, myID, "syncthing", "dev", db.NewLowlevel(backend.OpenMemory()), nil) m.ServeBackground() + <-m.started must(t, m.ScanFolder("ro")) m.fmut.RLock() diff --git a/lib/model/folder_sendonly.go b/lib/model/folder_sendonly.go index 501e98ad2..c6d778661 100644 --- a/lib/model/folder_sendonly.go +++ b/lib/model/folder_sendonly.go @@ -13,7 +13,6 @@ import ( "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" ) @@ -30,7 +29,6 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter, nil), } f.folder.puller = f - f.folder.Service = util.AsService(f.serve, f.String()) return f } diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 788e47384..ba45c12df 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -28,7 +28,6 @@ import ( "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/weakhash" ) @@ -140,7 +139,6 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites), } f.folder.puller = f - f.folder.Service = util.AsService(f.serve, f.String()) if f.Copiers == 0 { f.Copiers = defaultCopiers diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 6827ed00a..7e28ce1c2 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -91,10 +91,12 @@ func createFile(t *testing.T, name string, fs fs.Filesystem) protocol.FileInfo { } // Sets up a folder and model, but makes sure the services aren't actually running. -func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFolder) { +func setupSendReceiveFolder(files ...protocol.FileInfo) (*testModel, *sendReceiveFolder) { w, fcfg := tmpDefaultWrapper() + // Initialise model and stop immediately. model := setupModel(w) - model.Supervisor.Stop() + model.cancel() + <-model.stopped f := model.folderRunners[fcfg.ID].(*sendReceiveFolder) f.tempPullErrors = make(map[string]string) f.ctx = context.Background() @@ -107,8 +109,7 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol return model, f } -func cleanupSRFolder(f *sendReceiveFolder, m *model) { - m.evLogger.Stop() +func cleanupSRFolder(f *sendReceiveFolder, m *testModel) { os.Remove(m.cfg.ConfigPath()) os.RemoveAll(f.Filesystem().URI()) } diff --git a/lib/model/folder_summary.go b/lib/model/folder_summary.go index 7bd3b2f00..07bc28b68 100644 --- a/lib/model/folder_summary.go +++ b/lib/model/folder_summary.go @@ -12,7 +12,7 @@ import ( "strings" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" @@ -52,9 +52,7 @@ type folderSummaryService struct { func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService { service := &folderSummaryService{ - Supervisor: suture.New("folderSummaryService", suture.Spec{ - PassThroughPanics: true, - }), + Supervisor: suture.New("folderSummaryService", util.Spec()), cfg: cfg, model: m, id: id, @@ -169,7 +167,7 @@ func (c *folderSummaryService) OnEventRequest() { // listenForUpdates subscribes to the event bus and makes note of folders that // need their data recalculated. -func (c *folderSummaryService) listenForUpdates(ctx context.Context) { +func (c *folderSummaryService) listenForUpdates(ctx context.Context) error { sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) defer sub.Unsubscribe() @@ -180,7 +178,7 @@ func (c *folderSummaryService) listenForUpdates(ctx context.Context) { case ev := <-sub.C(): c.processUpdate(ev) case <-ctx.Done(): - return + return ctx.Err() } } } @@ -261,7 +259,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) { // calculateSummaries periodically recalculates folder summaries and // completion percentage, and sends the results on the event bus. -func (c *folderSummaryService) calculateSummaries(ctx context.Context) { +func (c *folderSummaryService) calculateSummaries(ctx context.Context) error { const pumpInterval = 2 * time.Second pump := time.NewTimer(pumpInterval) @@ -272,7 +270,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) { for _, folder := range c.foldersToHandle() { select { case <-ctx.Done(): - return + return ctx.Err() default: } c.sendSummary(ctx, folder) @@ -288,7 +286,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) { c.sendSummary(ctx, folder) case <-ctx.Done(): - return + return ctx.Err() } } } diff --git a/lib/model/indexsender.go b/lib/model/indexsender.go index 58a1c6954..4549452e4 100644 --- a/lib/model/indexsender.go +++ b/lib/model/indexsender.go @@ -12,17 +12,15 @@ import ( "sync" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/util" ) type indexSender struct { - suture.Service conn protocol.Connection folder string folderIsReceiveEncrypted bool @@ -36,7 +34,7 @@ type indexSender struct { resumeChan chan *db.FileSet } -func (s *indexSender) serve(ctx context.Context) { +func (s *indexSender) Serve(ctx context.Context) error { var err error l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence) @@ -59,9 +57,9 @@ func (s *indexSender) serve(ctx context.Context) { for err == nil { select { case <-ctx.Done(): - return + return ctx.Err() case <-s.connClosed: - return + return nil default: } @@ -72,9 +70,9 @@ func (s *indexSender) serve(ctx context.Context) { if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { select { case <-ctx.Done(): - return + return ctx.Err() case <-s.connClosed: - return + return nil case <-evChan: case <-ticker.C: case <-s.pauseChan: @@ -95,6 +93,8 @@ func (s *indexSender) serve(ctx context.Context) { // time to batch them up a little. time.Sleep(250 * time.Millisecond) } + + return nil } // Complete implements the suture.IsCompletable interface. When Serve terminates @@ -333,7 +333,6 @@ func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, sta pauseChan: make(chan struct{}), resumeChan: make(chan *db.FileSet), } - is.Service = util.AsService(is.serve, is.String()) is.token = r.sup.Add(is) r.indexSenders[folderID] = is } diff --git a/lib/model/model.go b/lib/model/model.go index 292d1b2b4..7cbffd19f 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -22,7 +22,7 @@ import ( "unicode" "github.com/pkg/errors" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/connections" @@ -36,6 +36,7 @@ import ( "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/ur/contract" + "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" ) @@ -46,6 +47,7 @@ const ( ) type service interface { + suture.Service BringToFront(string) Override() Revert() @@ -53,8 +55,6 @@ type service interface { SchedulePull() // something relevant changed, we should try a pull Jobs(page, perpage int) ([]string, []string, int) // In progress, Queued, skipped Scan(subs []string) error - Serve() - Stop() Errors() []FileError WatchError() error ScheduleForceRescan(path string) @@ -154,7 +154,9 @@ type model struct { remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders indexSenders map[protocol.DeviceID]*indexSenderRegistry - foldersRunning int32 // for testing only + // for testing only + foldersRunning int32 + started chan struct{} } type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger, *byteSemaphore) service @@ -192,13 +194,12 @@ var ( // where it sends index information to connected peers and responds to requests // for file data without altering the local folder in any way. func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string, evLogger events.Logger) Model { + spec := util.Spec() + spec.EventHook = func(e suture.Event) { + l.Debugln(e) + } m := &model{ - Supervisor: suture.New("model", suture.Spec{ - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }), + Supervisor: suture.New("model", spec), // constructor parameters cfg: cfg, @@ -237,26 +238,20 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}), indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry), + + // for testing only + started: make(chan struct{}), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String()) } m.Add(m.progressEmitter) + m.Add(util.AsService(m.serve, m.String())) return m } -func (m *model) Serve() { - m.onServe() - m.Supervisor.Serve() -} - -func (m *model) ServeBackground() { - m.onServe() - m.Supervisor.ServeBackground() -} - -func (m *model) onServe() { +func (m *model) serve(ctx context.Context) error { // Add and start folders cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles for _, folderCfg := range m.cfg.Folders() { @@ -267,11 +262,11 @@ func (m *model) onServe() { m.newFolder(folderCfg, cacheIgnoredFiles) } m.cfg.Subscribe(m) -} -func (m *model) Stop() { + close(m.started) + <-ctx.Done() + m.cfg.Unsubscribe(m) - m.Supervisor.Stop() m.pmut.RLock() closed := make([]chan struct{}, 0, len(m.conn)) for id, conn := range m.conn { @@ -282,6 +277,7 @@ func (m *model) Stop() { for _, c := range closed { <-c } + return nil } // StartDeadlockDetector starts a deadlock detector on the models locks which diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 98097ce10..ae9487868 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -120,7 +120,7 @@ func createTmpWrapper(cfg config.Configuration) config.Wrapper { return wrapper } -func newState(cfg config.Configuration) *model { +func newState(cfg config.Configuration) *testModel { wcfg := createTmpWrapper(cfg) m := setupModel(wcfg) @@ -1396,7 +1396,7 @@ func TestAutoAcceptEnc(t *testing.T) { } } -func changeIgnores(t *testing.T, m *model, expected []string) { +func changeIgnores(t *testing.T, m *testModel, expected []string) { arrEqual := func(a, b []string) bool { if len(a) != len(b) { return false @@ -4205,7 +4205,7 @@ func TestNeedMetaAfterIndexReset(t *testing.T) { func TestCcCheckEncryption(t *testing.T) { w, fcfg := tmpDefaultWrapper() m := setupModel(w) - m.Stop() + m.cancel() defer cleanupModel(m) pw := "foo" diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index 263674008..56bf92e27 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -11,18 +11,13 @@ import ( "fmt" "time" - "github.com/thejerf/suture" - "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/util" ) type ProgressEmitter struct { - suture.Service - cfg config.Wrapper registry map[string]map[string]*sharedPullerState // folder: name: puller interval time.Duration @@ -60,7 +55,6 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi evLogger: evLogger, mut: sync.NewMutex(), } - t.Service = util.AsService(t.serve, t.String()) t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) @@ -69,7 +63,7 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi // serve starts the progress emitter which starts emitting DownloadProgress // events as the progress happens. -func (t *ProgressEmitter) serve(ctx context.Context) { +func (t *ProgressEmitter) Serve(ctx context.Context) error { t.cfg.Subscribe(t) defer t.cfg.Unsubscribe(t) @@ -79,7 +73,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) { select { case <-ctx.Done(): l.Debugln("progress emitter: stopping") - return + return nil case <-t.timer.C: t.mut.Lock() l.Debugln("progress emitter: timer - looking after", len(t.registry)) diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index ed68d3093..c5c7ee5ba 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -53,9 +53,10 @@ func expectTimeout(w events.Subscription, t *testing.T) { } func TestProgressEmitter(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) evLogger := events.NewLogger() - go evLogger.Serve() - defer evLogger.Stop() + go evLogger.Serve(ctx) + defer cancel() w := evLogger.Subscribe(events.DownloadProgress) @@ -66,8 +67,7 @@ func TestProgressEmitter(t *testing.T) { }) p := NewProgressEmitter(c, evLogger) - go p.Serve() - defer p.Stop() + go p.Serve(ctx) p.interval = 0 expectTimeout(w, t) @@ -118,9 +118,10 @@ func TestSendDownloadProgressMessages(t *testing.T) { fc := &fakeConnection{} + ctx, cancel := context.WithCancel(context.Background()) evLogger := events.NewLogger() - go evLogger.Serve() - defer evLogger.Stop() + go evLogger.Serve(ctx) + defer cancel() p := NewProgressEmitter(c, evLogger) p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"}) diff --git a/lib/model/testutils_test.go b/lib/model/testutils_test.go index 3cd6eede5..1caf7f8be 100644 --- a/lib/model/testutils_test.go +++ b/lib/model/testutils_test.go @@ -7,6 +7,7 @@ package model import ( + "context" "io/ioutil" "os" "testing" @@ -95,13 +96,13 @@ func testFolderConfigFake() config.FolderConfiguration { return cfg } -func setupModelWithConnection() (*model, *fakeConnection, config.FolderConfiguration) { +func setupModelWithConnection() (*testModel, *fakeConnection, config.FolderConfiguration) { w, fcfg := tmpDefaultWrapper() m, fc := setupModelWithConnectionFromWrapper(w) return m, fc, fcfg } -func setupModelWithConnectionFromWrapper(w config.Wrapper) (*model, *fakeConnection) { +func setupModelWithConnectionFromWrapper(w config.Wrapper) (*testModel, *fakeConnection) { m := setupModel(w) fc := addFakeConn(m, device1) @@ -112,31 +113,57 @@ func setupModelWithConnectionFromWrapper(w config.Wrapper) (*model, *fakeConnect return m, fc } -func setupModel(w config.Wrapper) *model { +func setupModel(w config.Wrapper) *testModel { db := db.NewLowlevel(backend.OpenMemory()) m := newModel(w, myID, "syncthing", "dev", db, nil) m.ServeBackground() + <-m.started m.ScanFolders() return m } -func newModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) *model { - evLogger := events.NewLogger() - m := NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles, evLogger).(*model) - go evLogger.Serve() - return m +type testModel struct { + *model + cancel context.CancelFunc + evCancel context.CancelFunc + stopped chan struct{} } -func cleanupModel(m *model) { - m.Stop() +func newModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) *testModel { + evLogger := events.NewLogger() + m := NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles, evLogger).(*model) + ctx, cancel := context.WithCancel(context.Background()) + go evLogger.Serve(ctx) + return &testModel{ + model: m, + evCancel: cancel, + stopped: make(chan struct{}), + } +} + +func (m *testModel) ServeBackground() { + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + go func() { + m.model.Serve(ctx) + close(m.stopped) + }() + <-m.started +} + +func cleanupModel(m *testModel) { + if m.cancel != nil { + m.cancel() + <-m.stopped + } + m.evCancel() m.db.Close() - m.evLogger.Stop() os.Remove(m.cfg.ConfigPath()) } -func cleanupModelAndRemoveDir(m *model, dir string) { +func cleanupModelAndRemoveDir(m *testModel, dir string) { cleanupModel(m) os.RemoveAll(dir) } @@ -223,7 +250,7 @@ func dbSnapshot(t *testing.T, m Model, folder string) *db.Snapshot { // reloads when asked to, instead of checking file mtimes. This is // because we will be changing the files on disk often enough that the // mtimes will be unreliable to determine change status. -func folderIgnoresAlwaysReload(m *model, fcfg config.FolderConfiguration) { +func folderIgnoresAlwaysReload(m *testModel, fcfg config.FolderConfiguration) { m.removeFolder(fcfg) fset := db.NewFileSet(fcfg.ID, fcfg.Filesystem(), m.db) ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(true), ignore.WithChangeDetector(newAlwaysChanged())) @@ -250,7 +277,7 @@ func basicClusterConfig(local, remote protocol.DeviceID, folders ...string) prot return cc } -func localIndexUpdate(m *model, folder string, fs []protocol.FileInfo) { +func localIndexUpdate(m *testModel, folder string, fs []protocol.FileInfo) { m.fmut.RLock() fset := m.folderFiles[folder] m.fmut.RUnlock() diff --git a/lib/nat/service.go b/lib/nat/service.go index 901ceac6d..4c0269c48 100644 --- a/lib/nat/service.go +++ b/lib/nat/service.go @@ -15,19 +15,14 @@ import ( stdsync "sync" "time" - "github.com/thejerf/suture" - "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" - "github.com/syncthing/syncthing/lib/util" ) // Service runs a loop for discovery of IGDs (Internet Gateway Devices) and // setup/renewal of a port mapping. type Service struct { - suture.Service - id protocol.DeviceID cfg config.Wrapper processScheduled chan struct{} @@ -45,8 +40,6 @@ func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service { mut: sync.NewRWMutex(), } - s.Service = util.AsService(s.serve, s.String()) - cfg.Subscribe(s) cfgCopy := cfg.RawCopy() s.CommitConfiguration(cfgCopy, cfgCopy) return s @@ -70,12 +63,10 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool { return true } -func (s *Service) Stop() { - s.cfg.Unsubscribe(s) - s.Service.Stop() -} +func (s *Service) Serve(ctx context.Context) error { + s.cfg.Subscribe(s) + defer s.cfg.Unsubscribe(s) -func (s *Service) serve(ctx context.Context) { announce := stdsync.Once{} timer := time.NewTimer(0) @@ -97,7 +88,7 @@ func (s *Service) serve(ctx context.Context) { mapping.clearAddresses() } s.mut.RUnlock() - return + return ctx.Err() } s.mut.RLock() enabled := s.enabled @@ -351,7 +342,7 @@ func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPor for i := 0; i < 10; i++ { select { case <-ctx.Done(): - return Address{}, nil + return Address{}, ctx.Err() default: } diff --git a/lib/relay/client/client.go b/lib/relay/client/client.go index 8b40f23b3..2da735062 100644 --- a/lib/relay/client/client.go +++ b/lib/relay/client/client.go @@ -13,7 +13,7 @@ import ( "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/util" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient @@ -61,7 +61,7 @@ func newCommonClient(invitations chan protocol.SessionInvitation, serve func(con defer c.cleanup() return serve(ctx) } - c.ServiceWithError = util.AsServiceWithError(newServe, creator) + c.ServiceWithError = util.AsService(newServe, creator) if c.invitations == nil { c.closeInvitationsOnFinish = true c.invitations = make(chan protocol.SessionInvitation) diff --git a/lib/relay/client/dynamic.go b/lib/relay/client/dynamic.go index bce3e32f0..be9fcc897 100644 --- a/lib/relay/client/dynamic.go +++ b/lib/relay/client/dynamic.go @@ -93,7 +93,7 @@ func (c *dynamicClient) serve(ctx context.Context) error { c.client = client c.mut.Unlock() - c.client.Serve() + c.client.Serve(ctx) c.mut.Lock() c.client = nil @@ -104,15 +104,6 @@ func (c *dynamicClient) serve(ctx context.Context) error { return errors.New("could not find a connectable relay") } -func (c *dynamicClient) Stop() { - c.mut.RLock() - if c.client != nil { - c.client.Stop() - } - c.mut.RUnlock() - c.commonClient.Stop() -} - func (c *dynamicClient) Error() error { c.mut.RLock() defer c.mut.RUnlock() diff --git a/lib/relay/client/methods.go b/lib/relay/client/methods.go index 4d3b50b68..297e5e221 100644 --- a/lib/relay/client/methods.go +++ b/lib/relay/client/methods.go @@ -120,11 +120,12 @@ func TestRelay(ctx context.Context, uri *url.URL, certs []tls.Certificate, sleep close(invs) return fmt.Errorf("creating client: %w", err) } - go c.Serve() - defer func() { - c.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c.Serve(ctx) close(invs) }() + defer cancel() for i := 0; i < times; i++ { _, err = GetInvitationFromRelay(ctx, uri, id, certs, timeout) diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index 2947a481a..dfa367462 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -69,7 +69,8 @@ func TestWalkSub(t *testing.T) { t.Fatal(err) } - cfg := testConfig() + cfg, cancel := testConfig() + defer cancel() cfg.Subs = []string{"dir2"} cfg.Matcher = ignores fchan := Walk(context.TODO(), cfg) @@ -103,7 +104,8 @@ func TestWalk(t *testing.T) { } t.Log(ignores) - cfg := testConfig() + cfg, cancel := testConfig() + defer cancel() cfg.Matcher = ignores fchan := Walk(context.TODO(), cfg) @@ -487,7 +489,8 @@ func TestWalkReceiveOnly(t *testing.T) { } func walkDir(fs fs.Filesystem, dir string, cfiler CurrentFiler, matcher *ignore.Matcher, localFlags uint32) []protocol.FileInfo { - cfg := testConfig() + cfg, cancel := testConfig() + defer cancel() cfg.Filesystem = fs cfg.Subs = []string{dir} cfg.AutoNormalize = true @@ -596,7 +599,8 @@ func TestStopWalk(t *testing.T) { const numHashers = 4 ctx, cancel := context.WithCancel(context.Background()) - cfg := testConfig() + cfg, cfgCancel := testConfig() + defer cfgCancel() cfg.Filesystem = fs cfg.Hashers = numHashers cfg.ProgressTickIntervalS = -1 // Don't attempt to build the full list of files before starting to scan... @@ -725,7 +729,8 @@ func TestIssue4841(t *testing.T) { } fd.Close() - cfg := testConfig() + cfg, cancel := testConfig() + defer cancel() cfg.Filesystem = fs cfg.AutoNormalize = true cfg.CurrentFiler = fakeCurrentFiler{"foo": { @@ -761,7 +766,8 @@ func TestNotExistingError(t *testing.T) { t.Fatalf("Lstat returned error %v, while nothing should exist there.", err) } - cfg := testConfig() + cfg, cancel := testConfig() + defer cancel() cfg.Subs = []string{sub} fchan := Walk(context.TODO(), cfg) for f := range fchan { @@ -891,12 +897,13 @@ func (fcf fakeCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) { return f, ok } -func testConfig() Config { +func testConfig() (Config, context.CancelFunc) { evLogger := events.NewLogger() - go evLogger.Serve() + ctx, cancel := context.WithCancel(context.Background()) + go evLogger.Serve(ctx) return Config{ Filesystem: testFs, Hashers: 2, EventLogger: evLogger, - } + }, cancel } diff --git a/lib/stun/stun.go b/lib/stun/stun.go index 9a73c0ea3..5f53f2299 100644 --- a/lib/stun/stun.go +++ b/lib/stun/stun.go @@ -14,7 +14,6 @@ import ( "github.com/AudriusButkevicius/pfilter" "github.com/ccding/go-stun/stun" - "github.com/thejerf/suture" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/util" @@ -60,8 +59,6 @@ type Subscriber interface { } type Service struct { - suture.Service - name string cfg config.Wrapper subscriber Subscriber @@ -105,28 +102,24 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi natType: NATUnknown, addr: nil, } - s.Service = util.AsService(s.serve, s.String()) return s, otherDataConn } -func (s *Service) Stop() { - _ = s.stunConn.Close() - s.Service.Stop() -} - -func (s *Service) serve(ctx context.Context) { +func (s *Service) Serve(ctx context.Context) error { defer func() { s.setNATType(NATUnknown) s.setExternalAddress(nil, "") }() + util.OnDone(ctx, func() { _ = s.stunConn.Close() }) + timer := time.NewTimer(time.Millisecond) for { disabled: select { case <-ctx.Done(): - return + return ctx.Err() case <-timer.C: } @@ -146,7 +139,7 @@ func (s *Service) serve(ctx context.Context) { // Have we been asked to stop? select { case <-ctx.Done(): - return + return ctx.Err() default: } diff --git a/lib/syncthing/auditservice.go b/lib/syncthing/auditservice.go index 05c85190c..c56dbaf4e 100644 --- a/lib/syncthing/auditservice.go +++ b/lib/syncthing/auditservice.go @@ -12,49 +12,40 @@ import ( "fmt" "io" - "github.com/thejerf/suture" - "github.com/syncthing/syncthing/lib/events" - "github.com/syncthing/syncthing/lib/util" ) // The auditService subscribes to events and writes these in JSON format, one // event per line, to the specified writer. type auditService struct { - suture.Service - w io.Writer // audit destination - sub events.Subscription + w io.Writer // audit destination + evLogger events.Logger } func newAuditService(w io.Writer, evLogger events.Logger) *auditService { - s := &auditService{ - w: w, - sub: evLogger.Subscribe(events.AllEvents), + return &auditService{ + w: w, + evLogger: evLogger, } - s.Service = util.AsService(s.serve, s.String()) - return s } // serve runs the audit service. -func (s *auditService) serve(ctx context.Context) { +func (s *auditService) Serve(ctx context.Context) error { + sub := s.evLogger.Subscribe(events.AllEvents) + defer sub.Unsubscribe() + enc := json.NewEncoder(s.w) for { select { - case ev := <-s.sub.C(): + case ev := <-sub.C(): enc.Encode(ev) case <-ctx.Done(): - return + return ctx.Err() } } } -// Stop stops the audit service. -func (s *auditService) Stop() { - s.Service.Stop() - s.sub.Unsubscribe() -} - func (s *auditService) String() string { return fmt.Sprintf("auditService@%p", s) } diff --git a/lib/syncthing/auditservice_test.go b/lib/syncthing/auditservice_test.go index 13b67bd24..22c46016f 100644 --- a/lib/syncthing/auditservice_test.go +++ b/lib/syncthing/auditservice_test.go @@ -8,6 +8,7 @@ package syncthing import ( "bytes" + "context" "strings" "testing" "time" @@ -18,8 +19,9 @@ import ( func TestAuditService(t *testing.T) { buf := new(bytes.Buffer) evLogger := events.NewLogger() - go evLogger.Serve() - defer evLogger.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go evLogger.Serve(ctx) + defer cancel() sub := evLogger.Subscribe(events.AllEvents) defer sub.Unsubscribe() @@ -28,8 +30,16 @@ func TestAuditService(t *testing.T) { // Make sure the event goes through before creating the service <-sub.C() + auditCtx, auditCancel := context.WithCancel(context.Background()) service := newAuditService(buf, evLogger) - go service.Serve() + done := make(chan struct{}) + go func() { + service.Serve(auditCtx) + close(done) + }() + + // Subscription needs to happen in service.Serve + time.Sleep(10 * time.Millisecond) // Event that should end up in the audit log evLogger.Log(events.ConfigSaved, "the second event") @@ -37,7 +47,8 @@ func TestAuditService(t *testing.T) { // We need to give the events time to arrive, since the channels are buffered etc. time.Sleep(10 * time.Millisecond) - service.Stop() + auditCancel() + <-done // This event should not be logged, since we have stopped. evLogger.Log(events.ConfigSaved, "the third event") diff --git a/lib/syncthing/syncthing.go b/lib/syncthing/syncthing.go index c855b4f47..772ff580f 100644 --- a/lib/syncthing/syncthing.go +++ b/lib/syncthing/syncthing.go @@ -9,6 +9,7 @@ package syncthing import ( "context" "crypto/tls" + "errors" "fmt" "io" "net/http" @@ -19,7 +20,7 @@ import ( "sync" "time" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" "github.com/syncthing/syncthing/lib/api" "github.com/syncthing/syncthing/lib/build" @@ -39,6 +40,7 @@ import ( "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/ur" + "github.com/syncthing/syncthing/lib/util" ) const ( @@ -50,20 +52,6 @@ const ( deviceCertLifetimeDays = 20 * 365 ) -type ExitStatus int - -func (s ExitStatus) AsInt() int { - return int(s) -} - -const ( - ExitSuccess ExitStatus = 0 - ExitError ExitStatus = 1 - ExitNoUpgradeAvailable ExitStatus = 2 - ExitRestart ExitStatus = 3 - ExitUpgrade ExitStatus = 4 -) - type Options struct { AssetDir string AuditWriter io.Writer @@ -78,18 +66,18 @@ type Options struct { } type App struct { - myID protocol.DeviceID - mainService *suture.Supervisor - cfg config.Wrapper - ll *db.Lowlevel - evLogger events.Logger - cert tls.Certificate - opts Options - exitStatus ExitStatus - err error - stopOnce sync.Once - stop chan struct{} - stopped chan struct{} + myID protocol.DeviceID + mainService *suture.Supervisor + cfg config.Wrapper + ll *db.Lowlevel + evLogger events.Logger + cert tls.Certificate + opts Options + exitStatus util.ExitStatus + err error + stopOnce sync.Once + mainServiceCancel context.CancelFunc + stopped chan struct{} } func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, cert tls.Certificate, opts Options) *App { @@ -99,7 +87,6 @@ func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, evLogger: evLogger, opts: opts, cert: cert, - stop: make(chan struct{}), stopped: make(chan struct{}), } close(a.stopped) // Hasn't been started, so shouldn't block on Wait. @@ -112,20 +99,20 @@ func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, func (a *App) Start() error { // Create a main service manager. We'll add things to this as we go along. // We want any logging it does to go through our log system. - a.mainService = suture.New("main", suture.Spec{ - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }) + spec := util.Spec() + spec.EventHook = func(e suture.Event) { + l.Debugln(e) + } + a.mainService = suture.New("main", spec) // Start the supervisor and wait for it to stop to handle cleanup. a.stopped = make(chan struct{}) - a.mainService.ServeBackground() - go a.run() + ctx, cancel := context.WithCancel(context.Background()) + a.mainServiceCancel = cancel + go a.run(ctx) if err := a.startup(); err != nil { - a.stopWithErr(ExitError, err) + a.stopWithErr(util.ExitError, err) return err } @@ -343,14 +330,9 @@ func (a *App) startup() error { return nil } -func (a *App) run() { - <-a.stop - - if shouldDebug() { - l.Debugln("Services before stop:") - printServiceTree(os.Stdout, a.mainService, 0) - } - a.mainService.Stop() +func (a *App) run(ctx context.Context) { + err := a.mainService.Serve(ctx) + a.handleMainServiceError(err) done := make(chan struct{}) go func() { @@ -368,9 +350,23 @@ func (a *App) run() { close(a.stopped) } +func (a *App) handleMainServiceError(err error) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + var fatalErr *util.FatalErr + if errors.As(err, &fatalErr) { + a.exitStatus = fatalErr.Status + a.err = fatalErr.Err + return + } + a.err = err + a.exitStatus = util.ExitError +} + // Wait blocks until the app stops running. Also returns if the app hasn't been // started yet. -func (a *App) Wait() ExitStatus { +func (a *App) Wait() util.ExitStatus { <-a.stopped return a.exitStatus } @@ -379,7 +375,7 @@ func (a *App) Wait() ExitStatus { // for the app to stop before returning. func (a *App) Error() error { select { - case <-a.stop: + case <-a.stopped: return a.err default: } @@ -388,15 +384,19 @@ func (a *App) Error() error { // Stop stops the app and sets its exit status to given reason, unless the app // was already stopped before. In any case it returns the effective exit status. -func (a *App) Stop(stopReason ExitStatus) ExitStatus { +func (a *App) Stop(stopReason util.ExitStatus) util.ExitStatus { return a.stopWithErr(stopReason, nil) } -func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus { +func (a *App) stopWithErr(stopReason util.ExitStatus, err error) util.ExitStatus { a.stopOnce.Do(func() { a.exitStatus = stopReason a.err = err - close(a.stop) + if shouldDebug() { + l.Debugln("Services before stop:") + printServiceTree(os.Stdout, a.mainService, 0) + } + a.mainServiceCancel() }) <-a.stopped return a.exitStatus @@ -416,7 +416,7 @@ func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscri summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger) a.mainService.Add(summaryService) - apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, &controller{a}, a.opts.NoUpgrade) + apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, a.opts.NoUpgrade) a.mainService.Add(apiSvc) if err := apiSvc.WaitForStart(); err != nil { @@ -440,21 +440,6 @@ func checkShortIDs(cfg config.Wrapper) error { return nil } -// Implements api.Controller -type controller struct{ *App } - -func (e *controller) Restart() { - e.Stop(ExitRestart) -} - -func (e *controller) Shutdown() { - e.Stop(ExitSuccess) -} - -func (e *controller) ExitUpgrading() { - e.Stop(ExitUpgrade) -} - type supervisor interface{ Services() []suture.Service } func printServiceTree(w io.Writer, sup supervisor, level int) { diff --git a/lib/syncthing/syncthing_test.go b/lib/syncthing/syncthing_test.go index b45f8f3a2..4d5c3065e 100644 --- a/lib/syncthing/syncthing_test.go +++ b/lib/syncthing/syncthing_test.go @@ -18,6 +18,7 @@ import ( "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/tlsutil" + "github.com/syncthing/syncthing/lib/util" ) func tempCfgFilename(t *testing.T) string { @@ -86,7 +87,7 @@ func TestStartupFail(t *testing.T) { } done := make(chan struct{}) - var waitE ExitStatus + var waitE util.ExitStatus go func() { waitE = app.Wait() close(done) @@ -98,8 +99,8 @@ func TestStartupFail(t *testing.T) { case <-done: } - if waitE != ExitError { - t.Errorf("Got exit status %v, expected %v", waitE, ExitError) + if waitE != util.ExitError { + t.Errorf("Got exit status %v, expected %v", waitE, util.ExitError) } if err = app.Error(); err != startErr { diff --git a/lib/syncthing/verboseservice.go b/lib/syncthing/verboseservice.go index 80c2a2368..7bcb196b7 100644 --- a/lib/syncthing/verboseservice.go +++ b/lib/syncthing/verboseservice.go @@ -10,49 +10,38 @@ import ( "context" "fmt" - "github.com/thejerf/suture" - "github.com/syncthing/syncthing/lib/events" - "github.com/syncthing/syncthing/lib/util" ) // The verbose logging service subscribes to events and prints these in // verbose format to the console using INFO level. type verboseService struct { - suture.Service - sub events.Subscription + evLogger events.Logger } func newVerboseService(evLogger events.Logger) *verboseService { - s := &verboseService{ - sub: evLogger.Subscribe(events.AllEvents), + return &verboseService{ + evLogger: evLogger, } - s.Service = util.AsService(s.serve, s.String()) - return s } // serve runs the verbose logging service. -func (s *verboseService) serve(ctx context.Context) { +func (s *verboseService) Serve(ctx context.Context) error { + sub := s.evLogger.Subscribe(events.AllEvents) + defer sub.Unsubscribe() for { select { - case ev := <-s.sub.C(): + case ev := <-sub.C(): formatted := s.formatEvent(ev) if formatted != "" { l.Verboseln(formatted) } case <-ctx.Done(): - return + return ctx.Err() } } } -// Stop stops the verbose logging service. -func (s *verboseService) Stop() { - s.Service.Stop() - s.sub.Unsubscribe() - -} - func (s *verboseService) formatEvent(ev events.Event) string { switch ev.Type { case events.DownloadProgress, events.LocalIndexUpdated: diff --git a/lib/ur/failurereporting.go b/lib/ur/failurereporting.go index 30525104f..92de0fdc9 100644 --- a/lib/ur/failurereporting.go +++ b/lib/ur/failurereporting.go @@ -17,9 +17,8 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/events" - "github.com/syncthing/syncthing/lib/util" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) var ( @@ -45,18 +44,15 @@ type FailureHandler interface { } func NewFailureHandler(cfg config.Wrapper, evLogger events.Logger) FailureHandler { - h := &failureHandler{ + return &failureHandler{ cfg: cfg, evLogger: evLogger, optsChan: make(chan config.OptionsConfiguration), buf: make(map[string]*failureStat), } - h.Service = util.AsServiceWithError(h.serve, h.String()) - return h } type failureHandler struct { - suture.Service cfg config.Wrapper evLogger events.Logger optsChan chan config.OptionsConfiguration @@ -68,7 +64,7 @@ type failureStat struct { count int } -func (h *failureHandler) serve(ctx context.Context) error { +func (h *failureHandler) Serve(ctx context.Context) error { go func() { select { case h.optsChan <- h.cfg.Options(): diff --git a/lib/ur/usage_report.go b/lib/ur/usage_report.go index 5f4de3216..a5e947574 100644 --- a/lib/ur/usage_report.go +++ b/lib/ur/usage_report.go @@ -29,9 +29,6 @@ import ( "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/ur/contract" - "github.com/syncthing/syncthing/lib/util" - - "github.com/thejerf/suture" ) // Current version number of the usage report, for acceptance purposes. If @@ -42,7 +39,6 @@ const Version = 3 var StartTime = time.Now() type Service struct { - suture.Service cfg config.Wrapper model model.Model connectionsService connections.Service @@ -51,15 +47,13 @@ type Service struct { } func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service { - svc := &Service{ + return &Service{ cfg: cfg, model: m, connectionsService: connectionsService, noUpgrade: noUpgrade, forceRun: make(chan struct{}, 1), // Buffered to prevent locking } - svc.Service = util.AsService(svc.serve, svc.String()) - return svc } // ReportData returns the data to be sent in a usage report with the currently @@ -362,7 +356,7 @@ func (s *Service) sendUsageReport(ctx context.Context) error { return nil } -func (s *Service) serve(ctx context.Context) { +func (s *Service) Serve(ctx context.Context) error { s.cfg.Subscribe(s) defer s.cfg.Unsubscribe(s) @@ -370,7 +364,7 @@ func (s *Service) serve(ctx context.Context) { for { select { case <-ctx.Done(): - return + return ctx.Err() case <-s.forceRun: t.Reset(0) case <-t.C: diff --git a/lib/util/utils.go b/lib/util/utils.go index 7b54701e6..d2d512de4 100644 --- a/lib/util/utils.go +++ b/lib/util/utils.go @@ -18,7 +18,7 @@ import ( "github.com/syncthing/syncthing/lib/sync" - "github.com/thejerf/suture" + "github.com/thejerf/suture/v4" ) type defaultParser interface { @@ -229,13 +229,35 @@ func AddressUnspecifiedLess(a, b net.Addr) bool { return aIsUnspecified } -// AsService wraps the given function to implement suture.Service by calling -// that function on serve and closing the passed channel when Stop is called. -func AsService(fn func(ctx context.Context), creator string) suture.Service { - return asServiceWithError(func(ctx context.Context) error { - fn(ctx) - return nil - }, creator) +type FatalErr struct { + Err error + Status ExitStatus +} + +func (e *FatalErr) Error() string { + return e.Err.Error() +} + +func (e *FatalErr) Unwrap() error { + return e.Err +} + +func (e *FatalErr) Is(target error) bool { + return target == suture.ErrTerminateSupervisorTree +} + +type ExitStatus int + +const ( + ExitSuccess ExitStatus = 0 + ExitError ExitStatus = 1 + ExitNoUpgradeAvailable ExitStatus = 2 + ExitRestart ExitStatus = 3 + ExitUpgrade ExitStatus = 4 +) + +func (s ExitStatus) AsInt() int { + return int(s) } type ServiceWithError interface { @@ -245,76 +267,35 @@ type ServiceWithError interface { SetError(error) } -// AsServiceWithError does the same as AsService, except that it keeps track -// of an error returned by the given function. -func AsServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError { - return asServiceWithError(fn, creator) -} - -func asServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError { - ctx, cancel := context.WithCancel(context.Background()) - s := &service{ - serve: fn, - ctx: ctx, - cancel: cancel, - stopped: make(chan struct{}), +// AsService wraps the given function to implement suture.Service. In addition +// it keeps track of the returned error and allows querying and setting that error. +func AsService(fn func(ctx context.Context) error, creator string) ServiceWithError { + return &service{ creator: creator, + serve: fn, mut: sync.NewMutex(), } - close(s.stopped) // not yet started, don't block on Stop() - return s } type service struct { creator string serve func(ctx context.Context) error - ctx context.Context - cancel context.CancelFunc - stopped chan struct{} err error mut sync.Mutex } -func (s *service) Serve() { +func (s *service) Serve(ctx context.Context) error { s.mut.Lock() - select { - case <-s.ctx.Done(): - s.mut.Unlock() - return - default: - } s.err = nil - s.stopped = make(chan struct{}) s.mut.Unlock() - var err error - defer func() { - if err == context.Canceled { - err = nil - } - s.mut.Lock() - s.err = err - close(s.stopped) - s.mut.Unlock() - }() - err = s.serve(s.ctx) -} + err := s.serve(ctx) -func (s *service) Stop() { s.mut.Lock() - select { - case <-s.ctx.Done(): - s.mut.Unlock() - panic(fmt.Sprintf("Stop called more than once on %v", s)) - default: - s.cancel() - } - - // Cache s.stopped in a variable while we hold the mutex - // to prevent a data race with Serve's resetting it. - stopped := s.stopped + s.err = err s.mut.Unlock() - <-stopped + + return err } func (s *service) Error() error { @@ -331,6 +312,37 @@ func (s *service) SetError(err error) { func (s *service) String() string { return fmt.Sprintf("Service@%p created by %v", s, s.creator) + +} + +// OnDone calls fn when ctx is cancelled. +func OnDone(ctx context.Context, fn func()) { + go func() { + <-ctx.Done() + fn() + }() +} + +type doneService struct { + fn func() +} + +func (s *doneService) Serve(ctx context.Context) error { + <-ctx.Done() + s.fn() + return nil +} + +// OnSupervisorDone calls fn when sup is done. +func OnSupervisorDone(sup *suture.Supervisor, fn func()) { + sup.Add(&doneService{fn}) +} + +func Spec() suture.Spec { + return suture.Spec{ + PassThroughPanics: true, + DontPropagateTermination: false, + } } func CallWithContext(ctx context.Context, fn func() error) error { diff --git a/lib/util/utils_test.go b/lib/util/utils_test.go index b3eb0cc2c..40099c376 100644 --- a/lib/util/utils_test.go +++ b/lib/util/utils_test.go @@ -7,8 +7,6 @@ package util import ( - "context" - "strings" "testing" ) @@ -271,23 +269,6 @@ func TestInspecifiedAddressLess(t *testing.T) { } } -func TestUtilStopTwicePanic(t *testing.T) { - name := "foo" - s := AsService(func(ctx context.Context) { - <-ctx.Done() - }, name) - - go s.Serve() - s.Stop() - - defer func() { - if r := recover(); r == nil || !strings.Contains(r.(string), name) { - t.Fatalf(`expected panic containing "%v", got "%v"`, name, r) - } - }() - s.Stop() -} - func TestFillNil(t *testing.T) { type A struct { Slice []int diff --git a/lib/watchaggregator/aggregator_test.go b/lib/watchaggregator/aggregator_test.go index 4d860c90b..aa6d4f731 100644 --- a/lib/watchaggregator/aggregator_test.go +++ b/lib/watchaggregator/aggregator_test.go @@ -152,8 +152,9 @@ func TestAggregate(t *testing.T) { // TestInProgress checks that ignoring files currently edited by Syncthing works func TestInProgress(t *testing.T) { evLogger := events.NewLogger() - go evLogger.Serve() - defer evLogger.Stop() + ctx, cancel := context.WithCancel(context.Background()) + go evLogger.Serve(ctx) + defer cancel() testCase := func(c chan<- fs.Event) { evLogger.Log(events.ItemStarted, map[string]string{ "item": "inprogress",