cmd, docker: Updates for infrastructure

These are some changes to the relay pool server, upgrade server, and
crash receiver to run under Kubernetes. It's been in production for a
while.
This commit is contained in:
Jakob Borg 2023-01-23 08:38:55 +01:00
parent a6c2a5a0ce
commit 65cfefaa3c
12 changed files with 653 additions and 161 deletions

View File

@ -0,0 +1,18 @@
ARG GOVERSION=latest
FROM golang:$GOVERSION AS builder
WORKDIR /src
COPY . .
ENV CGO_ENABLED=0
ENV BUILD_HOST=syncthing.net
ENV BUILD_USER=docker
RUN rm -f stcrashreceiver && go run build.go build stcrashreceiver
FROM alpine
EXPOSE 8080
COPY --from=builder /src/stcrashreceiver /bin/stcrashreceiver
ENTRYPOINT [ "/bin/stcrashreceiver" ]

26
Dockerfile.strelaypoolsrv Normal file
View File

@ -0,0 +1,26 @@
ARG GOVERSION=latest
FROM golang:$GOVERSION AS builder
WORKDIR /src
COPY . .
ENV CGO_ENABLED=0
ENV BUILD_HOST=syncthing.net
ENV BUILD_USER=docker
RUN rm -f strelaysrv && go run build.go -no-upgrade build strelaypoolsrv
FROM alpine
EXPOSE 8080
RUN apk add --no-cache ca-certificates su-exec curl
ENV PUID=1000 PGID=1000 MAXMIND_KEY=
RUN mkdir /var/strelaypoolsrv && chown 1000 /var/strelaypoolsrv
USER 1000
COPY --from=builder /src/strelaypoolsrv /bin/strelaypoolsrv
COPY --from=builder /src/script/strelaypoolsrv-entrypoint.sh /bin/entrypoint.sh
WORKDIR /var/strelaypoolsrv
ENTRYPOINT ["/bin/entrypoint.sh", "/bin/strelaypoolsrv", "-listen", ":8080"]

23
Dockerfile.stupgrades Normal file
View File

@ -0,0 +1,23 @@
ARG GOVERSION=latest
FROM golang:$GOVERSION AS builder
WORKDIR /src
COPY . .
ENV CGO_ENABLED=0
ENV BUILD_HOST=syncthing.net
ENV BUILD_USER=docker
RUN rm -f stupgrades && go run build.go build stupgrades
FROM alpine
EXPOSE 8080
COPY --from=builder /src/stupgrades /bin/stupgrades
ENTRYPOINT [ \
"/bin/stupgrades", \
"-f", "/nightly.json->https://build.syncthing.net/guestAuth/repository/download/Release_Nightly/.lastSuccessful/nightly.json", \
"-f", "/syncthing-macos/appcast.xml->https://build.syncthing.net/guestAuth/repository/download/SyncthingMacOS_CreateAppcastXml/.lastSuccessful/appcast.xml" \
]

View File

