feat(discosrv): in-memory storage with S3 backing

This commit is contained in:
Jakob Borg 2024-09-06 11:14:23 +02:00
parent 68a1fd010f
commit aed2c66e52
No known key found for this signature in database
12 changed files with 485 additions and 453 deletions

View File

@ -7,11 +7,14 @@
package main package main
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
"log"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
) )
@ -49,7 +52,7 @@ func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
} }
} }
func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) { func (s *amqpReplicator) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
s.sender.send(key, ps, seen) s.sender.send(key, ps, seen)
} }
@ -109,9 +112,9 @@ func (s *amqpSender) String() string {
return fmt.Sprintf("amqpSender(%q)", s.broker) return fmt.Sprintf("amqpSender(%q)", s.broker)
} }
func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) { func (s *amqpSender) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
item := ReplicationRecord{ item := ReplicationRecord{
Key: key, Key: key[:],
Addresses: ps, Addresses: ps,
Seen: seen, Seen: seen,
} }
@ -161,8 +164,20 @@ func (s *amqpReceiver) Serve(ctx context.Context) error {
replicationRecvsTotal.WithLabelValues("error").Inc() replicationRecvsTotal.WithLabelValues("error").Inc()
return fmt.Errorf("replication unmarshal: %w", err) return fmt.Errorf("replication unmarshal: %w", err)
} }
if bytes.Equal(rec.Key, []byte("<heartbeat>")) {
continue
}
id, err := protocol.DeviceIDFromBytes(rec.Key)
if err != nil {
id, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Replication device ID:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
continue
}
if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil { if err := s.db.merge(&id, rec.Addresses, rec.Seen); err != nil {
return fmt.Errorf("replication database merge: %w", err) return fmt.Errorf("replication database merge: %w", err)
} }

View File

