mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-08 22:31:04 +00:00
This commit is contained in:
parent
3c920c61e9
commit
2621c6fd2f
@ -797,3 +797,12 @@ func filterURLSchemePrefix(addrs []string, prefix string) []string {
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// mapDeviceConfigs returns a map of device ID to device configuration for the given configuration.
|
||||
func (cfg *Configuration) DeviceMap() map[protocol.DeviceID]DeviceConfiguration {
|
||||
m := make(map[protocol.DeviceID]DeviceConfiguration, len(cfg.Devices))
|
||||
for _, dev := range cfg.Devices {
|
||||
m[dev.DeviceID] = dev
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
@ -20,6 +20,8 @@ type DeviceConfiguration struct {
|
||||
Paused bool `xml:"paused" json:"paused"`
|
||||
AllowedNetworks []string `xml:"allowedNetwork,omitempty" json:"allowedNetworks"`
|
||||
AutoAcceptFolders bool `xml:"autoAcceptFolders" json:"autoAcceptFolders"`
|
||||
MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"`
|
||||
MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"`
|
||||
}
|
||||
|
||||
func NewDeviceConfiguration(id protocol.DeviceID, name string) DeviceConfiguration {
|
||||
|
@ -12,6 +12,8 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
@ -19,30 +21,94 @@ import (
|
||||
// limiter manages a read and write rate limit, reacting to config changes
|
||||
// as appropriate.
|
||||
type limiter struct {
|
||||
write *rate.Limiter
|
||||
read *rate.Limiter
|
||||
limitsLAN atomicBool
|
||||
write *rate.Limiter
|
||||
read *rate.Limiter
|
||||
limitsLAN atomicBool
|
||||
deviceReadLimiters map[protocol.DeviceID]*rate.Limiter
|
||||
deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
const limiterBurstSize = 4 * 128 << 10
|
||||
|
||||
func newLimiter(cfg *config.Wrapper) *limiter {
|
||||
l := &limiter{
|
||||
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
mu: sync.NewMutex(),
|
||||
deviceReadLimiters: make(map[protocol.DeviceID]*rate.Limiter),
|
||||
deviceWriteLimiters: make(map[protocol.DeviceID]*rate.Limiter),
|
||||
}
|
||||
|
||||
cfg.Subscribe(l)
|
||||
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}}
|
||||
|
||||
l.CommitConfiguration(prev, cfg.RawCopy())
|
||||
return l
|
||||
}
|
||||
|
||||
func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader {
|
||||
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN}
|
||||
// This function sets limiters according to corresponding DeviceConfiguration
|
||||
func (lim *limiter) setLimitsLocked(device config.DeviceConfiguration) bool {
|
||||
readLimiter := lim.getReadLimiterLocked(device.DeviceID)
|
||||
writeLimiter := lim.getWriteLimiterLocked(device.DeviceID)
|
||||
|
||||
// limiters for this device are created so we can store previous rates for logging
|
||||
previousReadLimit := readLimiter.Limit()
|
||||
previousWriteLimit := writeLimiter.Limit()
|
||||
currentReadLimit := rate.Limit(device.MaxRecvKbps) * 1024
|
||||
currentWriteLimit := rate.Limit(device.MaxSendKbps) * 1024
|
||||
if device.MaxSendKbps <= 0 {
|
||||
currentWriteLimit = rate.Inf
|
||||
}
|
||||
if device.MaxRecvKbps <= 0 {
|
||||
currentReadLimit = rate.Inf
|
||||
}
|
||||
// Nothing about this device has changed. Start processing next device
|
||||
if previousWriteLimit == currentWriteLimit && previousReadLimit == currentReadLimit {
|
||||
return false
|
||||
}
|
||||
|
||||
readLimiter.SetLimit(currentReadLimit)
|
||||
writeLimiter.SetLimit(currentWriteLimit)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer {
|
||||
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN}
|
||||
// This function handles removing, adding and updating of device limiters.
|
||||
func (lim *limiter) processDevicesConfigurationLocked(from, to config.Configuration) {
|
||||
seen := make(map[protocol.DeviceID]struct{})
|
||||
|
||||
// Mark devices which should not be removed, create new limiters if needed and assign new limiter rate
|
||||
for _, dev := range to.Devices {
|
||||
if dev.DeviceID == to.MyID {
|
||||
// This limiter was created for local device. Should skip this device
|
||||
continue
|
||||
}
|
||||
seen[dev.DeviceID] = struct{}{}
|
||||
|
||||
if lim.setLimitsLocked(dev) {
|
||||
readLimitStr := "is unlimited"
|
||||
if dev.MaxRecvKbps > 0 {
|
||||
readLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxRecvKbps)
|
||||
}
|
||||
writeLimitStr := "is unlimited"
|
||||
if dev.MaxSendKbps > 0 {
|
||||
writeLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxSendKbps)
|
||||
}
|
||||
|
||||
l.Infof("Device %s send rate %s, receive rate %s", dev.DeviceID, readLimitStr, writeLimitStr)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete remote devices which were removed in new configuration
|
||||
for _, dev := range from.Devices {
|
||||
if _, ok := seen[dev.DeviceID]; !ok {
|
||||
l.Debugf("deviceID: %s should be removed", dev.DeviceID)
|
||||
|
||||
delete(lim.deviceWriteLimiters, dev.DeviceID)
|
||||
delete(lim.deviceReadLimiters, dev.DeviceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
|
||||
@ -50,6 +116,13 @@ func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
|
||||
}
|
||||
|
||||
func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
|
||||
// to ensure atomic update of configuration
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
|
||||
// Delete, add or update limiters for devices
|
||||
lim.processDevicesConfigurationLocked(from, to)
|
||||
|
||||
if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
|
||||
from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
|
||||
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan {
|
||||
@ -58,7 +131,6 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
|
||||
|
||||
// The rate variables are in KiB/s in the config (despite the camel casing
|
||||
// of the name). We multiply by 1024 to get bytes/s.
|
||||
|
||||
if to.Options.MaxRecvKbps <= 0 {
|
||||
lim.read.SetLimit(rate.Inf)
|
||||
} else {
|
||||
@ -81,7 +153,7 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
|
||||
if to.Options.MaxRecvKbps > 0 {
|
||||
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps)
|
||||
}
|
||||
l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
|
||||
l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
|
||||
|
||||
if to.Options.LimitBandwidthInLan {
|
||||
l.Infoln("Rate limits apply to LAN connections")
|
||||
@ -97,53 +169,74 @@ func (lim *limiter) String() string {
|
||||
return "connections.limiter"
|
||||
}
|
||||
|
||||
func (lim *limiter) getLimiters(remoteID protocol.DeviceID, c internalConn, isLAN bool) (io.Reader, io.Writer) {
|
||||
lim.mu.Lock()
|
||||
wr := lim.newLimitedWriterLocked(remoteID, c, isLAN)
|
||||
rd := lim.newLimitedReaderLocked(remoteID, c, isLAN)
|
||||
lim.mu.Unlock()
|
||||
return rd, wr
|
||||
}
|
||||
|
||||
func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
|
||||
return &limitedReader{reader: r, limiter: lim, deviceLimiter: lim.getReadLimiterLocked(remoteID), isLAN: isLAN}
|
||||
}
|
||||
|
||||
func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
|
||||
return &limitedWriter{writer: w, limiter: lim, deviceLimiter: lim.getWriteLimiterLocked(remoteID), isLAN: isLAN}
|
||||
}
|
||||
|
||||
// limitedReader is a rate limited io.Reader
|
||||
type limitedReader struct {
|
||||
reader io.Reader
|
||||
limiter *limiter
|
||||
isLAN bool
|
||||
reader io.Reader
|
||||
limiter *limiter
|
||||
deviceLimiter *rate.Limiter
|
||||
isLAN bool
|
||||
}
|
||||
|
||||
func (r *limitedReader) Read(buf []byte) (int, error) {
|
||||
n, err := r.reader.Read(buf)
|
||||
if !r.isLAN || r.limiter.limitsLAN.get() {
|
||||
take(r.limiter.read, n)
|
||||
take(r.limiter.read, r.deviceLimiter, n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// limitedWriter is a rate limited io.Writer
|
||||
type limitedWriter struct {
|
||||
writer io.Writer
|
||||
limiter *limiter
|
||||
isLAN bool
|
||||
writer io.Writer
|
||||
limiter *limiter
|
||||
deviceLimiter *rate.Limiter
|
||||
isLAN bool
|
||||
}
|
||||
|
||||
func (w *limitedWriter) Write(buf []byte) (int, error) {
|
||||
if !w.isLAN || w.limiter.limitsLAN.get() {
|
||||
take(w.limiter.write, len(buf))
|
||||
take(w.limiter.write, w.deviceLimiter, len(buf))
|
||||
}
|
||||
return w.writer.Write(buf)
|
||||
}
|
||||
|
||||
// take is a utility function to consume tokens from a rate.Limiter. No call
|
||||
// to WaitN can be larger than the limiter burst size so we split it up into
|
||||
// take is a utility function to consume tokens from a overall rate.Limiter and deviceLimiter.
|
||||
// No call to WaitN can be larger than the limiter burst size so we split it up into
|
||||
// several calls when necessary.
|
||||
func take(l *rate.Limiter, tokens int) {
|
||||
func take(overallLimiter, deviceLimiter *rate.Limiter, tokens int) {
|
||||
if tokens < limiterBurstSize {
|
||||
// This is the by far more common case so we get it out of the way
|
||||
// early.
|
||||
l.WaitN(context.TODO(), tokens)
|
||||
deviceLimiter.WaitN(context.TODO(), tokens)
|
||||
overallLimiter.WaitN(context.TODO(), tokens)
|
||||
return
|
||||
}
|
||||
|
||||
for tokens > 0 {
|
||||
// Consume limiterBurstSize tokens at a time until we're done.
|
||||
if tokens > limiterBurstSize {
|
||||
l.WaitN(context.TODO(), limiterBurstSize)
|
||||
deviceLimiter.WaitN(context.TODO(), limiterBurstSize)
|
||||
overallLimiter.WaitN(context.TODO(), limiterBurstSize)
|
||||
tokens -= limiterBurstSize
|
||||
} else {
|
||||
l.WaitN(context.TODO(), tokens)
|
||||
deviceLimiter.WaitN(context.TODO(), tokens)
|
||||
overallLimiter.WaitN(context.TODO(), tokens)
|
||||
tokens = 0
|
||||
}
|
||||
}
|
||||
@ -162,3 +255,26 @@ func (b *atomicBool) set(v bool) {
|
||||
func (b *atomicBool) get() bool {
|
||||
return atomic.LoadInt32((*int32)(b)) != 0
|
||||
}
|
||||
|
||||
// Utility functions for atomic operations on device limiters map
|
||||
func (lim *limiter) getWriteLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter {
|
||||
limiter, ok := lim.deviceWriteLimiters[deviceID]
|
||||
|
||||
if !ok {
|
||||
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
|
||||
lim.deviceWriteLimiters[deviceID] = limiter
|
||||
}
|
||||
|
||||
return limiter
|
||||
}
|
||||
|
||||
func (lim *limiter) getReadLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter {
|
||||
limiter, ok := lim.deviceReadLimiters[deviceID]
|
||||
|
||||
if !ok {
|
||||
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
|
||||
lim.deviceReadLimiters[deviceID] = limiter
|
||||
}
|
||||
|
||||
return limiter
|
||||
}
|
||||
|
205
lib/connections/limiter_test.go
Normal file
205
lib/connections/limiter_test.go
Normal file
@ -0,0 +1,205 @@
|
||||
// Copyright (C) 2017 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package connections
|
||||
|
||||
import (
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"golang.org/x/time/rate"
|
||||
"math/rand"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var device1, device2, device3, device4 protocol.DeviceID
|
||||
var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
|
||||
|
||||
func init() {
|
||||
device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
|
||||
device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
|
||||
device3, _ = protocol.DeviceIDFromString("LGFPDIT-7SKNNJL-VJZA4FC-7QNCRKA-CE753K7-2BW5QDK-2FOZ7FR-FEP57QJ")
|
||||
device4, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
|
||||
}
|
||||
|
||||
func initConfig() *config.Wrapper {
|
||||
cfg := config.Wrap("/dev/null", config.New(device1))
|
||||
dev1Conf = config.NewDeviceConfiguration(device1, "device1")
|
||||
dev2Conf = config.NewDeviceConfiguration(device2, "device2")
|
||||
dev3Conf = config.NewDeviceConfiguration(device3, "device3")
|
||||
dev4Conf = config.NewDeviceConfiguration(device4, "device4")
|
||||
|
||||
dev2Conf.MaxRecvKbps = rand.Int() % 100000
|
||||
dev2Conf.MaxSendKbps = rand.Int() % 100000
|
||||
|
||||
waiter, _ := cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
|
||||
waiter.Wait()
|
||||
return cfg
|
||||
}
|
||||
|
||||
func TestLimiterInit(t *testing.T) {
|
||||
cfg := initConfig()
|
||||
lim := newLimiter(cfg)
|
||||
|
||||
device2ReadLimit := dev2Conf.MaxRecvKbps
|
||||
device2WriteLimit := dev2Conf.MaxSendKbps
|
||||
|
||||
expectedR := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(device2ReadLimit*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
|
||||
expectedW := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(device2WriteLimit*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
|
||||
actualR := lim.deviceReadLimiters
|
||||
actualW := lim.deviceWriteLimiters
|
||||
|
||||
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
|
||||
}
|
||||
|
||||
func TestSetDeviceLimits(t *testing.T) {
|
||||
cfg := initConfig()
|
||||
lim := newLimiter(cfg)
|
||||
|
||||
// should still be inf/inf because this is local device
|
||||
dev1ReadLimit := rand.Int() % 100000
|
||||
dev1WriteLimit := rand.Int() % 100000
|
||||
dev1Conf.MaxRecvKbps = dev1ReadLimit
|
||||
dev1Conf.MaxSendKbps = dev1WriteLimit
|
||||
|
||||
dev2ReadLimit := rand.Int() % 100000
|
||||
dev2WriteLimit := rand.Int() % 100000
|
||||
dev2Conf.MaxRecvKbps = dev2ReadLimit
|
||||
dev2Conf.MaxSendKbps = dev2WriteLimit
|
||||
|
||||
dev3ReadLimit := rand.Int() % 10000
|
||||
dev3Conf.MaxRecvKbps = dev3ReadLimit
|
||||
|
||||
waiter, _ := cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
|
||||
waiter.Wait()
|
||||
|
||||
expectedR := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2ReadLimit*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Limit(dev3ReadLimit*1024), limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
expectedW := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2WriteLimit*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
|
||||
actualR := lim.deviceReadLimiters
|
||||
actualW := lim.deviceWriteLimiters
|
||||
|
||||
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
|
||||
}
|
||||
|
||||
func TestRemoveDevice(t *testing.T) {
|
||||
cfg := initConfig()
|
||||
lim := newLimiter(cfg)
|
||||
|
||||
waiter, _ := cfg.RemoveDevice(device3)
|
||||
waiter.Wait()
|
||||
expectedR := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
expectedW := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
}
|
||||
actualR := lim.deviceReadLimiters
|
||||
actualW := lim.deviceWriteLimiters
|
||||
|
||||
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
|
||||
}
|
||||
|
||||
func TestAddDevice(t *testing.T) {
|
||||
cfg := initConfig()
|
||||
lim := newLimiter(cfg)
|
||||
|
||||
addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
|
||||
addDevConf := config.NewDeviceConfiguration(addedDevice, "addedDevice")
|
||||
addDevConf.MaxRecvKbps = 120
|
||||
addDevConf.MaxSendKbps = 240
|
||||
|
||||
waiter, _ := cfg.SetDevice(addDevConf)
|
||||
waiter.Wait()
|
||||
|
||||
expectedR := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
|
||||
}
|
||||
|
||||
expectedW := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
|
||||
device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
|
||||
}
|
||||
actualR := lim.deviceReadLimiters
|
||||
actualW := lim.deviceWriteLimiters
|
||||
|
||||
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
|
||||
}
|
||||
|
||||
func TestAddAndRemove(t *testing.T) {
|
||||
cfg := initConfig()
|
||||
lim := newLimiter(cfg)
|
||||
|
||||
addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
|
||||
addDevConf := config.NewDeviceConfiguration(addedDevice, "addedDevice")
|
||||
addDevConf.MaxRecvKbps = 120
|
||||
addDevConf.MaxSendKbps = 240
|
||||
|
||||
waiter, _ := cfg.SetDevice(addDevConf)
|
||||
waiter.Wait()
|
||||
waiter, _ = cfg.RemoveDevice(device3)
|
||||
waiter.Wait()
|
||||
|
||||
expectedR := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
|
||||
}
|
||||
|
||||
expectedW := map[protocol.DeviceID]*rate.Limiter{
|
||||
device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
|
||||
device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
|
||||
addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
|
||||
}
|
||||
actualR := lim.deviceReadLimiters
|
||||
actualW := lim.deviceWriteLimiters
|
||||
|
||||
checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
|
||||
}
|
||||
|
||||
func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) {
|
||||
t.Helper()
|
||||
if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) {
|
||||
t.Errorf("Map lengths differ!")
|
||||
}
|
||||
|
||||
for key, val := range expectedR {
|
||||
if _, ok := actualR[key]; !ok {
|
||||
t.Errorf("Device %s not found in limiter", key)
|
||||
}
|
||||
|
||||
if val.Limit() != actualR[key].Limit() {
|
||||
t.Errorf("Read limits for device %s differ actual: %f, expected: %f", key, actualR[key].Limit(), val.Limit())
|
||||
}
|
||||
if expectedW[key].Limit() != actualW[key].Limit() {
|
||||
t.Errorf("Write limits for device %s differ actual: %f, expected: %f", key, actualW[key].Limit(), expectedW[key].Limit())
|
||||
}
|
||||
}
|
||||
}
|
@ -266,8 +266,7 @@ next:
|
||||
// keep up with config changes to the rate and whether or not LAN
|
||||
// connections are limited.
|
||||
isLAN := s.isLAN(c.RemoteAddr())
|
||||
wr := s.limiter.newWriteLimiter(c, isLAN)
|
||||
rd := s.limiter.newReadLimiter(c, isLAN)
|
||||
rd, wr := s.limiter.getLimiters(remoteID, c, isLAN)
|
||||
|
||||
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, c.String(), deviceCfg.Compression)
|
||||
modelConn := completeConn{c, protoConn}
|
||||
|
@ -2625,8 +2625,8 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool {
|
||||
// clean residue device state that is not part of any folder.
|
||||
|
||||
// Pausing a device, unpausing is handled by the connection service.
|
||||
fromDevices := mapDeviceConfigs(from.Devices)
|
||||
toDevices := mapDeviceConfigs(to.Devices)
|
||||
fromDevices := from.DeviceMap()
|
||||
toDevices := to.DeviceMap()
|
||||
for deviceID, toCfg := range toDevices {
|
||||
fromCfg, ok := fromDevices[deviceID]
|
||||
if !ok || fromCfg.Paused == toCfg.Paused {
|
||||
@ -2715,16 +2715,6 @@ func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} {
|
||||
return m
|
||||
}
|
||||
|
||||
// mapDeviceConfigs returns a map of device ID to device configuration for the given
|
||||
// slice of folder configurations.
|
||||
func mapDeviceConfigs(devices []config.DeviceConfiguration) map[protocol.DeviceID]config.DeviceConfiguration {
|
||||
m := make(map[protocol.DeviceID]config.DeviceConfiguration, len(devices))
|
||||
for _, dev := range devices {
|
||||
m[dev.DeviceID] = dev
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Skips `skip` elements and retrieves up to `get` elements from a given slice.
|
||||
// Returns the resulting slice, plus how much elements are left to skip or
|
||||
// copy to satisfy the values which were provided, given the slice is not
|
||||
|
Loading…
Reference in New Issue
Block a user