From f283215fce1660753fba9e213a5fee8820137c0c Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 3 Jun 2024 07:13:21 +0200 Subject: [PATCH] cmd/stdiscosrv: Add AMQP replication --- cmd/stdiscosrv/amqp.go | 246 ++++++++++++++++++++++++++++++++++ cmd/stdiscosrv/apisrv.go | 36 ++--- cmd/stdiscosrv/main.go | 23 +++- cmd/stdiscosrv/replication.go | 3 +- cmd/stdiscosrv/stats.go | 14 -- go.mod | 1 + go.sum | 4 + 7 files changed, 293 insertions(+), 34 deletions(-) create mode 100644 cmd/stdiscosrv/amqp.go diff --git a/cmd/stdiscosrv/amqp.go b/cmd/stdiscosrv/amqp.go new file mode 100644 index 000000000..e09919e27 --- /dev/null +++ b/cmd/stdiscosrv/amqp.go @@ -0,0 +1,246 @@ +// Copyright (C) 2024 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + "fmt" + "io" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/thejerf/suture/v4" +) + +type amqpReplicator struct { + suture.Service + broker string + sender *amqpSender + receiver *amqpReceiver + outbox chan ReplicationRecord +} + +func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator { + svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true}) + + sender := &amqpSender{ + broker: broker, + clientID: clientID, + outbox: make(chan ReplicationRecord, replicationOutboxSize), + } + svc.Add(sender) + + receiver := &amqpReceiver{ + broker: broker, + clientID: clientID, + db: db, + } + svc.Add(receiver) + + return &amqpReplicator{ + Service: svc, + broker: broker, + sender: sender, + receiver: receiver, + outbox: make(chan ReplicationRecord, replicationOutboxSize), + } +} + +func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) { + s.sender.send(key, ps, seen) +} + +type amqpSender struct { + broker string + clientID string + outbox chan 
ReplicationRecord +} + +func (s *amqpSender) Serve(ctx context.Context) error { + conn, ch, err := amqpChannel(s.broker) + if err != nil { + return err + } + defer ch.Close() + defer conn.Close() + + buf := make([]byte, 1024) + for { + select { + case rec := <-s.outbox: + size := rec.Size() + if len(buf) < size { + buf = make([]byte, size) + } + + n, err := rec.MarshalTo(buf) + if err != nil { + replicationSendsTotal.WithLabelValues("error").Inc() + return fmt.Errorf("replication marshal: %w", err) + } + + err = ch.PublishWithContext(ctx, + "discovery", // exchange + "", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/protobuf", + Body: buf[:n], + AppId: s.clientID, + }) + if err != nil { + replicationSendsTotal.WithLabelValues("error").Inc() + return fmt.Errorf("replication publish: %w", err) + } + + replicationSendsTotal.WithLabelValues("success").Inc() + + case <-ctx.Done(): + return nil + } + } +} + +func (s *amqpSender) String() string { + return fmt.Sprintf("amqpSender(%q)", s.broker) +} + +func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) { + item := ReplicationRecord{ + Key: key, + Addresses: ps, + Seen: seen, + } + + // The send should never block. The outbox is suitably buffered for at + // least a few seconds of stalls, which shouldn't happen in practice. 
+	select {
+	case s.outbox <- item:
+	default:
+		replicationSendsTotal.WithLabelValues("drop").Inc()
+	}
+}
+
+// amqpReceiver consumes replication records published on the AMQP broker
+// and merges them into the local database.
+type amqpReceiver struct {
+	broker   string
+	clientID string
+	db       database
+}
+
+// Serve connects to the broker, subscribes to the "discovery" exchange and
+// merges every received record into the database until ctx is cancelled,
+// in which case it returns nil. Messages whose AppId equals our own
+// clientID are skipped. It returns an error when the connection cannot be
+// set up, when the delivery channel is closed, or when a message cannot
+// be unmarshalled or merged.
+func (s *amqpReceiver) Serve(ctx context.Context) error {
+	conn, ch, err := amqpChannel(s.broker)
+	if err != nil {
+		return err
+	}
+	defer ch.Close()
+	defer conn.Close()
+
+	msgs, err := amqpConsume(ch)
+	if err != nil {
+		return err
+	}
+
+	for {
+		select {
+		case msg, ok := <-msgs:
+			if !ok {
+				return fmt.Errorf("subscription closed: %w", io.EOF)
+			}
+
+			// ignore messages from ourself
+			if msg.AppId == s.clientID {
+				continue
+			}
+
+			var rec ReplicationRecord
+			if err := rec.Unmarshal(msg.Body); err != nil {
+				replicationRecvsTotal.WithLabelValues("error").Inc()
+				return fmt.Errorf("replication unmarshal: %w", err)
+			}
+
+			if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil {
+				return fmt.Errorf("replication database merge: %w", err)
+			}
+
+			replicationRecvsTotal.WithLabelValues("success").Inc()
+
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+// String returns a description of the receiver that includes the broker
+// address.
+func (s *amqpReceiver) String() string {
+	return fmt.Sprintf("amqpReceiver(%q)", s.broker)
+}
+
+// amqpChannel connects to the AMQP broker at dst, opens a channel on that
+// connection and declares the non-durable "discovery" fanout exchange on
+// it. On success the caller owns both the connection and the channel and
+// is responsible for closing them. On failure only the error is returned,
+// and a connection that was already established is closed again so that
+// repeated attempts do not leak connections.
+func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
+	conn, err := amqp.Dial(dst)
+	if err != nil {
+		return nil, nil, fmt.Errorf("AMQP dial: %w", err)
+	}
+
+	ch, err := conn.Channel()
+	if err != nil {
+		// Best effort cleanup; the channel error is the one worth
+		// reporting.
+		_ = conn.Close()
+		return nil, nil, fmt.Errorf("AMQP channel: %w", err)
+	}
+
+	err = ch.ExchangeDeclare(
+		"discovery", // name
+		"fanout",    // type
+		false,       // durable
+		false,       // auto-deleted
+		false,       // internal
+		false,       // no-wait
+		nil,         // arguments
+	)
+	if err != nil {
+		// The caller never sees conn or ch on this path, so release
+		// them here. Closing the connection releases the channel
+		// opened on it as well.
+		_ = conn.Close()
+		return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
+	}
+
+	return conn, ch, nil
+}
+
+// amqpConsume declares an exclusive, non-durable queue on ch, binds it to
+// the "discovery" exchange and starts consuming from it with automatic
+// acknowledgement. It returns the resulting delivery channel.
+func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
+	q, err := ch.QueueDeclare(
+		"",    // name
+		false, // durable
+		false, // delete when unused
+		true,  // exclusive
+		false, // no-wait
+		nil,   // arguments
+	)
+	if err != nil {
return nil, fmt.Errorf("AMQP declare queue: %w", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "discovery", // exchange + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("AMQP bind queue: %w", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, fmt.Errorf("AMQP consume: %w", err) + } + + return msgs, nil +} diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go index 3da8ee0f1..b8721ae68 100644 --- a/cmd/stdiscosrv/apisrv.go +++ b/cmd/stdiscosrv/apisrv.go @@ -39,12 +39,13 @@ type announcement struct { } type apiSrv struct { - addr string - cert tls.Certificate - db database - listener net.Listener - repl replicator // optional - useHTTP bool + addr string + cert tls.Certificate + db database + listener net.Listener + repl replicator // optional + useHTTP bool + missesIncrease int mapsMut sync.Mutex misses map[string]int32 @@ -60,14 +61,15 @@ type contextKey int const idKey contextKey = iota -func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool) *apiSrv { +func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool, missesIncrease int) *apiSrv { return &apiSrv{ - addr: addr, - cert: cert, - db: db, - repl: repl, - useHTTP: useHTTP, - misses: make(map[string]int32), + addr: addr, + cert: cert, + db: db, + repl: repl, + useHTTP: useHTTP, + misses: make(map[string]int32), + missesIncrease: missesIncrease, } } @@ -197,14 +199,13 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) { s.mapsMut.Lock() misses := s.misses[key] if misses < rec.Misses { - misses = rec.Misses + 1 - } else { - misses++ + misses = rec.Misses } + misses += int32(s.missesIncrease) s.misses[key] = misses s.mapsMut.Unlock() - if misses%notFoundMissesWriteInterval == 0 { + if misses >= 
notFoundMissesWriteInterval { rec.Misses = misses rec.Missed = time.Now().UnixNano() rec.Addresses = nil @@ -444,7 +445,6 @@ func fixupAddresses(remote *net.TCPAddr, addresses []string) []string { // remote is nil, unable to determine host IP continue } - } // If zero port was specified, use remote port. diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go index 4890f3bda..0b8c907f5 100644 --- a/cmd/stdiscosrv/main.go +++ b/cmd/stdiscosrv/main.go @@ -22,6 +22,7 @@ import ( _ "github.com/syncthing/syncthing/lib/automaxprocs" "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/thejerf/suture/v4" @@ -80,6 +81,8 @@ func main() { var replKeyFile string var useHTTP bool var largeDB bool + var amqpAddress string + missesIncrease := 1 log.SetOutput(os.Stdout) log.SetFlags(0) @@ -96,6 +99,8 @@ func main() { flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication") flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication") flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings") + flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker") + flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss") showVersion := flag.Bool("version", false, "Show version") flag.Parse() @@ -203,8 +208,24 @@ func main() { main.Add(rl) } + // If we have an AMQP broker, start that + if amqpAddress != "" { + clientID := rand.String(10) + kr := newAMQPReplicator(amqpAddress, clientID, db) + repl = append(repl, kr) + main.Add(kr) + } + + go func() { + for range time.NewTicker(time.Second).C { + for _, r := range repl { + r.send("", nil, time.Now().UnixNano()) + } + } + }() + // Start the main API server. 
- qs := newAPISrv(listen, cert, db, repl, useHTTP) + qs := newAPISrv(listen, cert, db, repl, useHTTP, missesIncrease) main.Add(qs) // If we have a metrics port configured, start a metrics handler. diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go index 8d0db3f7b..e7aa2894a 100644 --- a/cmd/stdiscosrv/replication.go +++ b/cmd/stdiscosrv/replication.go @@ -144,10 +144,11 @@ func (s *replicationSender) String() string { return fmt.Sprintf("replicationSender(%q)", s.dst) } -func (s *replicationSender) send(key string, ps []DatabaseAddress, _ int64) { +func (s *replicationSender) send(key string, ps []DatabaseAddress, seen int64) { item := ReplicationRecord{ Key: key, Addresses: ps, + Seen: seen, } // The send should never block. The inbox is suitably buffered for at diff --git a/cmd/stdiscosrv/stats.go b/cmd/stdiscosrv/stats.go index 949a369eb..ba9ccb40d 100644 --- a/cmd/stdiscosrv/stats.go +++ b/cmd/stdiscosrv/stats.go @@ -7,10 +7,7 @@ package main import ( - "os" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" ) var ( @@ -127,15 +124,4 @@ func init() { databaseKeys, databaseStatisticsSeconds, databaseOperations, databaseOperationSeconds, retryAfterHistogram) - - processCollectorOpts := collectors.ProcessCollectorOpts{ - Namespace: "syncthing_discovery", - PidFn: func() (int, error) { - return os.Getpid(), nil - }, - } - - prometheus.MustRegister( - collectors.NewProcessCollector(processCollectorOpts), - ) } diff --git a/go.mod b/go.mod index 3636d1ef8..b23df054a 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 github.com/prometheus/client_golang v1.19.1 github.com/quic-go/quic-go v0.44.0 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/shirou/gopsutil/v3 v3.24.4 github.com/syncthing/notify v0.0.0-20210616190510-c6b7342338d2 diff --git a/go.sum b/go.sum index 
9955b733a..4a0e77ede 100644 --- a/go.sum +++ b/go.sum @@ -200,6 +200,8 @@ github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ0= github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab h1:ZjX6I48eZSFetPb41dHudEyVr5v953N15TsNZXlkcWY= @@ -250,6 +252,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=