@ -46,11 +46,9 @@ type apiSrv struct {
repl replicator // optional repl replicator // optional
useHTTP bool useHTTP bool
compression bool compression bool
missesIncrease int
gzipWriters sync.Pool gzipWriters sync.Pool
seenTracker *retryAfterTracker
mapsMut sync.Mutex notSeenTracker *retryAfterTracker
misses map[string]int32
} }
type requestID int64 type requestID int64
@ -63,20 +61,30 @@ type contextKey int
const idKey contextKey = iota const idKey contextKey = iota
func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool, missesIncrease int) *apiSrv { func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP, compression bool) *apiSrv {
return &apiSrv{ return &apiSrv{
addr: addr, addr: addr,
cert: cert, cert: cert,
db: db, db: db,
repl: repl, repl: repl,
useHTTP: useHTTP, useHTTP: useHTTP,
compression: compression, compression: compression,
misses: make(map[string]int32), seenTracker: &retryAfterTracker{
missesIncrease: missesIncrease, name: "seenTracker",
bucketStarts: time.Now(),
desiredRate: 250,
currentDelay: notFoundRetryUnknownMinSeconds,
},
notSeenTracker: &retryAfterTracker{
name: "notSeenTracker",
bucketStarts: time.Now(),
desiredRate: 250,
currentDelay: notFoundRetryUnknownMaxSeconds / 2,
},
} }
} }
func (s *apiSrv) Serve(_ context.Context) error { func (s *apiSrv) Serve(ctx context.Context) error {
if s.useHTTP { if s.useHTTP {
listener, err := net.Listen("tcp", s.addr) listener, err := net.Listen("tcp", s.addr)
if err != nil { if err != nil {
@ -110,6 +118,11 @@ func (s *apiSrv) Serve(_ context.Context) error {
ErrorLog: log.New(io.Discard, "", 0), ErrorLog: log.New(io.Discard, "", 0),
} }
go func() {
<-ctx.Done()
srv.Shutdown(context.Background())
}()
err := srv.Serve(s.listener) err := srv.Serve(s.listener)
if err != nil { if err != nil {
log.Println("Serve:", err) log.Println("Serve:", err)
@ -186,8 +199,7 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
return return
} }
key := deviceID.String() rec, err := s.db.get(&deviceID)
rec, err := s.db.get(key)
if err != nil { if err != nil {
// some sort of internal error // some sort of internal error
lookupRequestsTotal.WithLabelValues("internal_error").Inc() lookupRequestsTotal.WithLabelValues("internal_error").Inc()
@ -197,27 +209,14 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
} }
if len(rec.Addresses) == 0 { if len(rec.Addresses) == 0 {
lookupRequestsTotal.WithLabelValues("not_found").Inc() var afterS int
if rec.Seen == 0 {
s.mapsMut.Lock() afterS = s.notSeenTracker.retryAfterS()
misses := s.misses[key] lookupRequestsTotal.WithLabelValues("not_found_ever").Inc()
if misses < rec.Misses { } else {
misses = rec.Misses afterS = s.seenTracker.retryAfterS()
lookupRequestsTotal.WithLabelValues("not_found_recent").Inc()
} }
misses += int32(s.missesIncrease)
s.misses[key] = misses
s.mapsMut.Unlock()
if misses >= notFoundMissesWriteInterval {
rec.Misses = misses
rec.Missed = time.Now().UnixNano()
rec.Addresses = nil
// rec.Seen retained from get
s.db.put(key, rec)
}
afterS := notFoundRetryAfterSeconds(int(misses))
retryAfterHistogram.Observe(float64(afterS))
w.Header().Set("Retry-After", strconv.Itoa(afterS)) w.Header().Set("Retry-After", strconv.Itoa(afterS))
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
return return
@ -301,7 +300,6 @@ func (s *apiSrv) Stop() {
} }
func (s *apiSrv) handleAnnounce(deviceID protocol.DeviceID, addresses []string) error { func (s *apiSrv) handleAnnounce(deviceID protocol.DeviceID, addresses []string) error {
key := deviceID.String()
now := time.Now() now := time.Now()
expire := now.Add(addressExpiryTime).UnixNano() expire := now.Add(addressExpiryTime).UnixNano()
@ -317,9 +315,9 @@ func (s *apiSrv) handleAnnounce(deviceID protocol.DeviceID, addresses []string)
seen := now.UnixNano() seen := now.UnixNano()
if s.repl != nil { if s.repl != nil {
s.repl.send(key, dbAddrs, seen) s.repl.send(&deviceID, dbAddrs, seen)
} }
return s.db.merge(key, dbAddrs, seen) return s.db.merge(&deviceID, dbAddrs, seen)
} }
func handlePing(w http.ResponseWriter, _ *http.Request) { func handlePing(w http.ResponseWriter, _ *http.Request) {
@ -503,15 +501,44 @@ func errorRetryAfterString() string {
return strconv.Itoa(errorRetryAfterSeconds + rand.Intn(errorRetryFuzzSeconds)) return strconv.Itoa(errorRetryAfterSeconds + rand.Intn(errorRetryFuzzSeconds))
} }
func notFoundRetryAfterSeconds(misses int) int {
retryAfterS := notFoundRetryMinSeconds + notFoundRetryIncSeconds*misses
if retryAfterS > notFoundRetryMaxSeconds {
retryAfterS = notFoundRetryMaxSeconds
}
retryAfterS += rand.Intn(notFoundRetryFuzzSeconds)
return retryAfterS
}
func reannounceAfterString() string { func reannounceAfterString() string {
return strconv.Itoa(reannounceAfterSeconds + rand.Intn(reannounzeFuzzSeconds)) return strconv.Itoa(reannounceAfterSeconds + rand.Intn(reannounzeFuzzSeconds))
} }
type retryAfterTracker struct {
name string
desiredRate float64 // requests per second
mut sync.Mutex
lastCount int // requests in the last bucket
curCount int // requests in the current bucket
bucketStarts time.Time // start of the current bucket
currentDelay int // current delay in seconds
}
func (t *retryAfterTracker) retryAfterS() int {
now := time.Now()
t.mut.Lock()
if durS := now.Sub(t.bucketStarts).Seconds(); durS > float64(t.currentDelay) {
t.bucketStarts = now
t.lastCount = t.curCount
lastRate := float64(t.lastCount) / durS
switch {
case t.currentDelay > notFoundRetryUnknownMinSeconds &&
lastRate < 0.75*t.desiredRate:
t.currentDelay = max(8*t.currentDelay/10, notFoundRetryUnknownMinSeconds)
case t.currentDelay < notFoundRetryUnknownMaxSeconds &&
lastRate > 1.25*t.desiredRate:
t.currentDelay = min(3*t.currentDelay/2, notFoundRetryUnknownMaxSeconds)
}
t.curCount = 0
}
if t.curCount == 0 {
retryAfterLevel.WithLabelValues(t.name).Set(float64(t.currentDelay))
}
t.curCount++
t.mut.Unlock()
return t.currentDelay + rand.Intn(t.currentDelay/4)
}

View File

@ -106,14 +106,11 @@ func addr(host string, port int) *net.TCPAddr {
} }
func BenchmarkAPIRequests(b *testing.B) { func BenchmarkAPIRequests(b *testing.B) {
db, err := newLevelDBStore(b.TempDir()) db := newInMemoryStore(b.TempDir(), 0)
if err != nil {
b.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
go db.Serve(ctx) go db.Serve(ctx)
api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true, 1) api := newAPISrv("127.0.0.1:0", tls.Certificate{}, db, nil, true, true)
srv := httptest.NewServer(http.HandlerFunc(api.handler)) srv := httptest.NewServer(http.HandlerFunc(api.handler))
kf := b.TempDir() + "/cert" kf := b.TempDir() + "/cert"

View File

@ -10,17 +10,25 @@
package main package main
import ( import (
"bufio"
"context" "context"
"encoding/binary"
"io"
"log" "log"
"net" "net"
"net/url" "net/url"
"os"
"path"
"sort" "sort"
"time" "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/puzpuzpuz/xsync/v3"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sliceutil" "github.com/syncthing/syncthing/lib/sliceutil"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
) )
type clock interface { type clock interface {
@ -34,270 +42,296 @@ func (defaultClock) Now() time.Time {
} }
type database interface { type database interface {
put(key string, rec DatabaseRecord) error put(key *protocol.DeviceID, rec DatabaseRecord) error
merge(key string, addrs []DatabaseAddress, seen int64) error merge(key *protocol.DeviceID, addrs []DatabaseAddress, seen int64) error
get(key string) (DatabaseRecord, error) get(key *protocol.DeviceID) (DatabaseRecord, error)
} }
type levelDBStore struct { type inMemoryStore struct {
db *leveldb.DB m *xsync.MapOf[protocol.DeviceID, DatabaseRecord]
inbox chan func() dir string
clock clock flushInterval time.Duration
marshalBuf []byte clock clock
} }
func newLevelDBStore(dir string) (*levelDBStore, error) { func newInMemoryStore(dir string, flushInterval time.Duration) *inMemoryStore {
db, err := leveldb.OpenFile(dir, levelDBOptions) s := &inMemoryStore{
if err != nil { m: xsync.NewMapOf[protocol.DeviceID, DatabaseRecord](),
return nil, err dir: dir,
flushInterval: flushInterval,
clock: defaultClock{},
} }
return &levelDBStore{ err := s.read()
db: db, if os.IsNotExist(err) {
inbox: make(chan func(), 16), // Try to read from AWS
clock: defaultClock{}, fd, cerr := os.Create(path.Join(s.dir, "records.db"))
}, nil if cerr != nil {
} log.Println("Error creating database file:", err)
return s
func newMemoryLevelDBStore() (*levelDBStore, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return &levelDBStore{
db: db,
inbox: make(chan func(), 16),
clock: defaultClock{},
}, nil
}
func (s *levelDBStore) put(key string, rec DatabaseRecord) error {
t0 := time.Now()
defer func() {
databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
}()
rc := make(chan error)
s.inbox <- func() {
size := rec.Size()
if len(s.marshalBuf) < size {
s.marshalBuf = make([]byte, size)
} }
n, _ := rec.MarshalTo(s.marshalBuf) if err := s3Download(fd); err != nil {
rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil) log.Printf("Error reading database from S3: %v", err)
}
_ = fd.Close()
err = s.read()
} }
err := <-rc
if err != nil { if err != nil {
databaseOperations.WithLabelValues(dbOpPut, dbResError).Inc() log.Println("Error reading database:", err)
} else {
databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
} }
s.calculateStatistics()
return err return s
} }
func (s *levelDBStore) merge(key string, addrs []DatabaseAddress, seen int64) error { func (s *inMemoryStore) put(key *protocol.DeviceID, rec DatabaseRecord) error {
t0 := time.Now()
s.m.Store(*key, rec)
databaseOperations.WithLabelValues(dbOpPut, dbResSuccess).Inc()
databaseOperationSeconds.WithLabelValues(dbOpPut).Observe(time.Since(t0).Seconds())
return nil
}
func (s *inMemoryStore) merge(key *protocol.DeviceID, addrs []DatabaseAddress, seen int64) error {
t0 := time.Now() t0 := time.Now()
defer func() {
databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
}()
rc := make(chan error)
newRec := DatabaseRecord{ newRec := DatabaseRecord{
Addresses: addrs, Addresses: addrs,
Seen: seen, Seen: seen,
} }
s.inbox <- func() { oldRec, _ := s.m.Load(*key)
// grab the existing record newRec = merge(newRec, oldRec)
oldRec, err := s.get(key) s.m.Store(*key, newRec)
if err != nil {
// "not found" is not an error from get, so this is serious
// stuff only
rc <- err
return
}
newRec = merge(newRec, oldRec)
// We replicate s.put() functionality here ourselves instead of databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
// calling it because we want to serialize our get above together databaseOperationSeconds.WithLabelValues(dbOpMerge).Observe(time.Since(t0).Seconds())
// with the put in the same function.
size := newRec.Size()
if len(s.marshalBuf) < size {
s.marshalBuf = make([]byte, size)
}
n, _ := newRec.MarshalTo(s.marshalBuf)
rc <- s.db.Put([]byte(key), s.marshalBuf[:n], nil)
}
err := <-rc return nil
if err != nil {
databaseOperations.WithLabelValues(dbOpMerge, dbResError).Inc()
} else {
databaseOperations.WithLabelValues(dbOpMerge, dbResSuccess).Inc()
}
return err
} }
func (s *levelDBStore) get(key string) (DatabaseRecord, error) { func (s *inMemoryStore) get(key *protocol.DeviceID) (DatabaseRecord, error) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds()) databaseOperationSeconds.WithLabelValues(dbOpGet).Observe(time.Since(t0).Seconds())
}() }()
keyBs := []byte(key) rec, ok := s.m.Load(*key)
val, err := s.db.Get(keyBs, nil) if !ok {
if err == leveldb.ErrNotFound {
databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc() databaseOperations.WithLabelValues(dbOpGet, dbResNotFound).Inc()
return DatabaseRecord{}, nil return DatabaseRecord{}, nil
} }
if err != nil {
databaseOperations.WithLabelValues(dbOpGet, dbResError).Inc()
return DatabaseRecord{}, err
}
var rec DatabaseRecord
if err := rec.Unmarshal(val); err != nil {
databaseOperations.WithLabelValues(dbOpGet, dbResUnmarshalError).Inc()
return DatabaseRecord{}, nil
}
rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano()) rec.Addresses = expire(rec.Addresses, s.clock.Now().UnixNano())
databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc() databaseOperations.WithLabelValues(dbOpGet, dbResSuccess).Inc()
return rec, nil return rec, nil
} }
func (s *levelDBStore) Serve(ctx context.Context) error { func (s *inMemoryStore) Serve(ctx context.Context) error {
t := time.NewTimer(0) t := time.NewTimer(s.flushInterval)
defer t.Stop() defer t.Stop()
defer s.db.Close()
// Start the statistics serve routine. It will exit with us when if s.flushInterval <= 0 {
// statisticsTrigger is closed. t.Stop()
statisticsTrigger := make(chan struct{}) }
statisticsDone := make(chan struct{})
go s.statisticsServe(statisticsTrigger, statisticsDone)
loop: loop:
for { for {
select { select {
case fn := <-s.inbox:
// Run function in serialized order.
fn()
case <-t.C: case <-t.C:
// Trigger the statistics routine to do its thing in the if err := s.write(); err != nil {
// background. log.Println("Error writing database:", err)
statisticsTrigger <- struct{}{} }
s.calculateStatistics()
case <-statisticsDone: t.Reset(s.flushInterval)
// The statistics routine is done with one iteratation, schedule
// the next.
t.Reset(databaseStatisticsInterval)
case <-ctx.Done(): case <-ctx.Done():
// We're done. // We're done.
close(statisticsTrigger)
break loop break loop
} }
} }
// Also wait for statisticsServe to return return s.write()
<-statisticsDone }
func (s *inMemoryStore) calculateStatistics() {
t0 := time.Now()
nowNanos := t0.UnixNano()
cutoff24h := t0.Add(-24 * time.Hour).UnixNano()
cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano()
current, currentIPv4, currentIPv6, last24h, last1w, errors := 0, 0, 0, 0, 0, 0
s.m.Range(func(key protocol.DeviceID, rec DatabaseRecord) bool {
// If there are addresses that have not expired it's a current
// record, otherwise account it based on when it was last seen
// (last 24 hours or last week) or finally as inactice.
addrs := expire(rec.Addresses, nowNanos)
switch {
case len(addrs) > 0:
current++
seenIPv4, seenIPv6 := false, false
for _, addr := range addrs {
uri, err := url.Parse(addr.Address)
if err != nil {
continue
}
host, _, err := net.SplitHostPort(uri.Host)
if err != nil {
continue
}
if ip := net.ParseIP(host); ip != nil && ip.To4() != nil {
seenIPv4 = true
} else if ip != nil {
seenIPv6 = true
}
if seenIPv4 && seenIPv6 {
break
}
}
if seenIPv4 {
currentIPv4++
}
if seenIPv6 {
currentIPv6++
}
case rec.Seen > cutoff24h:
last24h++
case rec.Seen > cutoff1w:
last1w++
default:
// drop the record if it's older than a week
s.m.Delete(key)
}
return true
})
databaseKeys.WithLabelValues("current").Set(float64(current))
databaseKeys.WithLabelValues("currentIPv4").Set(float64(currentIPv4))
databaseKeys.WithLabelValues("currentIPv6").Set(float64(currentIPv6))
databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
databaseKeys.WithLabelValues("error").Set(float64(errors))
databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
}
func (s *inMemoryStore) write() (err error) {
t0 := time.Now()
defer func() {
if err == nil {
databaseWriteSeconds.Set(time.Since(t0).Seconds())
databaseLastWritten.Set(float64(t0.Unix()))
}
}()
dbf := path.Join(s.dir, "records.db")
fd, err := os.Create(dbf + ".tmp")
if err != nil {
return err
}
bw := bufio.NewWriter(fd)
var buf []byte
var rangeErr error
now := s.clock.Now().UnixNano()
cutoff1w := s.clock.Now().Add(-7 * 24 * time.Hour).UnixNano()
s.m.Range(func(key protocol.DeviceID, value DatabaseRecord) bool {
if value.Seen < cutoff1w {
// drop the record if it's older than a week
return true
}
rec := ReplicationRecord{
Key: key[:],
Addresses: expire(value.Addresses, now),
Seen: value.Seen,
}
s := rec.Size()
if s+4 > len(buf) {
buf = make([]byte, s+4)
}
n, err := rec.MarshalTo(buf[4:])
if err != nil {
rangeErr = err
return false
}
binary.BigEndian.PutUint32(buf, uint32(n))
if _, err := bw.Write(buf[:n+4]); err != nil {
rangeErr = err
return false
}
return true
})
if rangeErr != nil {
_ = fd.Close()
return rangeErr
}
if err := bw.Flush(); err != nil {
_ = fd.Close
return err
}
if err := fd.Close(); err != nil {
return err
}
if err := os.Rename(dbf+".tmp", dbf); err != nil {
return err
}
if os.Getenv("PODINDEX") == "0" {
// Upload to S3
fd, err = os.Open(dbf)
if err != nil {
log.Printf("Error uploading database to S3: %v", err)
return nil
}
defer fd.Close()
if err := s3Upload(fd); err != nil {
log.Printf("Error uploading database to S3: %v", err)
}
}
return nil return nil
} }
func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) { func (s *inMemoryStore) read() error {
defer close(done) fd, err := os.Open(path.Join(s.dir, "records.db"))
if err != nil {
return err
}
defer fd.Close()
for range trigger { br := bufio.NewReader(fd)
t0 := time.Now() var buf []byte
nowNanos := t0.UnixNano() for {
cutoff24h := t0.Add(-24 * time.Hour).UnixNano() var n uint32
cutoff1w := t0.Add(-7 * 24 * time.Hour).UnixNano() if err := binary.Read(br, binary.BigEndian, &n); err != nil {
cutoff2Mon := t0.Add(-60 * 24 * time.Hour).UnixNano() if err == io.EOF {
current, currentIPv4, currentIPv6, last24h, last1w, inactive, errors := 0, 0, 0, 0, 0, 0, 0 break
iter := s.db.NewIterator(&util.Range{}, nil)
for iter.Next() {
// Attempt to unmarshal the record and count the
// failure if there's something wrong with it.
var rec DatabaseRecord
if err := rec.Unmarshal(iter.Value()); err != nil {
errors++
continue
}
// If there are addresses that have not expired it's a current
// record, otherwise account it based on when it was last seen
// (last 24 hours or last week) or finally as inactice.
addrs := expire(rec.Addresses, nowNanos)
switch {
case len(addrs) > 0:
current++
seenIPv4, seenIPv6 := false, false
for _, addr := range addrs {
uri, err := url.Parse(addr.Address)
if err != nil {
continue
}
host, _, err := net.SplitHostPort(uri.Host)
if err != nil {
continue
}
if ip := net.ParseIP(host); ip != nil && ip.To4() != nil {
seenIPv4 = true
} else if ip != nil {
seenIPv6 = true
}
if seenIPv4 && seenIPv6 {
break
}
}
if seenIPv4 {
currentIPv4++
}
if seenIPv6 {
currentIPv6++
}
case rec.Seen > cutoff24h:
last24h++
case rec.Seen > cutoff1w:
last1w++
case rec.Seen > cutoff2Mon:
inactive++
case rec.Missed < cutoff2Mon:
// It hasn't been seen lately and we haven't recorded
// someone asking for this device in a long time either;
// delete the record.
if err := s.db.Delete(iter.Key(), nil); err != nil {
databaseOperations.WithLabelValues(dbOpDelete, dbResError).Inc()
} else {
databaseOperations.WithLabelValues(dbOpDelete, dbResSuccess).Inc()
}
default:
inactive++
} }
return err
}
if int(n) > len(buf) {
buf = make([]byte, n)
}
if _, err := io.ReadFull(br, buf[:n]); err != nil {
return err
}
rec := ReplicationRecord{}
if err := rec.Unmarshal(buf[:n]); err != nil {
return err
}
key, err := protocol.DeviceIDFromBytes(rec.Key)
if err != nil {
key, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Bad device ID:", err)
continue
} }
iter.Release() s.m.Store(key, DatabaseRecord{
Addresses: rec.Addresses,
databaseKeys.WithLabelValues("current").Set(float64(current)) Seen: rec.Seen,
databaseKeys.WithLabelValues("currentIPv4").Set(float64(currentIPv4)) })
databaseKeys.WithLabelValues("currentIPv6").Set(float64(currentIPv6))
databaseKeys.WithLabelValues("last24h").Set(float64(last24h))
databaseKeys.WithLabelValues("last1w").Set(float64(last1w))
databaseKeys.WithLabelValues("inactive").Set(float64(inactive))
databaseKeys.WithLabelValues("error").Set(float64(errors))
databaseStatisticsSeconds.Set(time.Since(t0).Seconds())
// Signal that we are done and can be scheduled again.
done <- struct{}{}
} }
return nil
} }
// merge returns the merged result of the two database records a and b. The // merge returns the merged result of the two database records a and b. The
@ -411,3 +445,36 @@ func (s databaseAddressOrder) Swap(a, b int) {
func (s databaseAddressOrder) Len() int { func (s databaseAddressOrder) Len() int {
return len(s) return len(s)
} }
func s3Upload(r io.Reader) error {
sess, err := session.NewSession(&aws.Config{
Region: aws.String("fr-par"),
Endpoint: aws.String("s3.fr-par.scw.cloud"),
})
if err != nil {
return err
}
uploader := s3manager.NewUploader(sess)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("syncthing-discovery"),
Key: aws.String("discovery.db"),
Body: r,
})
return err
}
func s3Download(w io.WriterAt) error {
sess, err := session.NewSession(&aws.Config{
Region: aws.String("fr-par"),
Endpoint: aws.String("s3.fr-par.scw.cloud"),
})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(sess)
_, err = downloader.Download(w, &s3.GetObjectInput{
Bucket: aws.String("syncthing-discovery"),
Key: aws.String("discovery.db"),
})
return err
}