@ -207,6 +207,18 @@ var targets = map[string]target{
{src: "AUTHORS", dst: "deb/usr/share/doc/syncthing-relaypoolsrv/AUTHORS.txt", perm: 0644}, {src: "AUTHORS", dst: "deb/usr/share/doc/syncthing-relaypoolsrv/AUTHORS.txt", perm: 0644},
}, },
}, },
"stupgrades": {
name: "stupgrades",
description: "Syncthing Upgrade Check Server",
buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stupgrades"},
binaryName: "stupgrades",
},
"stcrashreceiver": {
name: "stupgrastcrashreceiverdes",
description: "Syncthing Crash Server",
buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stcrashreceiver"},
binaryName: "stcrashreceiver",
},
} }
func initTargets() { func initTargets() {

View File

@ -0,0 +1,181 @@
package main
import (
"bytes"
"compress/gzip"
"context"
"io"
"log"
"os"
"path/filepath"
"sort"
"time"
)
type diskStore struct {
dir string
inbox chan diskEntry
maxBytes int64
maxFiles int
currentFiles []currentFile
currentSize int64
}
type diskEntry struct {
path string
data []byte
}
type currentFile struct {
path string
size int64
mtime int64
}
func (d *diskStore) Serve(ctx context.Context) {
if err := os.MkdirAll(d.dir, 0750); err != nil {
log.Println("Creating directory:", err)
return
}
if err := d.inventory(); err != nil {
log.Println("Failed to inventory disk store:", err)
}
d.clean()
cleanTimer := time.NewTicker(time.Minute)
inventoryTimer := time.NewTicker(24 * time.Hour)
buf := new(bytes.Buffer)
gw := gzip.NewWriter(buf)
for {
select {
case entry := <-d.inbox:
path := d.fullPath(entry.path)
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
log.Println("Creating directory:", err)
continue
}
buf.Reset()
gw.Reset(buf)
if _, err := gw.Write(entry.data); err != nil {
log.Println("Failed to compress crash report:", err)
continue
}
if err := gw.Close(); err != nil {
log.Println("Failed to compress crash report:", err)
continue
}
if err := os.WriteFile(path, buf.Bytes(), 0644); err != nil {
log.Printf("Failed to write %s: %v", entry.path, err)
_ = os.Remove(path)
continue
}
d.currentSize += int64(buf.Len())
d.currentFiles = append(d.currentFiles, currentFile{
size: int64(len(entry.data)),
path: path,
})
case <-cleanTimer.C:
d.clean()
case <-inventoryTimer.C:
if err := d.inventory(); err != nil {
log.Println("Failed to inventory disk store:", err)
}
case <-ctx.Done():
return
}
}
}
func (d *diskStore) Put(path string, data []byte) bool {
select {
case d.inbox <- diskEntry{
path: path,
data: data,
}:
return true
default:
return false
}
}
func (d *diskStore) Get(path string) ([]byte, error) {
path = d.fullPath(path)
bs, err := os.ReadFile(path)
if err != nil {
return nil, err
}
gr, err := gzip.NewReader(bytes.NewReader(bs))
if err != nil {
return nil, err
}
defer gr.Close()
return io.ReadAll(gr)
}
func (d *diskStore) Exists(path string) bool {
path = d.fullPath(path)
_, err := os.Lstat(path)
return err == nil
}
func (d *diskStore) clean() {
for len(d.currentFiles) > 0 && (len(d.currentFiles) > d.maxFiles || d.currentSize > d.maxBytes) {
f := d.currentFiles[0]
log.Println("Removing", f.path)
if err := os.Remove(f.path); err != nil {
log.Println("Failed to remove file:", err)
}
d.currentFiles = d.currentFiles[1:]
d.currentSize -= f.size
}
var oldest time.Duration
if len(d.currentFiles) > 0 {
oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
}
log.Printf("Clean complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
}
func (d *diskStore) inventory() error {
d.currentFiles = nil
d.currentSize = 0
err := filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if filepath.Ext(path) != ".gz" {
return nil
}
d.currentSize += info.Size()
d.currentFiles = append(d.currentFiles, currentFile{
path: path,
size: info.Size(),
mtime: info.ModTime().Unix(),
})
return nil
})
sort.Slice(d.currentFiles, func(i, j int) bool {
return d.currentFiles[i].mtime < d.currentFiles[j].mtime
})
var oldest time.Duration
if len(d.currentFiles) > 0 {
oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
}
log.Printf("Inventory complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
return err
}
func (d *diskStore) fullPath(path string) string {
return filepath.Join(d.dir, path[0:2], path[2:]) + ".gz"
}

View File

@ -13,15 +13,17 @@
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"io" "io"
"log" "log"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/alecthomas/kong"
"github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/ur" "github.com/syncthing/syncthing/lib/ur"
@ -30,26 +32,50 @@ import (
const maxRequestSize = 1 << 20 // 1 MiB const maxRequestSize = 1 << 20 // 1 MiB
type cli struct {
Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."`
DSN string `help:"Sentry DSN" env:"SENTRY_DSN"`
Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"`
MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"`
MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"`
CleanInterval time.Duration `help:"Interval between cleaning up old reports" default:"12h" env:"CLEAN_INTERVAL"`
SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"`
DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"`
}
func main() { func main() {
dir := flag.String("dir", ".", "Parent directory to store crash and failure reports in") var params cli
dsn := flag.String("dsn", "", "Sentry DSN") kong.Parse(&params)
listen := flag.String("listen", ":22039", "HTTP listen address")
flag.Parse()
mux := http.NewServeMux() mux := http.NewServeMux()
cr := &crashReceiver{ ds := &diskStore{
dir: filepath.Join(*dir, "crash_reports"), dir: filepath.Join(params.Dir, "crash_reports"),
dsn: *dsn, inbox: make(chan diskEntry, params.DiskQueue),
maxFiles: params.MaxDiskFiles,
maxBytes: params.MaxDiskSizeMB << 20,
} }
go ds.Serve(context.Background())
ss := &sentryService{
dsn: params.DSN,
inbox: make(chan sentryRequest, params.SentryQueue),
}
go ss.Serve(context.Background())
cr := &crashReceiver{
store: ds,
sentry: ss,
}
mux.Handle("/", cr) mux.Handle("/", cr)
if *dsn != "" { if params.DSN != "" {
mux.HandleFunc("/newcrash/failure", handleFailureFn(*dsn, filepath.Join(*dir, "failure_reports"))) mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports")))
} }
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
if err := http.ListenAndServe(*listen, mux); err != nil { if err := http.ListenAndServe(params.Listen, mux); err != nil {
log.Fatalln("HTTP serve:", err) log.Fatalln("HTTP serve:", err)
} }
} }

View File

@ -8,8 +8,10 @@ package main
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"io" "io"
"log"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
@ -31,6 +33,44 @@ var (
clientsMut sync.Mutex clientsMut sync.Mutex
) )
type sentryService struct {
dsn string
inbox chan sentryRequest
}
type sentryRequest struct {
reportID string
data []byte
}
func (s *sentryService) Serve(ctx context.Context) {
for {
select {
case req := <-s.inbox:
pkt, err := parseCrashReport(req.reportID, req.data)
if err != nil {
log.Println("Failed to parse crash report:", err)
continue
}
if err := sendReport(s.dsn, pkt, req.reportID); err != nil {
log.Println("Failed to send crash report:", err)
}
case <-ctx.Done():
return
}
}
}
func (s *sentryService) Send(reportID string, data []byte) bool {
select {
case s.inbox <- sentryRequest{reportID, data}:
return true
default:
return false
}
}
func sendReport(dsn string, pkt *raven.Packet, userID string) error { func sendReport(dsn string, pkt *raven.Packet, userID string) error {
pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID}) pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID})

