From 65cfefaa3c2989db25fb847b44847a683e9881c4 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 23 Jan 2023 08:38:55 +0100 Subject: [PATCH] cmd, docker: Updates for infrastructure These are some changes to the relay pool server, upgrade server, and crash receiver to run under Kubernetes. It's been in production for a while. --- Dockerfile.stcrashreceiver | 18 +++ Dockerfile.strelaypoolsrv | 26 ++++ Dockerfile.stupgrades | 23 ++++ build.go | 12 ++ cmd/stcrashreceiver/diskstore.go | 181 +++++++++++++++++++++++++ cmd/stcrashreceiver/main.go | 48 +++++-- cmd/stcrashreceiver/sentry.go | 40 ++++++ cmd/stcrashreceiver/stcrashreceiver.go | 80 +++-------- cmd/strelaypoolsrv/main.go | 151 ++++++++++----------- cmd/stupgrades/main.go | 101 ++++++++++++-- lib/httpcache/httpcache.go | 124 +++++++++++++++++ script/strelaypoolsrv-entrypoint.sh | 10 ++ 12 files changed, 653 insertions(+), 161 deletions(-) create mode 100644 Dockerfile.stcrashreceiver create mode 100644 Dockerfile.strelaypoolsrv create mode 100644 Dockerfile.stupgrades create mode 100644 cmd/stcrashreceiver/diskstore.go create mode 100644 lib/httpcache/httpcache.go create mode 100755 script/strelaypoolsrv-entrypoint.sh diff --git a/Dockerfile.stcrashreceiver b/Dockerfile.stcrashreceiver new file mode 100644 index 000000000..4f6e60986 --- /dev/null +++ b/Dockerfile.stcrashreceiver @@ -0,0 +1,18 @@ +ARG GOVERSION=latest +FROM golang:$GOVERSION AS builder + +WORKDIR /src +COPY . . + +ENV CGO_ENABLED=0 +ENV BUILD_HOST=syncthing.net +ENV BUILD_USER=docker +RUN rm -f stcrashreceiver && go run build.go build stcrashreceiver + +FROM alpine + +EXPOSE 8080 + +COPY --from=builder /src/stcrashreceiver /bin/stcrashreceiver + +ENTRYPOINT [ "/bin/stcrashreceiver" ] diff --git a/Dockerfile.strelaypoolsrv b/Dockerfile.strelaypoolsrv new file mode 100644 index 000000000..814948b25 --- /dev/null +++ b/Dockerfile.strelaypoolsrv @@ -0,0 +1,26 @@ +ARG GOVERSION=latest +FROM golang:$GOVERSION AS builder + +WORKDIR /src +COPY . . + +ENV CGO_ENABLED=0 +ENV BUILD_HOST=syncthing.net +ENV BUILD_USER=docker +RUN rm -f strelaysrv && go run build.go -no-upgrade build strelaypoolsrv + +FROM alpine + +EXPOSE 8080 + +RUN apk add --no-cache ca-certificates su-exec curl +ENV PUID=1000 PGID=1000 MAXMIND_KEY= + +RUN mkdir /var/strelaypoolsrv && chown 1000 /var/strelaypoolsrv +USER 1000 + +COPY --from=builder /src/strelaypoolsrv /bin/strelaypoolsrv +COPY --from=builder /src/script/strelaypoolsrv-entrypoint.sh /bin/entrypoint.sh + +WORKDIR /var/strelaypoolsrv +ENTRYPOINT ["/bin/entrypoint.sh", "/bin/strelaypoolsrv", "-listen", ":8080"] diff --git a/Dockerfile.stupgrades b/Dockerfile.stupgrades new file mode 100644 index 000000000..b5caf44a9 --- /dev/null +++ b/Dockerfile.stupgrades @@ -0,0 +1,23 @@ +ARG GOVERSION=latest +FROM golang:$GOVERSION AS builder + +WORKDIR /src +COPY . . + +ENV CGO_ENABLED=0 +ENV BUILD_HOST=syncthing.net +ENV BUILD_USER=docker +RUN rm -f stupgrades && go run build.go build stupgrades + +FROM alpine + +EXPOSE 8080 + +COPY --from=builder /src/stupgrades /bin/stupgrades + +ENTRYPOINT [ \ + "/bin/stupgrades", \ + "-f", "/nightly.json->https://build.syncthing.net/guestAuth/repository/download/Release_Nightly/.lastSuccessful/nightly.json", \ + "-f", "/syncthing-macos/appcast.xml->https://build.syncthing.net/guestAuth/repository/download/SyncthingMacOS_CreateAppcastXml/.lastSuccessful/appcast.xml" \ + ] + diff --git a/build.go b/build.go index 14b9e18e6..a96eb91ef 100644 --- a/build.go +++ b/build.go @@ -207,6 +207,18 @@ var targets = map[string]target{ {src: "AUTHORS", dst: "deb/usr/share/doc/syncthing-relaypoolsrv/AUTHORS.txt", perm: 0644}, }, }, + "stupgrades": { + name: "stupgrades", + description: "Syncthing Upgrade Check Server", + buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stupgrades"}, + binaryName: "stupgrades", + }, + "stcrashreceiver": { + name: "stupgrastcrashreceiverdes", + description: "Syncthing Crash Server", + buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stcrashreceiver"}, + binaryName: "stcrashreceiver", + }, } func initTargets() { diff --git a/cmd/stcrashreceiver/diskstore.go b/cmd/stcrashreceiver/diskstore.go new file mode 100644 index 000000000..a5f459ad0 --- /dev/null +++ b/cmd/stcrashreceiver/diskstore.go @@ -0,0 +1,181 @@ +package main + +import ( + "bytes" + "compress/gzip" + "context" + "io" + "log" + "os" + "path/filepath" + "sort" + "time" +) + +type diskStore struct { + dir string + inbox chan diskEntry + maxBytes int64 + maxFiles int + + currentFiles []currentFile + currentSize int64 +} + +type diskEntry struct { + path string + data []byte +} + +type currentFile struct { + path string + size int64 + mtime int64 +} + +func (d *diskStore) Serve(ctx context.Context) { + if err := os.MkdirAll(d.dir, 0750); err != nil { + log.Println("Creating directory:", err) + return + } + + if err := d.inventory(); err != nil { + log.Println("Failed to inventory disk store:", err) + } + d.clean() + + cleanTimer := time.NewTicker(time.Minute) + inventoryTimer := time.NewTicker(24 * time.Hour) + + buf := new(bytes.Buffer) + gw := gzip.NewWriter(buf) + for { + select { + case entry := <-d.inbox: + path := d.fullPath(entry.path) + + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + log.Println("Creating directory:", err) + continue + } + + buf.Reset() + gw.Reset(buf) + if _, err := gw.Write(entry.data); err != nil { + log.Println("Failed to compress crash report:", err) + continue + } + if err := gw.Close(); err != nil { + log.Println("Failed to compress crash report:", err) + continue + } + if err := os.WriteFile(path, buf.Bytes(), 0644); err != nil { + log.Printf("Failed to write %s: %v", entry.path, err) + _ = os.Remove(path) + continue + } + + d.currentSize += int64(buf.Len()) + d.currentFiles = append(d.currentFiles, currentFile{ + size: int64(len(entry.data)), + path: path, + }) + + case <-cleanTimer.C: + d.clean() + + case <-inventoryTimer.C: + if err := d.inventory(); err != nil { + log.Println("Failed to inventory disk store:", err) + } + + case <-ctx.Done(): + return + } + } +} + +func (d *diskStore) Put(path string, data []byte) bool { + select { + case d.inbox <- diskEntry{ + path: path, + data: data, + }: + return true + default: + return false + } +} + +func (d *diskStore) Get(path string) ([]byte, error) { + path = d.fullPath(path) + bs, err := os.ReadFile(path) + if err != nil { + return nil, err + } + gr, err := gzip.NewReader(bytes.NewReader(bs)) + if err != nil { + return nil, err + } + defer gr.Close() + return io.ReadAll(gr) +} + +func (d *diskStore) Exists(path string) bool { + path = d.fullPath(path) + _, err := os.Lstat(path) + return err == nil +} + +func (d *diskStore) clean() { + for len(d.currentFiles) > 0 && (len(d.currentFiles) > d.maxFiles || d.currentSize > d.maxBytes) { + f := d.currentFiles[0] + log.Println("Removing", f.path) + if err := os.Remove(f.path); err != nil { + log.Println("Failed to remove file:", err) + } + d.currentFiles = d.currentFiles[1:] + d.currentSize -= f.size + } + var oldest time.Duration + if len(d.currentFiles) > 0 { + oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute) + } + log.Printf("Clean complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest) +} + +func (d *diskStore) inventory() error { + d.currentFiles = nil + d.currentSize = 0 + err := filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + if filepath.Ext(path) != ".gz" { + return nil + } + d.currentSize += info.Size() + d.currentFiles = append(d.currentFiles, currentFile{ + path: path, + size: info.Size(), + mtime: info.ModTime().Unix(), + }) + return nil + }) + sort.Slice(d.currentFiles, func(i, j int) bool { + return d.currentFiles[i].mtime < d.currentFiles[j].mtime + }) + var oldest time.Duration + if len(d.currentFiles) > 0 { + oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute) + } + log.Printf("Inventory complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest) + return err +} + +func (d *diskStore) fullPath(path string) string { + return filepath.Join(d.dir, path[0:2], path[2:]) + ".gz" +} diff --git a/cmd/stcrashreceiver/main.go b/cmd/stcrashreceiver/main.go index 62b70d5c2..15f48c17b 100644 --- a/cmd/stcrashreceiver/main.go +++ b/cmd/stcrashreceiver/main.go @@ -13,15 +13,17 @@ package main import ( + "context" "encoding/json" - "flag" "fmt" "io" "log" "net/http" "os" "path/filepath" + "time" + "github.com/alecthomas/kong" "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/ur" @@ -30,26 +32,50 @@ import ( const maxRequestSize = 1 << 20 // 1 MiB +type cli struct { + Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."` + DSN string `help:"Sentry DSN" env:"SENTRY_DSN"` + Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"` + MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"` + MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"` + CleanInterval time.Duration `help:"Interval between cleaning up old reports" default:"12h" env:"CLEAN_INTERVAL"` + SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"` + DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"` +} + func main() { - dir := flag.String("dir", ".", "Parent directory to store crash and failure reports in") - dsn := flag.String("dsn", "", "Sentry DSN") - listen := flag.String("listen", ":22039", "HTTP listen address") - flag.Parse() + var params cli + kong.Parse(¶ms) mux := http.NewServeMux() - cr := &crashReceiver{ - dir: filepath.Join(*dir, "crash_reports"), - dsn: *dsn, + ds := &diskStore{ + dir: filepath.Join(params.Dir, "crash_reports"), + inbox: make(chan diskEntry, params.DiskQueue), + maxFiles: params.MaxDiskFiles, + maxBytes: params.MaxDiskSizeMB << 20, } + go ds.Serve(context.Background()) + + ss := &sentryService{ + dsn: params.DSN, + inbox: make(chan sentryRequest, params.SentryQueue), + } + go ss.Serve(context.Background()) + + cr := &crashReceiver{ + store: ds, + sentry: ss, + } + mux.Handle("/", cr) - if *dsn != "" { - mux.HandleFunc("/newcrash/failure", handleFailureFn(*dsn, filepath.Join(*dir, "failure_reports"))) + if params.DSN != "" { + mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports"))) } log.SetOutput(os.Stdout) - if err := http.ListenAndServe(*listen, mux); err != nil { + if err := http.ListenAndServe(params.Listen, mux); err != nil { log.Fatalln("HTTP serve:", err) } } diff --git a/cmd/stcrashreceiver/sentry.go b/cmd/stcrashreceiver/sentry.go index e7796fe29..de8d91e57 100644 --- a/cmd/stcrashreceiver/sentry.go +++ b/cmd/stcrashreceiver/sentry.go @@ -8,8 +8,10 @@ package main import ( "bytes" + "context" "errors" "io" + "log" "regexp" "strings" "sync" @@ -31,6 +33,44 @@ var ( clientsMut sync.Mutex ) +type sentryService struct { + dsn string + inbox chan sentryRequest +} + +type sentryRequest struct { + reportID string + data []byte +} + +func (s *sentryService) Serve(ctx context.Context) { + for { + select { + case req := <-s.inbox: + pkt, err := parseCrashReport(req.reportID, req.data) + if err != nil { + log.Println("Failed to parse crash report:", err) + continue + } + if err := sendReport(s.dsn, pkt, req.reportID); err != nil { + log.Println("Failed to send crash report:", err) + } + + case <-ctx.Done(): + return + } + } +} + +func (s *sentryService) Send(reportID string, data []byte) bool { + select { + case s.inbox <- sentryRequest{reportID, data}: + return true + default: + return false + } +} + func sendReport(dsn string, pkt *raven.Packet, userID string) error { pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID}) diff --git a/cmd/stcrashreceiver/stcrashreceiver.go b/cmd/stcrashreceiver/stcrashreceiver.go index 57b98a5d1..0c1dcd4e2 100644 --- a/cmd/stcrashreceiver/stcrashreceiver.go +++ b/cmd/stcrashreceiver/stcrashreceiver.go @@ -7,20 +7,16 @@ package main import ( - "bytes" - "compress/gzip" "io" "log" "net/http" - "os" "path" - "path/filepath" "strings" ) type crashReceiver struct { - dir string - dsn string + store *diskStore + sentry *sentryService } func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -43,55 +39,38 @@ func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - // The location of the report on disk, compressed - fullPath := filepath.Join(r.dir, r.dirFor(reportID), reportID) + ".gz" - switch req.Method { case http.MethodGet: - r.serveGet(fullPath, w, req) + r.serveGet(reportID, w, req) case http.MethodHead: - r.serveHead(fullPath, w, req) + r.serveHead(reportID, w, req) case http.MethodPut: - r.servePut(reportID, fullPath, w, req) + r.servePut(reportID, w, req) default: http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } } // serveGet responds to GET requests by serving the uncompressed report. -func (*crashReceiver) serveGet(fullPath string, w http.ResponseWriter, _ *http.Request) { - fd, err := os.Open(fullPath) +func (r *crashReceiver) serveGet(reportID string, w http.ResponseWriter, _ *http.Request) { + bs, err := r.store.Get(reportID) if err != nil { http.Error(w, "Not found", http.StatusNotFound) return } - - defer fd.Close() - gr, err := gzip.NewReader(fd) - if err != nil { - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - _, _ = io.Copy(w, gr) // best effort + w.Write(bs) } // serveHead responds to HEAD requests by checking if the named report // already exists in the system. -func (*crashReceiver) serveHead(fullPath string, w http.ResponseWriter, _ *http.Request) { - if _, err := os.Lstat(fullPath); err != nil { +func (r *crashReceiver) serveHead(reportID string, w http.ResponseWriter, _ *http.Request) { + if !r.store.Exists(reportID) { http.Error(w, "Not found", http.StatusNotFound) } } // servePut accepts and stores the given report. -func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWriter, req *http.Request) { - // Ensure the destination directory exists - if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil { - log.Println("Creating directory:", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - +func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *http.Request) { // Read at most maxRequestSize of report data. log.Println("Receiving report", reportID) lr := io.LimitReader(req.Body, maxRequestSize) @@ -102,40 +81,13 @@ func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWrite return } - // Compress the report for storage - buf := new(bytes.Buffer) - gw := gzip.NewWriter(buf) - _, _ = gw.Write(bs) // can't fail - gw.Close() - - // Create an output file with the compressed report - err = os.WriteFile(fullPath, buf.Bytes(), 0644) - if err != nil { - log.Println("Saving report:", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return + // Store the report + if !r.store.Put(reportID, bs) { + log.Println("Failed to store report (queue full):", reportID) } // Send the report to Sentry - if r.dsn != "" { - // Remote ID - user := userIDFor(req) - - go func() { - // There's no need for the client to have to wait for this part. - pkt, err := parseCrashReport(reportID, bs) - if err != nil { - log.Println("Failed to parse crash report:", err) - return - } - if err := sendReport(r.dsn, pkt, user); err != nil { - log.Println("Failed to send crash report:", err) - } - }() + if !r.sentry.Send(reportID, bs) { + log.Println("Failed to send report to sentry (queue full):", reportID) } } - -// 01234567890abcdef... => 01/23 -func (*crashReceiver) dirFor(base string) string { - return filepath.Join(base[0:2], base[2:4]) -} diff --git a/cmd/strelaypoolsrv/main.go b/cmd/strelaypoolsrv/main.go index 063097017..81bc11693 100644 --- a/cmd/strelaypoolsrv/main.go +++ b/cmd/strelaypoolsrv/main.go @@ -3,14 +3,12 @@ package main import ( - "compress/gzip" "context" "crypto/tls" "crypto/x509" "encoding/json" "flag" "fmt" - "io" "log" "net" "net/http" @@ -19,11 +17,13 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/syncthing/syncthing/lib/httpcache" "github.com/syncthing/syncthing/lib/protocol" - "github.com/golang/groupcache/lru" "github.com/oschwald/geoip2-golang" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -33,7 +33,6 @@ import ( "github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/tlsutil" - "golang.org/x/time/rate" ) type location struct { @@ -99,27 +98,13 @@ var ( dir string evictionTime = time.Hour debug bool - getLRUSize = 10 << 10 - getLimitBurst = 10 - getLimitAvg = 2 - postLRUSize = 1 << 10 - postLimitBurst = 2 - postLimitAvg = 2 - getLimit time.Duration - postLimit time.Duration permRelaysFile string ipHeader string geoipPath string proto string - statsRefresh = time.Minute / 2 - requestQueueLen = 10 - requestProcessors = 1 - - getMut = sync.NewMutex() - getLRUCache *lru.Cache - - postMut = sync.NewMutex() - postLRUCache *lru.Cache + statsRefresh = time.Minute + requestQueueLen = 64 + requestProcessors = 8 requests chan request @@ -127,6 +112,7 @@ var ( knownRelays = make([]*relay, 0) permanentRelays = make([]*relay, 0) evictionTimers = make(map[string]*time.Timer) + globalBlocklist = newErrorTracker(1000) ) const ( @@ -141,13 +127,8 @@ func main() { flag.StringVar(&dir, "keys", dir, "Directory where http-cert.pem and http-key.pem is stored for TLS listening") flag.BoolVar(&debug, "debug", debug, "Enable debug output") flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted") - flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size") - flag.IntVar(&getLimitAvg, "get-limit-avg", getLimitAvg, "Allowed average get request rate, per 10 s") - flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests") - flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size") - flag.IntVar(&postLimitAvg, "post-limit-avg", postLimitAvg, "Allowed average post request rate, per minute") - flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests") flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays") + flag.StringVar(&knownRelaysFile, "known-relays", knownRelaysFile, "Path to list of current relays") flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.") flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database") flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6") @@ -159,12 +140,6 @@ func main() { requests = make(chan request, requestQueueLen) - getLimit = 10 * time.Second / time.Duration(getLimitAvg) - postLimit = time.Minute / time.Duration(postLimitAvg) - - getLRUCache = lru.New(getLRUSize) - postLRUCache = lru.New(postLRUSize) - var listener net.Listener var err error @@ -240,7 +215,7 @@ func main() { handler := http.NewServeMux() handler.HandleFunc("/", handleAssets) - handler.HandleFunc("/endpoint", handleRequest) + handler.Handle("/endpoint", httpcache.SinglePath(http.HandlerFunc(handleRequest), 15*time.Second)) handler.HandleFunc("/metrics", handleMetrics) srv := http.Server{ @@ -291,21 +266,17 @@ func handleRequest(w http.ResponseWriter, r *http.Request) { }() if ipHeader != "" { - r.RemoteAddr = r.Header.Get(ipHeader) + hdr := r.Header.Get(ipHeader) + fields := strings.Split(hdr, ",") + if len(fields) > 0 { + r.RemoteAddr = strings.TrimSpace(fields[len(fields)-1]) + } } w.Header().Set("Access-Control-Allow-Origin", "*") switch r.Method { case "GET": - if limit(r.RemoteAddr, getLRUCache, getMut, getLimit, getLimitBurst) { - w.WriteHeader(httpStatusEnhanceYourCalm) - return - } handleGetRequest(w, r) case "POST": - if limit(r.RemoteAddr, postLRUCache, postMut, postLimit, postLimitBurst) { - w.WriteHeader(httpStatusEnhanceYourCalm) - return - } handlePostRequest(w, r) default: if debug { @@ -327,20 +298,28 @@ func handleGetRequest(rw http.ResponseWriter, r *http.Request) { // Shuffle rand.Shuffle(relays) - w := io.Writer(rw) - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - rw.Header().Set("Content-Encoding", "gzip") - gw := gzip.NewWriter(rw) - defer gw.Close() - w = gw - } - - _ = json.NewEncoder(w).Encode(map[string][]*relay{ + _ = json.NewEncoder(rw).Encode(map[string][]*relay{ "relays": relays, }) } func handlePostRequest(w http.ResponseWriter, r *http.Request) { + // Get the IP address of the client + rhost := r.RemoteAddr + if host, _, err := net.SplitHostPort(rhost); err == nil { + rhost = host + } + + // Check the black list. A client is blacklisted if their last 10 + // attempts to join have all failed. The "Unauthorized" status return + // causes strelaysrv to cease attempting to join. + if globalBlocklist.IsBlocked(rhost) { + log.Println("Rejected blocked client", rhost) + http.Error(w, "Too many errors", http.StatusUnauthorized) + globalBlocklist.ClearErrors(rhost) + return + } + var relayCert *x509.Certificate if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 { relayCert = r.TLS.PeerCertificates[0] @@ -392,12 +371,6 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) { return } - // Get the IP address of the client - rhost := r.RemoteAddr - if host, _, err := net.SplitHostPort(rhost); err == nil { - rhost = host - } - ip := net.ParseIP(host) // The client did not provide an IP address, use the IP address of the client. if ip == nil || ip.IsUnspecified() { @@ -429,10 +402,14 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) { case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}: result := <-reschan if result.err != nil { + log.Println("Join from", r.RemoteAddr, "failed:", result.err) + globalBlocklist.AddError(rhost) relayTestsTotal.WithLabelValues("failed").Inc() http.Error(w, result.err.Error(), http.StatusBadRequest) return } + log.Println("Join from", r.RemoteAddr, "succeeded") + globalBlocklist.ClearErrors(rhost) relayTestsTotal.WithLabelValues("success").Inc() w.Header().Set("Content-Type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(map[string]time.Duration{ @@ -546,23 +523,6 @@ func evict(relay *relay) func() { } } -func limit(addr string, cache *lru.Cache, lock sync.Mutex, intv time.Duration, burst int) bool { - if host, _, err := net.SplitHostPort(addr); err == nil { - addr = host - } - - lock.Lock() - v, _ := cache.Get(addr) - bkt, ok := v.(*rate.Limiter) - if !ok { - bkt = rate.NewLimiter(rate.Every(intv), burst) - cache.Add(addr, bkt) - } - lock.Unlock() - - return !bkt.Allow() -} - func loadRelays(file string) []*relay { content, err := os.ReadFile(file) if err != nil { @@ -602,7 +562,7 @@ func saveRelays(file string, relays []*relay) error { for _, relay := range relays { content += relay.uri.String() + "\n" } - return os.WriteFile(file, []byte(content), 0777) + return os.WriteFile(file, []byte(content), 0o777) } func createTestCertificate() tls.Certificate { @@ -661,3 +621,42 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) { lrw.statusCode = code lrw.ResponseWriter.WriteHeader(code) } + +type errorTracker struct { + errors *lru.TwoQueueCache[string, *errorCounter] +} + +type errorCounter struct { + count atomic.Int32 +} + +func newErrorTracker(size int) *errorTracker { + cache, err := lru.New2Q[string, *errorCounter](size) + if err != nil { + panic(err) + } + return &errorTracker{ + errors: cache, + } +} + +func (b *errorTracker) AddError(host string) { + entry, ok := b.errors.Get(host) + if !ok { + entry = &errorCounter{} + b.errors.Add(host, entry) + } + c := entry.count.Add(1) + log.Printf("Error count for %s is now %d", host, c) +} + +func (b *errorTracker) ClearErrors(host string) { + b.errors.Remove(host) +} + +func (b *errorTracker) IsBlocked(host string) bool { + if be, ok := b.errors.Get(host); ok { + return be.count.Load() > 10 + } + return false +} diff --git a/cmd/stupgrades/main.go b/cmd/stupgrades/main.go index 429f026e3..3b2c79ff2 100644 --- a/cmd/stupgrades/main.go +++ b/cmd/stupgrades/main.go @@ -7,31 +7,112 @@ package main import ( + "bytes" "encoding/json" - "flag" + "fmt" + "io" + "log" + "net/http" "os" "sort" + "strings" + "time" + "github.com/alecthomas/kong" + "github.com/syncthing/syncthing/lib/httpcache" "github.com/syncthing/syncthing/lib/upgrade" ) -const defaultURL = "https://api.github.com/repos/syncthing/syncthing/releases?per_page=25" +type cli struct { + Listen string `default:":8080" help:"Listen address"` + URL string `short:"u" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=25" help:"GitHub releases url"` + Forward []string `short:"f" help:"Forwarded pages, format: /path->https://example/com/url"` + CacheTime time.Duration `default:"15m" help:"Cache time"` +} func main() { - url := flag.String("u", defaultURL, "GitHub releases url") - flag.Parse() - - rels := upgrade.FetchLatestReleases(*url, "") - if rels == nil { - // An error was already logged + var params cli + kong.Parse(¶ms) + if err := server(¶ms); err != nil { + fmt.Printf("Error: %v\n", err) os.Exit(1) } +} + +func server(params *cli) error { + http.Handle("/meta.json", httpcache.SinglePath(&githubReleases{url: params.URL}, params.CacheTime)) + + for _, fwd := range params.Forward { + path, url, ok := strings.Cut(fwd, "->") + if !ok { + return fmt.Errorf("invalid forward: %q", fwd) + } + http.Handle(path, httpcache.SinglePath(&proxy{url: url}, params.CacheTime)) + } + + return http.ListenAndServe(params.Listen, nil) +} + +type githubReleases struct { + url string +} + +func (p *githubReleases) ServeHTTP(w http.ResponseWriter, req *http.Request) { + log.Println("Fetching", p.url) + rels := upgrade.FetchLatestReleases(p.url, "") + if rels == nil { + http.Error(w, "no releases", http.StatusInternalServerError) + return + } sort.Sort(upgrade.SortByRelease(rels)) rels = filterForLatest(rels) - if err := json.NewEncoder(os.Stdout).Encode(rels); err != nil { - os.Exit(1) + buf := new(bytes.Buffer) + _ = json.NewEncoder(buf).Encode(rels) + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET") + w.Write(buf.Bytes()) +} + +type proxy struct { + url string +} + +func (p *proxy) ServeHTTP(w http.ResponseWriter, req *http.Request) { + log.Println("Fetching", p.url) + req, err := http.NewRequestWithContext(req.Context(), http.MethodGet, p.url, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer resp.Body.Close() + + ct := resp.Header.Get("Content-Type") + w.Header().Set("Content-Type", ct) + if resp.StatusCode == http.StatusOK { + w.Header().Set("Cache-Control", "public, max-age=900") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET") + } + w.WriteHeader(resp.StatusCode) + if strings.HasPrefix(ct, "application/json") { + // Special JSON handling; clean it up a bit. + var v interface{} + if err := json.NewDecoder(resp.Body).Decode(&v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _ = json.NewEncoder(w).Encode(v) + } else { + _, _ = io.Copy(w, resp.Body) } } diff --git a/lib/httpcache/httpcache.go b/lib/httpcache/httpcache.go new file mode 100644 index 000000000..fd56b45e0 --- /dev/null +++ b/lib/httpcache/httpcache.go @@ -0,0 +1,124 @@ +package httpcache + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "net/http" + "strings" + "sync" + "time" +) + +type SinglePathCache struct { + next http.Handler + keep time.Duration + + mut sync.RWMutex + resp *recordedResponse +} + +func SinglePath(next http.Handler, keep time.Duration) *SinglePathCache { + return &SinglePathCache{ + next: next, + keep: keep, + } +} + +type recordedResponse struct { + status int + header http.Header + data []byte + gzip []byte + when time.Time + keep time.Duration +} + +func (resp *recordedResponse) ServeHTTP(w http.ResponseWriter, r *http.Request) { + for k, v := range resp.header { + w.Header()[k] = v + } + + w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", int(resp.keep.Seconds()))) + + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + w.Header().Set("Content-Length", fmt.Sprint(len(resp.gzip))) + w.WriteHeader(resp.status) + _, _ = w.Write(resp.gzip) + return + } + + w.Header().Set("Content-Length", fmt.Sprint(len(resp.data))) + w.WriteHeader(resp.status) + _, _ = w.Write(resp.data) +} + +type responseRecorder struct { + resp *recordedResponse +} + +func (r *responseRecorder) WriteHeader(status int) { + r.resp.status = status +} + +func (r *responseRecorder) Write(data []byte) (int, error) { + r.resp.data = append(r.resp.data, data...) + return len(data), nil +} + +func (r *responseRecorder) Header() http.Header { + return r.resp.header +} + +func (s *SinglePathCache) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + s.next.ServeHTTP(w, r) + return + } + + w.Header().Set("X-Cache", "MISS") + + s.mut.RLock() + ok := s.serveCached(w, r) + s.mut.RUnlock() + if ok { + return + } + + s.mut.Lock() + defer s.mut.Unlock() + if s.serveCached(w, r) { + return + } + + rec := &recordedResponse{status: http.StatusOK, header: make(http.Header), when: time.Now(), keep: s.keep} + childRec := r.Clone(context.Background()) + childRec.Header.Del("Accept-Encoding") // don't let the client dictate the encoding + s.next.ServeHTTP(&responseRecorder{resp: rec}, childRec) + + if rec.status == http.StatusOK { + buf := new(bytes.Buffer) + gw := gzip.NewWriter(buf) + _, _ = gw.Write(rec.data) + gw.Close() + rec.gzip = buf.Bytes() + + s.resp = rec + } + + rec.ServeHTTP(w, r) +} + +func (s *SinglePathCache) serveCached(w http.ResponseWriter, r *http.Request) bool { + if s.resp == nil || time.Since(s.resp.when) > s.keep { + return false + } + + w.Header().Set("X-Cache", "HIT") + w.Header().Set("X-Cache-From", s.resp.when.Format(time.RFC3339)) + + s.resp.ServeHTTP(w, r) + return true +} diff --git a/script/strelaypoolsrv-entrypoint.sh b/script/strelaypoolsrv-entrypoint.sh new file mode 100755 index 000000000..945c77d5b --- /dev/null +++ b/script/strelaypoolsrv-entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +set -eu + +if [ "$MAXMIND_KEY" != "" ] ; then + curl "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City&license_key=${MAXMIND_KEY}&suffix=tar.gz" \ + | tar --strip-components 1 -zxv +fi + +exec "$@"