diff --git a/lib/config/config.go b/lib/config/config.go
index 7546b92b7..9988f01f9 100644
--- a/lib/config/config.go
+++ b/lib/config/config.go
@@ -32,7 +32,7 @@ import (
const (
OldestHandledVersion = 10
- CurrentVersion = 26
+ CurrentVersion = 27
MaxRescanIntervalS = 365 * 24 * 60 * 60
)
@@ -312,6 +312,9 @@ func (cfg *Configuration) clean() error {
if cfg.Version == 25 {
convertV25V26(cfg)
}
+ if cfg.Version == 26 {
+ convertV26V27(cfg)
+ }
// Build a list of available devices
existingDevices := make(map[protocol.DeviceID]bool)
@@ -371,6 +374,17 @@ func (cfg *Configuration) clean() error {
return nil
}
+func convertV26V27(cfg *Configuration) {
+ for i := range cfg.Folders {
+ f := &cfg.Folders[i]
+ if f.DeprecatedPullers != 0 {
+ f.PullerMaxPendingKiB = 128 * f.DeprecatedPullers
+ f.DeprecatedPullers = 0
+ }
+ }
+ cfg.Version = 27
+}
+
func convertV25V26(cfg *Configuration) {
// triggers database update
cfg.Version = 26
diff --git a/lib/config/config_test.go b/lib/config/config_test.go
index adb1a3157..ee7913f63 100644
--- a/lib/config/config_test.go
+++ b/lib/config/config_test.go
@@ -107,7 +107,6 @@ func TestDeviceConfig(t *testing.T) {
FSWatcherEnabled: false,
FSWatcherDelayS: 10,
Copiers: 0,
- Pullers: 0,
Hashers: 0,
AutoNormalize: true,
MinDiskFree: Size{1, "%"},
diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go
index 7951a2012..6d8f9ef4b 100644
--- a/lib/config/folderconfiguration.go
+++ b/lib/config/folderconfiguration.go
@@ -40,7 +40,7 @@ type FolderConfiguration struct {
MinDiskFree Size `xml:"minDiskFree" json:"minDiskFree"`
Versioning VersioningConfiguration `xml:"versioning" json:"versioning"`
Copiers int `xml:"copiers" json:"copiers"` // This defines how many files are handled concurrently.
- Pullers int `xml:"pullers" json:"pullers"` // Defines how many blocks are fetched at the same time, possibly between separate copier routines.
+ PullerMaxPendingKiB int `xml:"pullerMaxPendingKiB" json:"pullerMaxPendingKiB"`
Hashers int `xml:"hashers" json:"hashers"` // Less than one sets the value to the number of cores. These are CPU bound due to hashing.
Order PullOrder `xml:"order" json:"order"`
IgnoreDelete bool `xml:"ignoreDelete" json:"ignoreDelete"`
@@ -57,6 +57,7 @@ type FolderConfiguration struct {
DeprecatedReadOnly bool `xml:"ro,attr,omitempty" json:"-"`
DeprecatedMinDiskFreePct float64 `xml:"minDiskFreePct,omitempty" json:"-"`
+ DeprecatedPullers int `xml:"pullers,omitempty" json:"-"`
}
type FolderDeviceConfiguration struct {
diff --git a/lib/config/testdata/v27.xml b/lib/config/testdata/v27.xml
new file mode 100644
index 000000000..c11d5e8bf
--- /dev/null
+++ b/lib/config/testdata/v27.xml
@@ -0,0 +1,16 @@
+
+
+ basic
+
+
+ 1
+ -1
+ true
+
+
+ tcp://a
+
+
+ tcp://b
+
+
diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go
index 9343402cc..9d4292ae7 100644
--- a/lib/model/rwfolder.go
+++ b/lib/model/rwfolder.go
@@ -15,6 +15,7 @@ import (
"runtime"
"sort"
"strings"
+ stdsync "sync"
"time"
"github.com/syncthing/syncthing/lib/config"
@@ -78,9 +79,10 @@ const (
)
const (
- defaultCopiers = 2
- defaultPullers = 64
- defaultPullerPause = 60 * time.Second
+ defaultCopiers = 2
+ defaultPullerPause = 60 * time.Second
+ defaultPullerPendingKiB = 8192 // must be larger than block size
+
maxPullerIterations = 3
)
@@ -114,20 +116,23 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
errorsMut: sync.NewMutex(),
}
- f.configureCopiersAndPullers()
-
- return f
-}
-
-func (f *sendReceiveFolder) configureCopiersAndPullers() {
if f.Copiers == 0 {
f.Copiers = defaultCopiers
}
- if f.Pullers == 0 {
- f.Pullers = defaultPullers
+
+ // If the configured max amount of pending data is zero, we use the
+ // default. If it's configured to something non-zero but less than the
+ // protocol block size we adjust it upwards accordingly.
+ if f.PullerMaxPendingKiB == 0 {
+ f.PullerMaxPendingKiB = defaultPullerPendingKiB
+ }
+ if blockSizeKiB := protocol.BlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
+ f.PullerMaxPendingKiB = blockSizeKiB
}
f.pause = f.basePause()
+
+ return f
}
// Serve will run scans and pulls. It will return when Stop()ed or on a
@@ -317,7 +322,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
doneWg := sync.NewWaitGroup()
updateWg := sync.NewWaitGroup()
- l.Debugln(f, "c", f.Copiers, "p", f.Pullers)
+ l.Debugln(f, "copiers:", f.Copiers, "pullerPendingKiB:", f.PullerMaxPendingKiB)
updateWg.Add(1)
go func() {
@@ -335,14 +340,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
}()
}
- for i := 0; i < f.Pullers; i++ {
- pullWg.Add(1)
- go func() {
- // pullerRoutine finishes when pullChan is closed
- f.pullerRoutine(pullChan, finisherChan)
- pullWg.Done()
- }()
- }
+ pullWg.Add(1)
+ go func() {
+ // pullerRoutine finishes when pullChan is closed
+ f.pullerRoutine(pullChan, finisherChan)
+ pullWg.Done()
+ }()
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
@@ -1340,76 +1343,99 @@ func verifyBuffer(buf []byte, block protocol.BlockInfo) ([]byte, error) {
}
func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
+ requestLimiter := newByteSemaphore(f.PullerMaxPendingKiB * 1024)
+ wg := sync.NewWaitGroup()
+
for state := range in {
if state.failed() != nil {
out <- state.sharedPullerState
continue
}
- // Get an fd to the temporary file. Technically we don't need it until
- // after fetching the block, but if we run into an error here there is
- // no point in issuing the request to the network.
- fd, err := state.tempFile()
- if err != nil {
- out <- state.sharedPullerState
- continue
- }
+ // The requestLimiter limits how many pending block requests we have
+ // ongoing at any given time, based on the size of the blocks
+ // themselves.
- if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
- // There is no need to request a block of all zeroes. Pretend we
- // requested it and handled it correctly.
- state.pullDone(state.block)
- out <- state.sharedPullerState
- continue
- }
+ state := state
+ bytes := int(state.block.Size)
- var lastError error
- candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
- for {
- // Select the least busy device to pull the block from. If we found no
- // feasible device at all, fail the block (and in the long run, the
- // file).
- selected, found := activity.leastBusy(candidates)
- if !found {
- if lastError != nil {
- state.fail("pull", lastError)
- } else {
- state.fail("pull", errNoDevice)
- }
- break
- }
+ requestLimiter.take(bytes)
+ wg.Add(1)
- candidates = removeAvailability(candidates, selected)
+ go func() {
+ defer wg.Done()
+ defer requestLimiter.give(bytes)
- // Fetch the block, while marking the selected device as in use so that
- // leastBusy can select another device when someone else asks.
- activity.using(selected)
- buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
- activity.done(selected)
+ f.pullBlock(state, out)
+ }()
+ }
+ wg.Wait()
+}
+
+func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPullerState) {
+ // Get an fd to the temporary file. Technically we don't need it until
+ // after fetching the block, but if we run into an error here there is
+ // no point in issuing the request to the network.
+ fd, err := state.tempFile()
+ if err != nil {
+ out <- state.sharedPullerState
+ return
+ }
+
+ if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
+ // There is no need to request a block of all zeroes. Pretend we
+ // requested it and handled it correctly.
+ state.pullDone(state.block)
+ out <- state.sharedPullerState
+ return
+ }
+
+ var lastError error
+ candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
+ for {
+ // Select the least busy device to pull the block from. If we found no
+ // feasible device at all, fail the block (and in the long run, the
+ // file).
+ selected, found := activity.leastBusy(candidates)
+ if !found {
if lastError != nil {
- l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
- continue
- }
-
- // Verify that the received block matches the desired hash, if not
- // try pulling it from another device.
- _, lastError = verifyBuffer(buf, state.block)
- if lastError != nil {
- l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
- continue
- }
-
- // Save the block data we got from the cluster
- _, err = fd.WriteAt(buf, state.block.Offset)
- if err != nil {
- state.fail("save", err)
+ state.fail("pull", lastError)
} else {
- state.pullDone(state.block)
+ state.fail("pull", errNoDevice)
}
break
}
- out <- state.sharedPullerState
+
+ candidates = removeAvailability(candidates, selected)
+
+ // Fetch the block, while marking the selected device as in use so that
+ // leastBusy can select another device when someone else asks.
+ activity.using(selected)
+ buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
+ activity.done(selected)
+ if lastError != nil {
+ l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
+ continue
+ }
+
+ // Verify that the received block matches the desired hash, if not
+ // try pulling it from another device.
+ _, lastError = verifyBuffer(buf, state.block)
+ if lastError != nil {
+ l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
+ continue
+ }
+
+ // Save the block data we got from the cluster
+ _, err = fd.WriteAt(buf, state.block.Offset)
+ if err != nil {
+ state.fail("save", err)
+ } else {
+ state.pullDone(state.block)
+ }
+ break
}
+ out <- state.sharedPullerState
}
func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
@@ -1899,3 +1925,41 @@ func componentCount(name string) int {
}
return count
}
+
+type byteSemaphore struct {
+ max int
+ available int
+ mut stdsync.Mutex
+ cond *stdsync.Cond
+}
+
+func newByteSemaphore(max int) *byteSemaphore {
+ s := byteSemaphore{
+ max: max,
+ available: max,
+ }
+ s.cond = stdsync.NewCond(&s.mut)
+ return &s
+}
+
+func (s *byteSemaphore) take(bytes int) {
+ if bytes > s.max {
+ panic("bug: more than max bytes will never be available")
+ }
+ s.mut.Lock()
+ for bytes > s.available {
+ s.cond.Wait()
+ }
+ s.available -= bytes
+ s.mut.Unlock()
+}
+
+func (s *byteSemaphore) give(bytes int) {
+ s.mut.Lock()
+ if s.available+bytes > s.max {
+ panic("bug: can never give more than max")
+ }
+ s.available += bytes
+ s.cond.Broadcast()
+ s.mut.Unlock()
+}
diff --git a/lib/model/rwfolder_test.go b/lib/model/rwfolder_test.go
index 17002df23..1df714591 100644
--- a/lib/model/rwfolder_test.go
+++ b/lib/model/rwfolder_test.go
@@ -17,6 +17,7 @@ import (
"testing"
"time"
+ "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
@@ -107,6 +108,9 @@ func setUpSendReceiveFolder(model *Model) *sendReceiveFolder {
model: model,
initialScanFinished: make(chan struct{}),
ctx: context.TODO(),
+ FolderConfiguration: config.FolderConfiguration{
+ PullerMaxPendingKiB: defaultPullerPendingKiB,
+ },
},
fs: fs.NewMtimeFS(fs.NewFilesystem(fs.FilesystemTypeBasic, "testdata"), db.NewNamespacedKV(model.db, "mtime")),
diff --git a/test/h1/config.xml b/test/h1/config.xml
index d7e384c4e..80ccec9c3 100644
--- a/test/h1/config.xml
+++ b/test/h1/config.xml
@@ -8,7 +8,6 @@
1
1
- 16
0
random
false
@@ -28,7 +27,6 @@
1
1
- 16
0
random
false
diff --git a/test/h2/config.xml b/test/h2/config.xml
index 4c94a7903..cf25c4c7d 100644
--- a/test/h2/config.xml
+++ b/test/h2/config.xml
@@ -7,7 +7,6 @@
1
1
- 16
0
random
false
@@ -27,7 +26,6 @@
1
1
- 16
0
random
false
@@ -47,7 +45,6 @@
1
1
- 16
0
random
false
diff --git a/test/h3/config.xml b/test/h3/config.xml
index fce12731e..6eca4321a 100644
--- a/test/h3/config.xml
+++ b/test/h3/config.xml
@@ -9,7 +9,6 @@
1
- 16
0
random
false
@@ -29,7 +28,6 @@
1
1
- 16
0
random
false