From a1f32095dfeea0188c4fb5d3708ca8b0097cdae7 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 1 Apr 2014 20:36:54 +0200 Subject: [PATCH] Rate limit sent data, not uncompressed --- cmd/syncthing/limitedwriter.go | 19 +++++++++++++++++++ cmd/syncthing/main.go | 26 +++++++++++++++++--------- cmd/syncthing/model.go | 11 ----------- 3 files changed, 36 insertions(+), 20 deletions(-) create mode 100644 cmd/syncthing/limitedwriter.go diff --git a/cmd/syncthing/limitedwriter.go b/cmd/syncthing/limitedwriter.go new file mode 100644 index 000000000..1734609eb --- /dev/null +++ b/cmd/syncthing/limitedwriter.go @@ -0,0 +1,19 @@ +package main + +import ( + "io" + + "github.com/juju/ratelimit" +) + +type limitedWriter struct { + w io.Writer + bucket *ratelimit.Bucket +} + +func (w *limitedWriter) Write(buf []byte) (int, error) { + if w.bucket != nil { + w.bucket.Wait(int64(len(buf))) + } + return w.w.Write(buf) +} diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index e2d1fbddb..da152295c 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "flag" "fmt" + "io" "log" "net" "net/http" @@ -19,6 +20,7 @@ import ( "github.com/calmh/ini" "github.com/calmh/syncthing/discover" "github.com/calmh/syncthing/protocol" + "github.com/juju/ratelimit" ) const BlockSize = 128 * 1024 @@ -27,12 +29,9 @@ var cfg Configuration var Version = "unknown-dev" var ( - myID string -) - -var ( - showVersion bool - confDir string + myID string + confDir string + rateBucket *ratelimit.Bucket ) const ( @@ -60,6 +59,7 @@ const ( ) func main() { + var showVersion bool flag.StringVar(&confDir, "home", getDefaultConfDir(), "Set configuration directory") flag.BoolVar(&showVersion, "version", false, "Show version") flag.Usage = usageFor(flag.CommandLine, usage, extraUsage) @@ -186,11 +186,15 @@ func main() { MinVersion: tls.VersionTLS12, } - m := NewModel(cfg.Options.MaxChangeKbps * 1000) + // If the write rate should be limited, set up a rate limiter for it. + // This will be used on connections created in the connect and listen routines. + if cfg.Options.MaxSendKbps > 0 { - m.LimitRate(cfg.Options.MaxSendKbps) + rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps)) } + m := NewModel(cfg.Options.MaxChangeKbps * 1000) + for i := range cfg.Repositories { cfg.Repositories[i].Nodes = cleanNodeList(cfg.Repositories[i].Nodes, myID) dir := expandTilde(cfg.Repositories[i].Directory) @@ -415,7 +419,11 @@ next: for _, nodeCfg := range cfg.Repositories[0].Nodes { if nodeCfg.NodeID == remoteID { - protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) + var wr io.Writer = conn + if rateBucket != nil { + wr = &limitedWriter{conn, rateBucket} + } + protoConn := protocol.NewConnection(remoteID, conn, wr, m, connOpts) m.AddConnection(conn, protoConn) continue next } diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 3fc3096c2..9f23f7277 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -18,7 +18,6 @@ import ( "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/protocol" "github.com/calmh/syncthing/scanner" - "github.com/juju/ratelimit" ) type Model struct { @@ -36,8 +35,6 @@ type Model struct { sup suppressor - limitRequestRate *ratelimit.Bucket - addedRepo bool started bool } @@ -66,10 +63,6 @@ func NewModel(maxChangeBw int) *Model { return m } -func (m *Model) LimitRate(kbps int) { - m.limitRequestRate = ratelimit.NewBucketWithRate(float64(kbps), int64(5*kbps)) -} - // StartRW starts read/write processing on the current model. When in // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. @@ -351,10 +344,6 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by return nil, err } - if m.limitRequestRate != nil { - m.limitRequestRate.Wait(int64(size / 1024)) - } - return buf, nil }