lib/connections: Improve write rate limiting (fixes #5138) (#5996)

This splits large writes into smaller ones when using a rate limit,
making them into a legitimate trickle rather than large bursts with a
long time in between.
This commit is contained in:
Jakob Borg 2019-09-04 11:12:17 +01:00 committed by GitHub
parent 80894948f6
commit 0104e78589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 244 additions and 33 deletions

View File

@ -32,9 +32,13 @@ type limiter struct {
type waiter interface { type waiter interface {
// This is the rate limiting operation // This is the rate limiting operation
WaitN(ctx context.Context, n int) error WaitN(ctx context.Context, n int) error
Limit() rate.Limit
} }
const limiterBurstSize = 4 * 128 << 10 const (
limiterBurstSize = 4 * 128 << 10
maxSingleWriteSize = 8 << 10
)
func newLimiter(cfg config.Wrapper) *limiter { func newLimiter(cfg config.Wrapper) *limiter {
l := &limiter{ l := &limiter{
@ -186,19 +190,23 @@ func (lim *limiter) getLimiters(remoteID protocol.DeviceID, rw io.ReadWriter, is
func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader { func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
return &limitedReader{ return &limitedReader{
reader: r, reader: r,
limitsLAN: &lim.limitsLAN, waiterHolder: waiterHolder{
waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read}, waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read},
isLAN: isLAN, limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
},
} }
} }
func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer { func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{ return &limitedWriter{
writer: w, writer: w,
limitsLAN: &lim.limitsLAN, waiterHolder: waiterHolder{
waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write}, waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write},
isLAN: isLAN, limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
},
} }
} }
@ -221,53 +229,87 @@ func getRateLimiter(m map[protocol.DeviceID]*rate.Limiter, deviceID protocol.Dev
// limitedReader is a rate limited io.Reader // limitedReader is a rate limited io.Reader
type limitedReader struct { type limitedReader struct {
reader io.Reader reader io.Reader
limitsLAN *atomicBool waiterHolder
waiter waiter
isLAN bool
} }
func (r *limitedReader) Read(buf []byte) (int, error) { func (r *limitedReader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf) n, err := r.reader.Read(buf)
if !r.isLAN || r.limitsLAN.get() { if !r.unlimited() {
take(r.waiter, n) r.take(n)
} }
return n, err return n, err
} }
// limitedWriter is a rate limited io.Writer // limitedWriter is a rate limited io.Writer
type limitedWriter struct { type limitedWriter struct {
writer io.Writer writer io.Writer
limitsLAN *atomicBool waiterHolder
waiter waiter
isLAN bool
} }
func (w *limitedWriter) Write(buf []byte) (int, error) { func (w *limitedWriter) Write(buf []byte) (int, error) {
if !w.isLAN || w.limitsLAN.get() { if w.unlimited() {
take(w.waiter, len(buf)) return w.writer.Write(buf)
} }
return w.writer.Write(buf)
// This does (potentially) multiple smaller writes in order to be less
// bursty with large writes and slow rates.
written := 0
for written < len(buf) {
toWrite := maxSingleWriteSize
if toWrite > len(buf)-written {
toWrite = len(buf) - written
}
w.take(toWrite)
n, err := w.writer.Write(buf[written : written+toWrite])
written += n
if err != nil {
return written, err
}
}
return written, nil
} }
// take is a utility function to consume tokens from a overall rate.Limiter and deviceLimiter. // waiterHolder is the common functionality around having and evaluating a
// No call to WaitN can be larger than the limiter burst size so we split it up into // waiter, valid for both writers and readers
// several calls when necessary. type waiterHolder struct {
func take(waiter waiter, tokens int) { waiter waiter
limitsLAN *atomicBool
isLAN bool
}
// unlimited returns true if the waiter is not limiting the rate
func (w waiterHolder) unlimited() bool {
if w.isLAN && !w.limitsLAN.get() {
return true
}
return w.waiter.Limit() == rate.Inf
}
// take is a utility function to consume tokens, because no call to WaitN
// must be larger than the limiter burst size or it will hang.
func (w waiterHolder) take(tokens int) {
// For writes we already split the buffer into smaller operations so those
// will always end up in the fast path below. For reads, however, we don't
// control the size of the incoming buffer and don't split the calls
// into the lower level reads so we might get a large amount of data and
// end up in the loop further down.
if tokens < limiterBurstSize { if tokens < limiterBurstSize {
// This is the by far more common case so we get it out of the way // Fast path. We won't get an error from WaitN as we don't pass a
// early. // context with a deadline.
waiter.WaitN(context.TODO(), tokens) _ = w.waiter.WaitN(context.TODO(), tokens)
return return
} }
for tokens > 0 { for tokens > 0 {
// Consume limiterBurstSize tokens at a time until we're done. // Consume limiterBurstSize tokens at a time until we're done.
if tokens > limiterBurstSize { if tokens > limiterBurstSize {
waiter.WaitN(context.TODO(), limiterBurstSize) _ = w.waiter.WaitN(context.TODO(), limiterBurstSize)
tokens -= limiterBurstSize tokens -= limiterBurstSize
} else { } else {
waiter.WaitN(context.TODO(), tokens) _ = w.waiter.WaitN(context.TODO(), tokens)
tokens = 0 tokens = 0
} }
} }
@ -300,3 +342,13 @@ func (tw totalWaiter) WaitN(ctx context.Context, n int) error {
} }
return nil return nil
} }
func (tw totalWaiter) Limit() rate.Limit {
min := rate.Inf
for _, w := range tw {
if l := w.Limit(); l < min {
min = l
}
}
return min
}

