chore(stdiscosrv): remove legacy replication

This commit is contained in:
Jakob Borg 2024-09-11 11:43:58 +02:00
parent 6505e123bb
commit 1616edcee3
No known key found for this signature in database
4 changed files with 19 additions and 417 deletions

View File

@ -7,7 +7,6 @@
package main
import (
"bytes"
"context"
"fmt"
"io"
@ -164,9 +163,6 @@ func (s *amqpReceiver) Serve(ctx context.Context) error {
replicationRecvsTotal.WithLabelValues("error").Inc()
return fmt.Errorf("replication unmarshal: %w", err)
}
if bytes.Equal(rec.Key, []byte("<heartbeat>")) {
continue
}
id, err := protocol.DeviceIDFromBytes(rec.Key)
if err != nil {
id, err = protocol.DeviceIDFromString(string(rec.Key))

View File

@ -51,6 +51,10 @@ type apiSrv struct {
notSeenTracker *retryAfterTracker
}
type replicator interface {
send(key *protocol.DeviceID, addrs []DatabaseAddress, seen int64)
}
type requestID int64
func (i requestID) String() string {

View File

@ -10,12 +10,10 @@ import (
"context"
"crypto/tls"
"log"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"time"
_ "net/http/pprof"
@ -66,11 +64,6 @@ type CLI struct {
Listen string `group:"Listen" help:"Listen address" default:":8443" env:"DISCOVERY_LISTEN"`
MetricsListen string `group:"Listen" help:"Metrics listen address" env:"DISCOVERY_METRICS_LISTEN"`
Replicate []string `group:"Legacy replication" help:"Replication peers, id@address, comma separated" env:"DISCOVERY_REPLICATE"`
ReplicationListen string `group:"Legacy replication" help:"Replication listen address" default:":19200" env:"DISCOVERY_REPLICATION_LISTEN"`
ReplicationCert string `group:"Legacy replication" help:"Certificate file for replication" env:"DISCOVERY_REPLICATION_CERT_FILE"`
ReplicationKey string `group:"Legacy replication" help:"Key file for replication" env:"DISCOVERY_REPLICATION_KEY_FILE"`
AMQPAddress string `group:"AMQP replication" help:"Address to AMQP broker" env:"DISCOVERY_AMQP_ADDRESS"`
DBDir string `group:"Database" help:"Database directory" default:"." env:"DISCOVERY_DB_DIR"`
@ -100,7 +93,10 @@ func main() {
buildInfo.WithLabelValues(build.Version, runtime.Version(), build.User, build.Date.UTC().Format("2006-01-02T15:04:05Z")).Set(1)
cert, err := tls.LoadX509KeyPair(cli.Cert, cli.Key)
var cert tls.Certificate
if !cli.HTTP {
var err error
cert, err = tls.LoadX509KeyPair(cli.Cert, cli.Key)
if os.IsNotExist(err) {
log.Println("Failed to load keypair. Generating one, this might take a while...")
cert, err = tlsutil.NewCertificate(cli.Cert, cli.Key, "stdiscosrv", 20*365)
@ -112,53 +108,6 @@ func main() {
}
devID := protocol.NewDeviceID(cert.Certificate[0])
log.Println("Server device ID is", devID)
replCert := cert
if cli.ReplicationCert != "" && cli.ReplicationKey != "" {
replCert, err = tls.LoadX509KeyPair(cli.ReplicationCert, cli.ReplicationKey)
if err != nil {
log.Fatalln("Failed to load replication keypair:", err)
}
}
replDevID := protocol.NewDeviceID(replCert.Certificate[0])
log.Println("Replication device ID is", replDevID)
// Parse the replication specs, if any.
var allowedReplicationPeers []protocol.DeviceID
var replicationDestinations []string
for _, part := range cli.Replicate {
if part == "" {
continue
}
fields := strings.Split(part, "@")
switch len(fields) {
case 2:
// This is an id@address specification. Grab the address for the
// destination list. Try to resolve it once to catch obvious
// syntax errors here rather than having the sender service fail
// repeatedly later.
_, err := net.ResolveTCPAddr("tcp", fields[1])
if err != nil {
log.Fatalln("Resolving address:", err)
}
replicationDestinations = append(replicationDestinations, fields[1])
fallthrough // N.B.
case 1:
// The first part is always a device ID.
id, err := protocol.DeviceIDFromString(fields[0])
if err != nil {
log.Fatalln("Parsing device ID:", err)
}
if id == protocol.EmptyDeviceID {
log.Fatalf("Missing device ID for peer in %q", part)
}
allowedReplicationPeers = append(allowedReplicationPeers, id)
default:
log.Fatalln("Unrecognized replication spec:", part)
}
}
// Root of the service tree.
@ -182,26 +131,13 @@ func main() {
db := newInMemoryStore(cli.DBDir, cli.DBFlushInterval, s3c)
main.Add(db)
// Start any replication senders.
var repl replicationMultiplexer
for _, dst := range replicationDestinations {
rs := newReplicationSender(dst, replCert, allowedReplicationPeers)
main.Add(rs)
repl = append(repl, rs)
}
// If we have replication configured, start the replication listener.
if len(allowedReplicationPeers) > 0 {
rl := newReplicationListener(cli.ReplicationListen, replCert, allowedReplicationPeers, db)
main.Add(rl)
}
// If we have an AMQP broker, start that
// If we have an AMQP broker for replication, start that
var repl replicator
if cli.AMQPAddress != "" {
clientID := rand.String(10)
kr := newAMQPReplicator(cli.AMQPAddress, clientID, db)
repl = append(repl, kr)
main.Add(kr)
repl = kr
}
// Start the main API server.

View File

@ -1,334 +0,0 @@
// Copyright (C) 2018 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package main
import (
"context"
"crypto/tls"
"encoding/binary"
"fmt"
io "io"
"log"
"net"
"time"
"github.com/syncthing/syncthing/lib/protocol"
)
const (
replicationReadTimeout = time.Minute
replicationWriteTimeout = 30 * time.Second
replicationHeartbeatInterval = time.Second * 30
)
type replicator interface {
send(key *protocol.DeviceID, addrs []DatabaseAddress, seen int64)
}
// a replicationSender tries to connect to the remote address and provide
// them with a feed of replication updates.
type replicationSender struct {
dst string
cert tls.Certificate // our certificate
allowedIDs []protocol.DeviceID
outbox chan ReplicationRecord
}
func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protocol.DeviceID) *replicationSender {
return &replicationSender{
dst: dst,
cert: cert,
allowedIDs: allowedIDs,
outbox: make(chan ReplicationRecord, replicationOutboxSize),
}
}
func (s *replicationSender) Serve(ctx context.Context) error {
// Sleep a little at startup. Peers often restart at the same time, and
// this avoid the service failing and entering backoff state
// unnecessarily, while also reducing the reconnect rate to something
// reasonable by default.
time.Sleep(2 * time.Second)
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{s.cert},
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
// Dial the TLS connection.
conn, err := tls.Dial("tcp", s.dst, tlsCfg)
if err != nil {
log.Println("Replication connect:", err)
return err
}
defer func() {
conn.SetWriteDeadline(time.Now().Add(time.Second))
conn.Close()
}()
// The replication stream is not especially latency sensitive, but it is
// quite a lot of data in small writes. Make it more efficient.
if tcpc, ok := conn.NetConn().(*net.TCPConn); ok {
_ = tcpc.SetNoDelay(false)
}
// Get the other side device ID.
remoteID, err := deviceID(conn)
if err != nil {
log.Println("Replication connect:", err)
return err
}
// Verify it's in the set of allowed device IDs.
if !deviceIDIn(remoteID, s.allowedIDs) {
log.Println("Replication connect: unexpected device ID:", remoteID)
return err
}
heartBeatTicker := time.NewTicker(replicationHeartbeatInterval)
defer heartBeatTicker.Stop()
// Send records.
buf := make([]byte, 1024)
for {
select {
case <-heartBeatTicker.C:
if len(s.outbox) > 0 {
// No need to send heartbeats if there are events/prevrious
// heartbeats to send, they will keep the connection alive.
continue
}
// Empty replication message is the heartbeat:
s.outbox <- ReplicationRecord{}
case rec := <-s.outbox:
// Buffer must hold record plus four bytes for size
size := rec.Size()
if len(buf) < size+4 {
buf = make([]byte, size+4)
}
// Record comes after the four bytes size
n, err := rec.MarshalTo(buf[4:])
if err != nil {
// odd to get an error here, but we haven't sent anything
// yet so it's not fatal
replicationSendsTotal.WithLabelValues("error").Inc()
log.Println("Replication marshal:", err)
continue
}
binary.BigEndian.PutUint32(buf, uint32(n))
// Send
conn.SetWriteDeadline(time.Now().Add(replicationWriteTimeout))
if _, err := conn.Write(buf[:4+n]); err != nil {
replicationSendsTotal.WithLabelValues("error").Inc()
log.Println("Replication write:", err)
// Yes, we are losing the replication event here.
return err
}
replicationSendsTotal.WithLabelValues("success").Inc()
case <-ctx.Done():
return nil
}
}
}
func (s *replicationSender) String() string {
return fmt.Sprintf("replicationSender(%q)", s.dst)
}
func (s *replicationSender) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
item := ReplicationRecord{
Key: key[:],
Addresses: ps,
Seen: seen,
}
// The send should never block. The inbox is suitably buffered for at
// least a few seconds of stalls, which shouldn't happen in practice.
select {
case s.outbox <- item:
default:
replicationSendsTotal.WithLabelValues("drop").Inc()
}
}
// a replicationMultiplexer sends to multiple replicators
type replicationMultiplexer []replicator
func (m replicationMultiplexer) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
for _, s := range m {
// each send is nonblocking
s.send(key, ps, seen)
}
}
// replicationListener accepts incoming connections and reads replication
// items from them. Incoming items are applied to the KV store.
type replicationListener struct {
addr string
cert tls.Certificate
allowedIDs []protocol.DeviceID
db database
}
func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []protocol.DeviceID, db database) *replicationListener {
return &replicationListener{
addr: addr,
cert: cert,
allowedIDs: allowedIDs,
db: db,
}
}
func (l *replicationListener) Serve(ctx context.Context) error {
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{l.cert},
ClientAuth: tls.RequestClientCert,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
lst, err := tls.Listen("tcp", l.addr, tlsCfg)
if err != nil {
log.Println("Replication listen:", err)
return err
}
defer lst.Close()
for {
select {
case <-ctx.Done():
return nil
default:
}
// Accept a connection
conn, err := lst.Accept()
if err != nil {
log.Println("Replication accept:", err)
return err
}
// Figure out the other side device ID
remoteID, err := deviceID(conn.(*tls.Conn))
if err != nil {
log.Println("Replication accept:", err)
conn.SetWriteDeadline(time.Now().Add(time.Second))
conn.Close()
continue
}
// Verify it is in the set of allowed device IDs
if !deviceIDIn(remoteID, l.allowedIDs) {
log.Println("Replication accept: unexpected device ID:", remoteID)
conn.SetWriteDeadline(time.Now().Add(time.Second))
conn.Close()
continue
}
go l.handle(ctx, conn)
}
}
func (l *replicationListener) String() string {
return fmt.Sprintf("replicationListener(%q)", l.addr)
}
func (l *replicationListener) handle(ctx context.Context, conn net.Conn) {
defer func() {
conn.SetWriteDeadline(time.Now().Add(time.Second))
conn.Close()
}()
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
}
conn.SetReadDeadline(time.Now().Add(replicationReadTimeout))
// First four bytes are the size
if _, err := io.ReadFull(conn, buf[:4]); err != nil {
log.Println("Replication read size:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
return
}
// Read the rest of the record
size := int(binary.BigEndian.Uint32(buf[:4]))
if len(buf) < size {
buf = make([]byte, size)
}
if size == 0 {
// Heartbeat, ignore
continue
}
if _, err := io.ReadFull(conn, buf[:size]); err != nil {
log.Println("Replication read record:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
return
}
// Unmarshal
var rec ReplicationRecord
if err := rec.Unmarshal(buf[:size]); err != nil {
log.Println("Replication unmarshal:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
continue
}
id, err := protocol.DeviceIDFromBytes(rec.Key)
if err != nil {
id, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Replication device ID:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
continue
}
// Store
l.db.merge(&id, rec.Addresses, rec.Seen)
replicationRecvsTotal.WithLabelValues("success").Inc()
}
}
func deviceID(conn *tls.Conn) (protocol.DeviceID, error) {
// Handshake may not be complete on the server side yet, which we need
// to get the client certificate.
if !conn.ConnectionState().HandshakeComplete {
if err := conn.Handshake(); err != nil {
return protocol.DeviceID{}, err
}
}
// We expect exactly one certificate.
certs := conn.ConnectionState().PeerCertificates
if len(certs) != 1 {
return protocol.DeviceID{}, fmt.Errorf("unexpected number of certificates (%d != 1)", len(certs))
}
return protocol.NewDeviceID(certs[0].Raw), nil
}
func deviceIDIn(id protocol.DeviceID, ids []protocol.DeviceID) bool {
for _, candidate := range ids {
if id == candidate {
return true
}
}
return false
}