View File

@ -7,20 +7,16 @@
package main package main
import ( import (
"bytes"
"compress/gzip"
"io" "io"
"log" "log"
"net/http" "net/http"
"os"
"path" "path"
"path/filepath"
"strings" "strings"
) )
type crashReceiver struct { type crashReceiver struct {
dir string store *diskStore
dsn string sentry *sentryService
} }
func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -43,55 +39,38 @@ func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
// The location of the report on disk, compressed
fullPath := filepath.Join(r.dir, r.dirFor(reportID), reportID) + ".gz"
switch req.Method { switch req.Method {
case http.MethodGet: case http.MethodGet:
r.serveGet(fullPath, w, req) r.serveGet(reportID, w, req)
case http.MethodHead: case http.MethodHead:
r.serveHead(fullPath, w, req) r.serveHead(reportID, w, req)
case http.MethodPut: case http.MethodPut:
r.servePut(reportID, fullPath, w, req) r.servePut(reportID, w, req)
default: default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
} }
} }
// serveGet responds to GET requests by serving the uncompressed report. // serveGet responds to GET requests by serving the uncompressed report.
func (*crashReceiver) serveGet(fullPath string, w http.ResponseWriter, _ *http.Request) { func (r *crashReceiver) serveGet(reportID string, w http.ResponseWriter, _ *http.Request) {
fd, err := os.Open(fullPath) bs, err := r.store.Get(reportID)
if err != nil { if err != nil {
http.Error(w, "Not found", http.StatusNotFound) http.Error(w, "Not found", http.StatusNotFound)
return return
} }
w.Write(bs)
defer fd.Close()
gr, err := gzip.NewReader(fd)
if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
_, _ = io.Copy(w, gr) // best effort
} }
// serveHead responds to HEAD requests by checking if the named report // serveHead responds to HEAD requests by checking if the named report
// already exists in the system. // already exists in the system.
func (*crashReceiver) serveHead(fullPath string, w http.ResponseWriter, _ *http.Request) { func (r *crashReceiver) serveHead(reportID string, w http.ResponseWriter, _ *http.Request) {
if _, err := os.Lstat(fullPath); err != nil { if !r.store.Exists(reportID) {
http.Error(w, "Not found", http.StatusNotFound) http.Error(w, "Not found", http.StatusNotFound)
} }
} }
// servePut accepts and stores the given report. // servePut accepts and stores the given report.
func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWriter, req *http.Request) { func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *http.Request) {
// Ensure the destination directory exists
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
log.Println("Creating directory:", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// Read at most maxRequestSize of report data. // Read at most maxRequestSize of report data.
log.Println("Receiving report", reportID) log.Println("Receiving report", reportID)
lr := io.LimitReader(req.Body, maxRequestSize) lr := io.LimitReader(req.Body, maxRequestSize)
@ -102,40 +81,13 @@ func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWrite
return return
} }
// Compress the report for storage // Store the report
buf := new(bytes.Buffer) if !r.store.Put(reportID, bs) {
gw := gzip.NewWriter(buf) log.Println("Failed to store report (queue full):", reportID)
_, _ = gw.Write(bs) // can't fail
gw.Close()
// Create an output file with the compressed report
err = os.WriteFile(fullPath, buf.Bytes(), 0644)
if err != nil {
log.Println("Saving report:", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
} }
// Send the report to Sentry // Send the report to Sentry
if r.dsn != "" { if !r.sentry.Send(reportID, bs) {
// Remote ID log.Println("Failed to send report to sentry (queue full):", reportID)
user := userIDFor(req)
go func() {
// There's no need for the client to have to wait for this part.
pkt, err := parseCrashReport(reportID, bs)
if err != nil {
log.Println("Failed to parse crash report:", err)
return
}
if err := sendReport(r.dsn, pkt, user); err != nil {
log.Println("Failed to send crash report:", err)
}
}()
} }
} }
// 01234567890abcdef... => 01/23
func (*crashReceiver) dirFor(base string) string {
return filepath.Join(base[0:2], base[2:4])
}