View File

@ -7,12 +7,16 @@
package connections package connections
import ( import (
"bytes"
crand "crypto/rand"
"io"
"math/rand"
"testing"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"math/rand"
"testing"
) )
var device1, device2, device3, device4 protocol.DeviceID var device1, device2, device3, device4 protocol.DeviceID
@ -185,6 +189,151 @@ func TestAddAndRemove(t *testing.T) {
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW) checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
} }
func TestLimitedWriterWrite(t *testing.T) {
// Check that the limited writer writes the correct data in the correct manner.
// A buffer with random data that is larger than the write size and not
// a precise multiple either.
src := make([]byte, int(12.5*maxSingleWriteSize))
if _, err := crand.Reader.Read(src); err != nil {
t.Fatal(err)
}
// Write it to the destination using a limited writer, with a wrapper to
// count the write calls. The defaults on the limited writer should mean
// it is used (and doesn't take the fast path). In practice the limiter
// won't delay the test as the burst size is large enough to accommodate
// regardless of the rate.
dst := new(bytes.Buffer)
cw := &countingWriter{w: dst}
lw := &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomicBool),
isLAN: false, // enables limiting
},
}
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
// Verify there were lots of writes and that the end result is identical.
if cw.writeCount != 13 {
t.Error("expected lots of smaller writes, but not too many")
}
if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal")
}
// Write it to the destination using a limited writer, with a wrapper to
// count the write calls. Now we make sure the fast path is used.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomicBool),
isLAN: true, // disables limiting
},
}
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
// Verify there were a single write and that the end result is identical.
if cw.writeCount != 1 {
t.Error("expected just the one write")
}
if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal")
}
// Once more, but making sure the fast path is used for an unlimited
// rate, with multiple unlimited raters even (global and per-device).
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
limitsLAN: new(atomicBool),
isLAN: false, // enables limiting
},
}
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
// Verify there were a single write and that the end result is identical.
if cw.writeCount != 1 {
t.Error("expected just the one write")
}
if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal")
}
// Once more, but making sure we *don't* take the fast path when there
// is a combo of limited and unlimited writers.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: totalWaiter{
rate.NewLimiter(rate.Inf, limiterBurstSize),
rate.NewLimiter(rate.Limit(42), limiterBurstSize),
rate.NewLimiter(rate.Inf, limiterBurstSize),
},
limitsLAN: new(atomicBool),
isLAN: false, // enables limiting
},
}
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
// Verify there were lots of writes and that the end result is identical.
if cw.writeCount != 13 {
t.Error("expected just the one write")
}
if !bytes.Equal(src, dst.Bytes()) {
t.Error("results should be equal")
}
}
func TestTotalWaiterLimit(t *testing.T) {
cases := []struct {
w waiter
r rate.Limit
}{
{
totalWaiter{},
rate.Inf,
},
{
totalWaiter{rate.NewLimiter(rate.Inf, 42)},
rate.Inf,
},
{
totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Inf, 42)},
rate.Inf,
},
{
totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Limit(12), 42), rate.NewLimiter(rate.Limit(15), 42)},
rate.Limit(12),
},
}
for _, tc := range cases {
l := tc.w.Limit()
if l != tc.r {
t.Error("incorrect limit returned")
}
}
}
func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) { func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) {
t.Helper() t.Helper()
if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) { if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) {
@ -204,3 +353,13 @@ func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW
} }
} }
} }
type countingWriter struct {
w io.Writer
writeCount int
}
func (w *countingWriter) Write(data []byte) (int, error) {
w.writeCount++
return w.w.Write(data)
}