View File

@ -25,9 +25,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type DatabaseRecord struct { type DatabaseRecord struct {
Addresses []DatabaseAddress `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses"` Addresses []DatabaseAddress `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses"`
Misses int32 `protobuf:"varint,2,opt,name=misses,proto3" json:"misses,omitempty"`
Seen int64 `protobuf:"varint,3,opt,name=seen,proto3" json:"seen,omitempty"` Seen int64 `protobuf:"varint,3,opt,name=seen,proto3" json:"seen,omitempty"`
Missed int64 `protobuf:"varint,4,opt,name=missed,proto3" json:"missed,omitempty"`
} }
func (m *DatabaseRecord) Reset() { *m = DatabaseRecord{} } func (m *DatabaseRecord) Reset() { *m = DatabaseRecord{} }
@ -64,7 +62,7 @@ func (m *DatabaseRecord) XXX_DiscardUnknown() {
var xxx_messageInfo_DatabaseRecord proto.InternalMessageInfo var xxx_messageInfo_DatabaseRecord proto.InternalMessageInfo
type ReplicationRecord struct { type ReplicationRecord struct {
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Addresses []DatabaseAddress `protobuf:"bytes,2,rep,name=addresses,proto3" json:"addresses"` Addresses []DatabaseAddress `protobuf:"bytes,2,rep,name=addresses,proto3" json:"addresses"`
Seen int64 `protobuf:"varint,3,opt,name=seen,proto3" json:"seen,omitempty"` Seen int64 `protobuf:"varint,3,opt,name=seen,proto3" json:"seen,omitempty"`
} }
@ -149,24 +147,23 @@ func init() {
func init() { proto.RegisterFile("database.proto", fileDescriptor_b90fe3356ea5df07) } func init() { proto.RegisterFile("database.proto", fileDescriptor_b90fe3356ea5df07) }
var fileDescriptor_b90fe3356ea5df07 = []byte{ var fileDescriptor_b90fe3356ea5df07 = []byte{
// 270 bytes of a gzipped FileDescriptorProto // 243 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0x41, 0x4a, 0xc4, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0x49, 0x2c, 0x49,
0x18, 0x85, 0x9b, 0x49, 0x1d, 0x99, 0x08, 0xa3, 0x06, 0x94, 0x20, 0x12, 0x4b, 0xdd, 0x74, 0xd5, 0x4c, 0x4a, 0x2c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc,
0x01, 0x5d, 0xb9, 0x74, 0xd0, 0x0b, 0xe4, 0x06, 0xe9, 0xe4, 0x77, 0x08, 0x3a, 0x4d, 0x49, 0x2a, 0x93, 0x52, 0x2e, 0x4a, 0x2d, 0xc8, 0x2f, 0xd6, 0x07, 0x0b, 0x25, 0x95, 0xa6, 0xe9, 0xa7, 0xe7,
0xe8, 0x29, 0xf4, 0x58, 0x5d, 0xce, 0xd2, 0x95, 0x68, 0x7b, 0x11, 0x69, 0x26, 0x55, 0x14, 0x37, 0xa7, 0xe7, 0x83, 0x39, 0x60, 0x16, 0x44, 0xa9, 0x52, 0x3c, 0x17, 0x9f, 0x0b, 0x54, 0x73, 0x50,
0xb3, 0x7b, 0xdf, 0xff, 0xbf, 0x97, 0xbc, 0x84, 0x4c, 0x95, 0xac, 0x65, 0x21, 0x1d, 0xe4, 0x95, 0x6a, 0x72, 0x7e, 0x51, 0x8a, 0x90, 0x25, 0x17, 0x67, 0x62, 0x4a, 0x4a, 0x51, 0x6a, 0x71, 0x71,
0x35, 0xb5, 0xa1, 0xf1, 0x4a, 0xea, 0xf2, 0xe4, 0xdc, 0x42, 0x65, 0xdc, 0xcc, 0x8f, 0x8a, 0xc7, 0x6a, 0xb1, 0x04, 0xa3, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0xa8, 0x1e, 0xc8, 0x40, 0x3d, 0x98, 0x42,
0xbb, 0xd9, 0xd2, 0x2c, 0x8d, 0x07, 0xaf, 0x36, 0xd6, 0xf4, 0x05, 0x91, 0xe9, 0x4d, 0x48, 0x0b, 0x47, 0x88, 0xb4, 0x13, 0xcb, 0x89, 0x7b, 0xf2, 0x0c, 0x41, 0x08, 0xd5, 0x42, 0x42, 0x5c, 0x2c,
0x58, 0x18, 0xab, 0xe8, 0x15, 0x99, 0x48, 0xa5, 0x2c, 0x38, 0x07, 0x8e, 0xa1, 0x04, 0x67, 0x7b, 0xc5, 0xa9, 0xa9, 0x79, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x60, 0xb6, 0x52, 0x09, 0x97,
0x17, 0x47, 0x79, 0x7f, 0x62, 0x3e, 0x18, 0xaf, 0x37, 0xeb, 0x79, 0xdc, 0xbc, 0x9f, 0x45, 0xe2, 0x60, 0x50, 0x6a, 0x41, 0x4e, 0x66, 0x72, 0x62, 0x49, 0x66, 0x7e, 0x1e, 0xd4, 0x0e, 0x01, 0x2e,
0xc7, 0x4d, 0x8f, 0xc9, 0x78, 0xa5, 0x7d, 0x6e, 0x94, 0xa0, 0x6c, 0x47, 0x04, 0xa2, 0x94, 0xc4, 0xe6, 0xec, 0xd4, 0x4a, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x10, 0x13, 0xd5, 0x56, 0x26,
0x0e, 0xa0, 0x64, 0x38, 0x41, 0x19, 0x16, 0x5e, 0x7f, 0x7b, 0x15, 0x8b, 0xfd, 0x34, 0x50, 0x5a, 0x8a, 0x6d, 0x75, 0xe5, 0xe2, 0x47, 0xd3, 0x27, 0x24, 0xc1, 0xc5, 0x0e, 0xd5, 0x03, 0xb6, 0x97,
0x93, 0x43, 0x01, 0xd5, 0x83, 0x5e, 0xc8, 0x5a, 0x9b, 0x32, 0x74, 0x3a, 0x20, 0xf8, 0x1e, 0x9e, 0x33, 0x08, 0xc6, 0x05, 0xc9, 0xa4, 0x56, 0x14, 0x64, 0x16, 0x81, 0x6d, 0x06, 0x99, 0x01, 0xe3,
0x19, 0x4a, 0x50, 0x36, 0x11, 0xbd, 0xfc, 0xdd, 0x72, 0xb4, 0x55, 0xcb, 0x7f, 0xda, 0xa4, 0xb7, 0x3a, 0xc9, 0x9c, 0x78, 0x28, 0xc7, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f,
0x64, 0xff, 0x4f, 0x8e, 0x32, 0xb2, 0x1b, 0x32, 0xe1, 0xde, 0x01, 0xfb, 0x0d, 0x3c, 0x55, 0xda, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c,
0x86, 0x77, 0x62, 0x31, 0xe0, 0xfc, 0xb4, 0xf9, 0xe4, 0x51, 0xd3, 0x72, 0xb4, 0x6e, 0x39, 0xfa, 0x49, 0x6c, 0xe0, 0x20, 0x34, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xc6, 0x0b, 0x9b, 0x77, 0x7f,
0x68, 0x39, 0x7a, 0xed, 0x78, 0xb4, 0xee, 0x78, 0xf4, 0xd6, 0xf1, 0xa8, 0x18, 0xfb, 0x3f, 0xbf, 0x01, 0x00, 0x00,
0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x7a, 0xa2, 0xf6, 0x1e, 0xb0, 0x01, 0x00, 0x00,
} }
func (m *DatabaseRecord) Marshal() (dAtA []byte, err error) { func (m *DatabaseRecord) Marshal() (dAtA []byte, err error) {
@ -189,21 +186,11 @@ func (m *DatabaseRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if m.Missed != 0 {
i = encodeVarintDatabase(dAtA, i, uint64(m.Missed))
i--
dAtA[i] = 0x20
}
if m.Seen != 0 { if m.Seen != 0 {
i = encodeVarintDatabase(dAtA, i, uint64(m.Seen)) i = encodeVarintDatabase(dAtA, i, uint64(m.Seen))
i-- i--
dAtA[i] = 0x18 dAtA[i] = 0x18
} }
if m.Misses != 0 {
i = encodeVarintDatabase(dAtA, i, uint64(m.Misses))
i--
dAtA[i] = 0x10
}
if len(m.Addresses) > 0 { if len(m.Addresses) > 0 {
for iNdEx := len(m.Addresses) - 1; iNdEx >= 0; iNdEx-- { for iNdEx := len(m.Addresses) - 1; iNdEx >= 0; iNdEx-- {
{ {
@ -328,15 +315,9 @@ func (m *DatabaseRecord) Size() (n int) {
n += 1 + l + sovDatabase(uint64(l)) n += 1 + l + sovDatabase(uint64(l))
} }
} }
if m.Misses != 0 {
n += 1 + sovDatabase(uint64(m.Misses))
}
if m.Seen != 0 { if m.Seen != 0 {
n += 1 + sovDatabase(uint64(m.Seen)) n += 1 + sovDatabase(uint64(m.Seen))
} }
if m.Missed != 0 {
n += 1 + sovDatabase(uint64(m.Missed))
}
return n return n
} }
@ -447,25 +428,6 @@ func (m *DatabaseRecord) Unmarshal(dAtA []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Misses", wireType)
}
m.Misses = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDatabase
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Misses |= int32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3: case 3:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seen", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Seen", wireType)
@ -485,25 +447,6 @@ func (m *DatabaseRecord) Unmarshal(dAtA []byte) error {
break break
} }
} }
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Missed", wireType)
}
m.Missed = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowDatabase
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Missed |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipDatabase(dAtA[iNdEx:]) skippy, err := skipDatabase(dAtA[iNdEx:])
@ -558,7 +501,7 @@ func (m *ReplicationRecord) Unmarshal(dAtA []byte) error {
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
} }
var stringLen uint64 var byteLen int
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
if shift >= 64 { if shift >= 64 {
return ErrIntOverflowDatabase return ErrIntOverflowDatabase
@ -568,23 +511,25 @@ func (m *ReplicationRecord) Unmarshal(dAtA []byte) error {
} }
b := dAtA[iNdEx] b := dAtA[iNdEx]
iNdEx++ iNdEx++
stringLen |= uint64(b&0x7F) << shift byteLen |= int(b&0x7F) << shift
if b < 0x80 { if b < 0x80 {
break break
} }
} }
intStringLen := int(stringLen) if byteLen < 0 {
if intStringLen < 0 {
return ErrInvalidLengthDatabase return ErrInvalidLengthDatabase
} }
postIndex := iNdEx + intStringLen postIndex := iNdEx + byteLen
if postIndex < 0 { if postIndex < 0 {
return ErrInvalidLengthDatabase return ErrInvalidLengthDatabase
} }
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Key = string(dAtA[iNdEx:postIndex]) m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex iNdEx = postIndex
case 2: case 2:
if wireType != 2 { if wireType != 2 {

View File

@ -17,15 +17,11 @@ option (gogoproto.goproto_sizecache_all) = false;
message DatabaseRecord { message DatabaseRecord {
repeated DatabaseAddress addresses = 1 [(gogoproto.nullable) = false]; repeated DatabaseAddress addresses = 1 [(gogoproto.nullable) = false];
int32 misses = 2; // Number of lookups* without hits
int64 seen = 3; // Unix nanos, last device announce int64 seen = 3; // Unix nanos, last device announce
int64 missed = 4; // Unix nanos, last* failed lookup
} }
// *) Not every lookup results in a write, so may not be completely accurate
message ReplicationRecord { message ReplicationRecord {
string key = 1; bytes key = 1; // raw 32 byte device ID
repeated DatabaseAddress addresses = 2 [(gogoproto.nullable) = false]; repeated DatabaseAddress addresses = 2 [(gogoproto.nullable) = false];
int64 seen = 3; // Unix nanos, last device announce int64 seen = 3; // Unix nanos, last device announce
} }

View File

@ -11,29 +11,25 @@ import (
"fmt" "fmt"
"testing" "testing"
"time" "time"
"github.com/syncthing/syncthing/lib/protocol"
) )
func TestDatabaseGetSet(t *testing.T) { func TestDatabaseGetSet(t *testing.T) {
db, err := newMemoryLevelDBStore() db := newInMemoryStore(t.TempDir(), 0)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go db.Serve(ctx) go db.Serve(ctx)
defer cancel() defer cancel()
// Check missing record // Check missing record
rec, err := db.get("abcd") rec, err := db.get(&protocol.EmptyDeviceID)
if err != nil { if err != nil {
t.Error("not found should not be an error") t.Error("not found should not be an error")
} }
if len(rec.Addresses) != 0 { if len(rec.Addresses) != 0 {
t.Error("addresses should be empty") t.Error("addresses should be empty")
} }
if rec.Misses != 0 {
t.Error("missing should be zero")
}
// Set up a clock // Set up a clock
@ -46,13 +42,13 @@ func TestDatabaseGetSet(t *testing.T) {
rec.Addresses = []DatabaseAddress{ rec.Addresses = []DatabaseAddress{
{Address: "tcp://1.2.3.4:5", Expires: tc.Now().Add(time.Minute).UnixNano()}, {Address: "tcp://1.2.3.4:5", Expires: tc.Now().Add(time.Minute).UnixNano()},
} }
if err := db.put("abcd", rec); err != nil { if err := db.put(&protocol.EmptyDeviceID, rec); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Verify it // Verify it
rec, err = db.get("abcd") rec, err = db.get(&protocol.EmptyDeviceID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -72,13 +68,13 @@ func TestDatabaseGetSet(t *testing.T) {
addrs := []DatabaseAddress{ addrs := []DatabaseAddress{
{Address: "tcp://6.7.8.9:0", Expires: tc.Now().Add(time.Minute).UnixNano()}, {Address: "tcp://6.7.8.9:0", Expires: tc.Now().Add(time.Minute).UnixNano()},
} }
if err := db.merge("abcd", addrs, tc.Now().UnixNano()); err != nil { if err := db.merge(&protocol.EmptyDeviceID, addrs, tc.Now().UnixNano()); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Verify it // Verify it
rec, err = db.get("abcd") rec, err = db.get(&protocol.EmptyDeviceID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -101,7 +97,7 @@ func TestDatabaseGetSet(t *testing.T) {
// Verify it // Verify it
rec, err = db.get("abcd") rec, err = db.get(&protocol.EmptyDeviceID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -114,40 +110,18 @@ func TestDatabaseGetSet(t *testing.T) {
t.Error("incorrect address") t.Error("incorrect address")
} }
// Put a record with misses
rec = DatabaseRecord{Misses: 42, Missed: tc.Now().UnixNano()}
if err := db.put("efgh", rec); err != nil {
t.Fatal(err)
}
// Verify it
rec, err = db.get("efgh")
if err != nil {
t.Fatal(err)
}
if len(rec.Addresses) != 0 {
t.Log(rec.Addresses)
t.Fatal("should have no addresses")
}
if rec.Misses != 42 {
t.Log(rec.Misses)
t.Error("incorrect misses")
}
// Set an address // Set an address
addrs = []DatabaseAddress{ addrs = []DatabaseAddress{
{Address: "tcp://6.7.8.9:0", Expires: tc.Now().Add(time.Minute).UnixNano()}, {Address: "tcp://6.7.8.9:0", Expires: tc.Now().Add(time.Minute).UnixNano()},
} }
if err := db.merge("efgh", addrs, tc.Now().UnixNano()); err != nil { if err := db.merge(&protocol.GlobalDeviceID, addrs, tc.Now().UnixNano()); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Verify it // Verify it
rec, err = db.get("efgh") rec, err = db.get(&protocol.GlobalDeviceID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -155,10 +129,6 @@ func TestDatabaseGetSet(t *testing.T) {
t.Log(rec.Addresses) t.Log(rec.Addresses)
t.Fatal("should have one address") t.Fatal("should have one address")
} }
if rec.Misses != 0 {
t.Log(rec.Misses)
t.Error("should have no misses")
}
} }
func TestFilter(t *testing.T) { func TestFilter(t *testing.T) {

View File

@ -14,6 +14,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"os/signal"
"runtime" "runtime"
"strings" "strings"
"time" "time"
@ -24,7 +25,6 @@ import (
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
) )
@ -39,17 +39,12 @@ const (
errorRetryAfterSeconds = 1500 errorRetryAfterSeconds = 1500
errorRetryFuzzSeconds = 300 errorRetryFuzzSeconds = 300
// Retry for not found is minSeconds + failures * incSeconds + // Retry for not found is notFoundRetrySeenSeconds for records we have
// random(fuzz), where failures is the number of consecutive lookups // seen an announcement for (but it's not active right now) and
// with no answer, up to maxSeconds. The fuzz is applied after capping // notFoundRetryUnknownSeconds for records we have never seen (or not
// to maxSeconds. // seen within the last week).
notFoundRetryMinSeconds = 60 notFoundRetryUnknownMinSeconds = 60
notFoundRetryMaxSeconds = 3540 notFoundRetryUnknownMaxSeconds = 3600
notFoundRetryIncSeconds = 10
notFoundRetryFuzzSeconds = 60
// How often (in requests) we serialize the missed counter to database.
notFoundMissesWriteInterval = 10
httpReadTimeout = 5 * time.Second httpReadTimeout = 5 * time.Second
httpWriteTimeout = 5 * time.Second httpWriteTimeout = 5 * time.Second
@ -59,14 +54,6 @@ const (
replicationOutboxSize = 10000 replicationOutboxSize = 10000
) )
// These options make the database a little more optimized for writes, at
// the expense of some memory usage and risk of losing writes in a (system)
// crash.
var levelDBOptions = &opt.Options{
NoSync: true,
WriteBuffer: 32 << 20, // default 4<<20
}
var debug = false var debug = false
func main() { func main() {
@ -81,16 +68,15 @@ func main() {
var replKeyFile string var replKeyFile string
var useHTTP bool var useHTTP bool
var compression bool var compression bool
var largeDB bool
var amqpAddress string var amqpAddress string
missesIncrease := 1 var flushInterval time.Duration
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
log.SetFlags(0) log.SetFlags(0)
flag.StringVar(&certFile, "cert", "./cert.pem", "Certificate file") flag.StringVar(&certFile, "cert", "./cert.pem", "Certificate file")
flag.StringVar(&keyFile, "key", "./key.pem", "Key file") flag.StringVar(&keyFile, "key", "./key.pem", "Key file")
flag.StringVar(&dir, "db-dir", "./discovery.db", "Database directory") flag.StringVar(&dir, "db-dir", ".", "Database directory")
flag.BoolVar(&debug, "debug", false, "Print debug output") flag.BoolVar(&debug, "debug", false, "Print debug output")
flag.BoolVar(&useHTTP, "http", false, "Listen on HTTP (behind an HTTPS proxy)") flag.BoolVar(&useHTTP, "http", false, "Listen on HTTP (behind an HTTPS proxy)")
flag.BoolVar(&compression, "compression", true, "Enable GZIP compression of responses") flag.BoolVar(&compression, "compression", true, "Enable GZIP compression of responses")
@ -100,9 +86,8 @@ func main() {
flag.StringVar(&replicationListen, "replication-listen", ":19200", "Replication listen address") flag.StringVar(&replicationListen, "replication-listen", ":19200", "Replication listen address")
flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication") flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication")
flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication") flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication")
flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings")
flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker") flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker")
flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss") flag.DurationVar(&flushInterval, "flush-interval", 5*time.Minute, "Interval between database flushes")
showVersion := flag.Bool("version", false, "Show version") showVersion := flag.Bool("version", false, "Show version")
flag.Parse() flag.Parse()
@ -113,15 +98,6 @@ func main() {
buildInfo.WithLabelValues(build.Version, runtime.Version(), build.User, build.Date.UTC().Format("2006-01-02T15:04:05Z")).Set(1) buildInfo.WithLabelValues(build.Version, runtime.Version(), build.User, build.Date.UTC().Format("2006-01-02T15:04:05Z")).Set(1)
if largeDB {
levelDBOptions.BlockCacheCapacity = 64 << 20
levelDBOptions.BlockSize = 64 << 10
levelDBOptions.CompactionTableSize = 16 << 20
levelDBOptions.CompactionTableSizeMultiplier = 2.0
levelDBOptions.WriteBuffer = 64 << 20
levelDBOptions.CompactionL0Trigger = 8
}
cert, err := tls.LoadX509KeyPair(certFile, keyFile) cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if os.IsNotExist(err) { if os.IsNotExist(err) {
log.Println("Failed to load keypair. Generating one, this might take a while...") log.Println("Failed to load keypair. Generating one, this might take a while...")
@ -190,10 +166,7 @@ func main() {
}) })
// Start the database. // Start the database.
db, err := newLevelDBStore(dir) db := newInMemoryStore(dir, flushInterval)
if err != nil {
log.Fatalln("Open database:", err)
}
main.Add(db) main.Add(db)
// Start any replication senders. // Start any replication senders.
@ -218,16 +191,8 @@ func main() {
main.Add(kr) main.Add(kr)
} }
go func() {
for range time.NewTicker(time.Second).C {
for _, r := range repl {
r.send("<heartbeat>", nil, time.Now().UnixNano())
}
}
}()
// Start the main API server. // Start the main API server.
qs := newAPISrv(listen, cert, db, repl, useHTTP, compression, missesIncrease) qs := newAPISrv(listen, cert, db, repl, useHTTP, compression)
main.Add(qs) main.Add(qs)
// If we have a metrics port configured, start a metrics handler. // If we have a metrics port configured, start a metrics handler.
@ -239,6 +204,18 @@ func main() {
}() }()
} }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cancel on signal
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go func() {
sig := <-signalChan
log.Printf("Received signal %s; shutting down", sig)
cancel()
}()
// Engage! // Engage!
main.Serve(context.Background()) main.Serve(ctx)
} }

View File

@ -26,7 +26,7 @@ const (
) )
type replicator interface { type replicator interface {
send(key string, addrs []DatabaseAddress, seen int64) send(key *protocol.DeviceID, addrs []DatabaseAddress, seen int64)
} }
// a replicationSender tries to connect to the remote address and provide // a replicationSender tries to connect to the remote address and provide
@ -144,9 +144,9 @@ func (s *replicationSender) String() string {
return fmt.Sprintf("replicationSender(%q)", s.dst) return fmt.Sprintf("replicationSender(%q)", s.dst)
} }
func (s *replicationSender) send(key string, ps []DatabaseAddress, seen int64) { func (s *replicationSender) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
item := ReplicationRecord{ item := ReplicationRecord{
Key: key, Key: key[:],
Addresses: ps, Addresses: ps,
Seen: seen, Seen: seen,
} }
@ -163,7 +163,7 @@ func (s *replicationSender) send(key string, ps []DatabaseAddress, seen int64) {
// a replicationMultiplexer sends to multiple replicators // a replicationMultiplexer sends to multiple replicators
type replicationMultiplexer []replicator type replicationMultiplexer []replicator
func (m replicationMultiplexer) send(key string, ps []DatabaseAddress, seen int64) { func (m replicationMultiplexer) send(key *protocol.DeviceID, ps []DatabaseAddress, seen int64) {
for _, s := range m { for _, s := range m {
// each send is nonblocking // each send is nonblocking
s.send(key, ps, seen) s.send(key, ps, seen)
@ -290,9 +290,18 @@ func (l *replicationListener) handle(ctx context.Context, conn net.Conn) {
replicationRecvsTotal.WithLabelValues("error").Inc() replicationRecvsTotal.WithLabelValues("error").Inc()
continue continue
} }
id, err := protocol.DeviceIDFromBytes(rec.Key)
if err != nil {
id, err = protocol.DeviceIDFromString(string(rec.Key))
}
if err != nil {
log.Println("Replication device ID:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()
continue
}
// Store // Store
l.db.merge(rec.Key, rec.Addresses, rec.Seen) l.db.merge(&id, rec.Addresses, rec.Seen)
replicationRecvsTotal.WithLabelValues("success").Inc() replicationRecvsTotal.WithLabelValues("success").Inc()
} }
} }

View File

@ -96,13 +96,28 @@ var (
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"operation"}) }, []string{"operation"})
retryAfterHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ databaseWriteSeconds = prometheus.NewGauge(
Namespace: "syncthing", prometheus.GaugeOpts{
Subsystem: "discovery", Namespace: "syncthing",
Name: "retry_after_seconds", Subsystem: "discovery",
Help: "Retry-After header value in seconds.", Name: "database_write_seconds",
Buckets: prometheus.ExponentialBuckets(60, 2, 7), // 60, 120, 240, 480, 960, 1920, 3840 Help: "Time spent writing the database.",
}) })
databaseLastWritten = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "syncthing",
Subsystem: "discovery",
Name: "database_last_written",
Help: "Timestamp of the last successful database write.",
})
retryAfterLevel = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "syncthing",
Subsystem: "discovery",
Name: "retry_after_seconds",
Help: "Retry-After header value in seconds.",
}, []string{"name"})
) )
const ( const (
@ -123,5 +138,6 @@ func init() {
replicationSendsTotal, replicationRecvsTotal, replicationSendsTotal, replicationRecvsTotal,
databaseKeys, databaseStatisticsSeconds, databaseKeys, databaseStatisticsSeconds,
databaseOperations, databaseOperationSeconds, databaseOperations, databaseOperationSeconds,
retryAfterHistogram) databaseWriteSeconds, databaseLastWritten,
retryAfterLevel)
} }