View File

@ -3,14 +3,12 @@
package main package main
import ( import (
"compress/gzip"
"context" "context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -19,11 +17,13 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/syncthing/syncthing/lib/httpcache"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/golang/groupcache/lru"
"github.com/oschwald/geoip2-golang" "github.com/oschwald/geoip2-golang"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -33,7 +33,6 @@ import (
"github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
"golang.org/x/time/rate"
) )
type location struct { type location struct {
@ -99,27 +98,13 @@ var (
dir string dir string
evictionTime = time.Hour evictionTime = time.Hour
debug bool debug bool
getLRUSize = 10 << 10
getLimitBurst = 10
getLimitAvg = 2
postLRUSize = 1 << 10
postLimitBurst = 2
postLimitAvg = 2
getLimit time.Duration
postLimit time.Duration
permRelaysFile string permRelaysFile string
ipHeader string ipHeader string
geoipPath string geoipPath string
proto string proto string
statsRefresh = time.Minute / 2 statsRefresh = time.Minute
requestQueueLen = 10 requestQueueLen = 64
requestProcessors = 1 requestProcessors = 8
getMut = sync.NewMutex()
getLRUCache *lru.Cache
postMut = sync.NewMutex()
postLRUCache *lru.Cache
requests chan request requests chan request
@ -127,6 +112,7 @@ var (
knownRelays = make([]*relay, 0) knownRelays = make([]*relay, 0)
permanentRelays = make([]*relay, 0) permanentRelays = make([]*relay, 0)
evictionTimers = make(map[string]*time.Timer) evictionTimers = make(map[string]*time.Timer)
globalBlocklist = newErrorTracker(1000)
) )
const ( const (
@ -141,13 +127,8 @@ func main() {
flag.StringVar(&dir, "keys", dir, "Directory where http-cert.pem and http-key.pem is stored for TLS listening") flag.StringVar(&dir, "keys", dir, "Directory where http-cert.pem and http-key.pem is stored for TLS listening")
flag.BoolVar(&debug, "debug", debug, "Enable debug output") flag.BoolVar(&debug, "debug", debug, "Enable debug output")
flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted") flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted")
flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size")
flag.IntVar(&getLimitAvg, "get-limit-avg", getLimitAvg, "Allowed average get request rate, per 10 s")
flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests")
flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size")
flag.IntVar(&postLimitAvg, "post-limit-avg", postLimitAvg, "Allowed average post request rate, per minute")
flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests")
flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays") flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays")
flag.StringVar(&knownRelaysFile, "known-relays", knownRelaysFile, "Path to list of current relays")
flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.") flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.")
flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database") flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database")
flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6") flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6")
@ -159,12 +140,6 @@ func main() {
requests = make(chan request, requestQueueLen) requests = make(chan request, requestQueueLen)
getLimit = 10 * time.Second / time.Duration(getLimitAvg)
postLimit = time.Minute / time.Duration(postLimitAvg)
getLRUCache = lru.New(getLRUSize)
postLRUCache = lru.New(postLRUSize)
var listener net.Listener var listener net.Listener
var err error var err error
@ -240,7 +215,7 @@ func main() {
handler := http.NewServeMux() handler := http.NewServeMux()
handler.HandleFunc("/", handleAssets) handler.HandleFunc("/", handleAssets)
handler.HandleFunc("/endpoint", handleRequest) handler.Handle("/endpoint", httpcache.SinglePath(http.HandlerFunc(handleRequest), 15*time.Second))
handler.HandleFunc("/metrics", handleMetrics) handler.HandleFunc("/metrics", handleMetrics)
srv := http.Server{ srv := http.Server{
@ -291,21 +266,17 @@ func handleRequest(w http.ResponseWriter, r *http.Request) {
}() }()
if ipHeader != "" { if ipHeader != "" {
r.RemoteAddr = r.Header.Get(ipHeader) hdr := r.Header.Get(ipHeader)
fields := strings.Split(hdr, ",")
if len(fields) > 0 {
r.RemoteAddr = strings.TrimSpace(fields[len(fields)-1])
}
} }
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
switch r.Method { switch r.Method {
case "GET": case "GET":
if limit(r.RemoteAddr, getLRUCache, getMut, getLimit, getLimitBurst) {
w.WriteHeader(httpStatusEnhanceYourCalm)
return
}
handleGetRequest(w, r) handleGetRequest(w, r)
case "POST": case "POST":
if limit(r.RemoteAddr, postLRUCache, postMut, postLimit, postLimitBurst) {
w.WriteHeader(httpStatusEnhanceYourCalm)
return
}
handlePostRequest(w, r) handlePostRequest(w, r)
default: default:
if debug { if debug {
@ -327,20 +298,28 @@ func handleGetRequest(rw http.ResponseWriter, r *http.Request) {
// Shuffle // Shuffle
rand.Shuffle(relays) rand.Shuffle(relays)
w := io.Writer(rw) _ = json.NewEncoder(rw).Encode(map[string][]*relay{
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
rw.Header().Set("Content-Encoding", "gzip")
gw := gzip.NewWriter(rw)
defer gw.Close()
w = gw
}
_ = json.NewEncoder(w).Encode(map[string][]*relay{
"relays": relays, "relays": relays,
}) })
} }
func handlePostRequest(w http.ResponseWriter, r *http.Request) { func handlePostRequest(w http.ResponseWriter, r *http.Request) {
// Get the IP address of the client
rhost := r.RemoteAddr
if host, _, err := net.SplitHostPort(rhost); err == nil {
rhost = host
}
// Check the black list. A client is blacklisted if their last 10
// attempts to join have all failed. The "Unauthorized" status return
// causes strelaysrv to cease attempting to join.
if globalBlocklist.IsBlocked(rhost) {
log.Println("Rejected blocked client", rhost)
http.Error(w, "Too many errors", http.StatusUnauthorized)
globalBlocklist.ClearErrors(rhost)
return
}
var relayCert *x509.Certificate var relayCert *x509.Certificate
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 { if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
relayCert = r.TLS.PeerCertificates[0] relayCert = r.TLS.PeerCertificates[0]
@ -392,12 +371,6 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
return return
} }
// Get the IP address of the client
rhost := r.RemoteAddr
if host, _, err := net.SplitHostPort(rhost); err == nil {
rhost = host
}
ip := net.ParseIP(host) ip := net.ParseIP(host)
// The client did not provide an IP address, use the IP address of the client. // The client did not provide an IP address, use the IP address of the client.
if ip == nil || ip.IsUnspecified() { if ip == nil || ip.IsUnspecified() {
@ -429,10 +402,14 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}: case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}:
result := <-reschan result := <-reschan
if result.err != nil { if result.err != nil {
log.Println("Join from", r.RemoteAddr, "failed:", result.err)
globalBlocklist.AddError(rhost)
relayTestsTotal.WithLabelValues("failed").Inc() relayTestsTotal.WithLabelValues("failed").Inc()
http.Error(w, result.err.Error(), http.StatusBadRequest) http.Error(w, result.err.Error(), http.StatusBadRequest)
return return
} }
log.Println("Join from", r.RemoteAddr, "succeeded")
globalBlocklist.ClearErrors(rhost)
relayTestsTotal.WithLabelValues("success").Inc() relayTestsTotal.WithLabelValues("success").Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(map[string]time.Duration{ json.NewEncoder(w).Encode(map[string]time.Duration{
@ -546,23 +523,6 @@ func evict(relay *relay) func() {
} }
} }
func limit(addr string, cache *lru.Cache, lock sync.Mutex, intv time.Duration, burst int) bool {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
lock.Lock()
v, _ := cache.Get(addr)
bkt, ok := v.(*rate.Limiter)
if !ok {
bkt = rate.NewLimiter(rate.Every(intv), burst)
cache.Add(addr, bkt)
}
lock.Unlock()
return !bkt.Allow()
}
func loadRelays(file string) []*relay { func loadRelays(file string) []*relay {
content, err := os.ReadFile(file) content, err := os.ReadFile(file)
if err != nil { if err != nil {
@ -602,7 +562,7 @@ func saveRelays(file string, relays []*relay) error {
for _, relay := range relays { for _, relay := range relays {
content += relay.uri.String() + "\n" content += relay.uri.String() + "\n"
} }
return os.WriteFile(file, []byte(content), 0777) return os.WriteFile(file, []byte(content), 0o777)
} }
func createTestCertificate() tls.Certificate { func createTestCertificate() tls.Certificate {
@ -661,3 +621,42 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code) lrw.ResponseWriter.WriteHeader(code)
} }
type errorTracker struct {
errors *lru.TwoQueueCache[string, *errorCounter]
}
type errorCounter struct {
count atomic.Int32
}
func newErrorTracker(size int) *errorTracker {
cache, err := lru.New2Q[string, *errorCounter](size)
if err != nil {
panic(err)
}
return &errorTracker{
errors: cache,
}
}
func (b *errorTracker) AddError(host string) {
entry, ok := b.errors.Get(host)
if !ok {
entry = &errorCounter{}
b.errors.Add(host, entry)
}
c := entry.count.Add(1)
log.Printf("Error count for %s is now %d", host, c)
}
func (b *errorTracker) ClearErrors(host string) {
b.errors.Remove(host)
}
func (b *errorTracker) IsBlocked(host string) bool {
if be, ok := b.errors.Get(host); ok {
return be.count.Load() > 10
}
return false
}

View File

@ -7,31 +7,112 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"flag" "fmt"
"io"
"log"
"net/http"
"os" "os"
"sort" "sort"
"strings"
"time"
"github.com/alecthomas/kong"
"github.com/syncthing/syncthing/lib/httpcache"
"github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/upgrade"
) )
const defaultURL = "https://api.github.com/repos/syncthing/syncthing/releases?per_page=25" type cli struct {
Listen string `default:":8080" help:"Listen address"`
URL string `short:"u" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=25" help:"GitHub releases url"`
Forward []string `short:"f" help:"Forwarded pages, format: /path->https://example/com/url"`
CacheTime time.Duration `default:"15m" help:"Cache time"`
}
func main() { func main() {
url := flag.String("u", defaultURL, "GitHub releases url") var params cli
flag.Parse() kong.Parse(&params)
if err := server(&params); err != nil {
rels := upgrade.FetchLatestReleases(*url, "") fmt.Printf("Error: %v\n", err)
if rels == nil {
// An error was already logged
os.Exit(1) os.Exit(1)
} }
}
func server(params *cli) error {
http.Handle("/meta.json", httpcache.SinglePath(&githubReleases{url: params.URL}, params.CacheTime))
for _, fwd := range params.Forward {
path, url, ok := strings.Cut(fwd, "->")
if !ok {
return fmt.Errorf("invalid forward: %q", fwd)
}
http.Handle(path, httpcache.SinglePath(&proxy{url: url}, params.CacheTime))
}
return http.ListenAndServe(params.Listen, nil)
}
type githubReleases struct {
url string
}
func (p *githubReleases) ServeHTTP(w http.ResponseWriter, req *http.Request) {
log.Println("Fetching", p.url)
rels := upgrade.FetchLatestReleases(p.url, "")
if rels == nil {
http.Error(w, "no releases", http.StatusInternalServerError)
return
}
sort.Sort(upgrade.SortByRelease(rels)) sort.Sort(upgrade.SortByRelease(rels))
rels = filterForLatest(rels) rels = filterForLatest(rels)
if err := json.NewEncoder(os.Stdout).Encode(rels); err != nil { buf := new(bytes.Buffer)
os.Exit(1) _ = json.NewEncoder(buf).Encode(rels)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET")
w.Write(buf.Bytes())
}
type proxy struct {
url string
}
func (p *proxy) ServeHTTP(w http.ResponseWriter, req *http.Request) {
log.Println("Fetching", p.url)
req, err := http.NewRequestWithContext(req.Context(), http.MethodGet, p.url, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()
ct := resp.Header.Get("Content-Type")
w.Header().Set("Content-Type", ct)
if resp.StatusCode == http.StatusOK {
w.Header().Set("Cache-Control", "public, max-age=900")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET")
}
w.WriteHeader(resp.StatusCode)
if strings.HasPrefix(ct, "application/json") {
// Special JSON handling; clean it up a bit.
var v interface{}
if err := json.NewDecoder(resp.Body).Decode(&v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(v)
} else {
_, _ = io.Copy(w, resp.Body)
} }
} }

124
lib/httpcache/httpcache.go Normal file
View File

@ -0,0 +1,124 @@
package httpcache
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
)
type SinglePathCache struct {
next http.Handler
keep time.Duration
mut sync.RWMutex
resp *recordedResponse
}
func SinglePath(next http.Handler, keep time.Duration) *SinglePathCache {
return &SinglePathCache{
next: next,
keep: keep,
}
}
type recordedResponse struct {
status int
header http.Header
data []byte
gzip []byte
when time.Time
keep time.Duration
}
func (resp *recordedResponse) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for k, v := range resp.header {
w.Header()[k] = v
}
w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", int(resp.keep.Seconds())))
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
w.Header().Set("Content-Length", fmt.Sprint(len(resp.gzip)))
w.WriteHeader(resp.status)
_, _ = w.Write(resp.gzip)
return
}
w.Header().Set("Content-Length", fmt.Sprint(len(resp.data)))
w.WriteHeader(resp.status)
_, _ = w.Write(resp.data)
}
type responseRecorder struct {
resp *recordedResponse
}
func (r *responseRecorder) WriteHeader(status int) {
r.resp.status = status
}
func (r *responseRecorder) Write(data []byte) (int, error) {
r.resp.data = append(r.resp.data, data...)
return len(data), nil
}
func (r *responseRecorder) Header() http.Header {
return r.resp.header
}
func (s *SinglePathCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
s.next.ServeHTTP(w, r)
return
}
w.Header().Set("X-Cache", "MISS")
s.mut.RLock()
ok := s.serveCached(w, r)
s.mut.RUnlock()
if ok {
return
}
s.mut.Lock()
defer s.mut.Unlock()
if s.serveCached(w, r) {
return
}
rec := &recordedResponse{status: http.StatusOK, header: make(http.Header), when: time.Now(), keep: s.keep}
childRec := r.Clone(context.Background())
childRec.Header.Del("Accept-Encoding") // don't let the client dictate the encoding
s.next.ServeHTTP(&responseRecorder{resp: rec}, childRec)
if rec.status == http.StatusOK {
buf := new(bytes.Buffer)
gw := gzip.NewWriter(buf)
_, _ = gw.Write(rec.data)
gw.Close()
rec.gzip = buf.Bytes()
s.resp = rec
}
rec.ServeHTTP(w, r)
}
func (s *SinglePathCache) serveCached(w http.ResponseWriter, r *http.Request) bool {
if s.resp == nil || time.Since(s.resp.when) > s.keep {
return false
}
w.Header().Set("X-Cache", "HIT")
w.Header().Set("X-Cache-From", s.resp.when.Format(time.RFC3339))
s.resp.ServeHTTP(w, r)
return true
}

View File

@ -0,0 +1,10 @@
#!/bin/sh
set -eu
if [ "$MAXMIND_KEY" != "" ] ; then
curl "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City&license_key=${MAXMIND_KEY}&suffix=tar.gz" \
| tar --strip-components 1 -zxv
fi
exec "$@"