3
go.mod
View File

@ -5,6 +5,7 @@ go 1.22.0
require ( require (
github.com/AudriusButkevicius/recli v0.0.7-0.20220911121932-d000ce8fbf0f github.com/AudriusButkevicius/recli v0.0.7-0.20220911121932-d000ce8fbf0f
github.com/alecthomas/kong v0.9.0 github.com/alecthomas/kong v0.9.0
github.com/aws/aws-sdk-go v1.55.5
github.com/calmh/incontainer v1.0.0 github.com/calmh/incontainer v1.0.0
github.com/calmh/xdr v1.1.0 github.com/calmh/xdr v1.1.0
github.com/ccding/go-stun v0.1.5 github.com/ccding/go-stun v0.1.5
@ -28,6 +29,7 @@ require (
github.com/oschwald/geoip2-golang v1.11.0 github.com/oschwald/geoip2-golang v1.11.0
github.com/pierrec/lz4/v4 v4.1.21 github.com/pierrec/lz4/v4 v4.1.21
github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_golang v1.19.1
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/quic-go/quic-go v0.46.0 github.com/quic-go/quic-go v0.46.0
github.com/rabbitmq/amqp091-go v1.10.0 github.com/rabbitmq/amqp091-go v1.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
@ -67,6 +69,7 @@ require (
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.11 // indirect github.com/nxadm/tail v1.4.11 // indirect

10
go.sum
View File

@ -11,6 +11,8 @@ github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/calmh/glob v0.0.0-20220615080505-1d823af5017b h1:Fjm4GuJ+TGMgqfGHN42IQArJb77CfD/mAwLbDUoJe6g= github.com/calmh/glob v0.0.0-20220615080505-1d823af5017b h1:Fjm4GuJ+TGMgqfGHN42IQArJb77CfD/mAwLbDUoJe6g=
@ -124,6 +126,10 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
@ -194,6 +200,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y= github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y=
github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
@ -381,7 +389,9 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=