diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go index 50ec9190b..0e79b9ddd 100644 --- a/lib/config/folderconfiguration.go +++ b/lib/config/folderconfiguration.go @@ -57,6 +57,7 @@ type FolderConfiguration struct { MarkerName string `xml:"markerName" json:"markerName"` CopyOwnershipFromParent bool `xml:"copyOwnershipFromParent" json:"copyOwnershipFromParent"` RawModTimeWindowS int `xml:"modTimeWindowS" json:"modTimeWindowS"` + MaxConcurrentWrites int `xml:"maxConcurrentWrites" json:"maxConcurrentWrites" default:"2"` cachedFilesystem fs.Filesystem cachedModTimeWindow time.Duration diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 138d9ff30..c1d11856f 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -9,6 +9,7 @@ package model import ( "bytes" "fmt" + "io" "path/filepath" "runtime" "sort" @@ -102,7 +103,8 @@ type sendReceiveFolder struct { fs fs.Filesystem versioner versioner.Versioner - queue *jobQueue + queue *jobQueue + writeLimiter *byteSemaphore pullErrors map[string]string // errors for most recent/current iteration oldPullErrors map[string]string // errors from previous iterations for log filtering only @@ -115,6 +117,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche fs: fs, versioner: ver, queue: newJobQueue(), + writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites), pullErrorsMut: sync.NewMutex(), } f.folder.puller = f @@ -1261,10 +1264,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch return true } - _, err = dstFd.WriteAt(buf, block.Offset) + _, err = f.limitedWriteAt(dstFd, buf, block.Offset) if err != nil { state.fail(errors.Wrap(err, "dst write")) - } if offset == block.Offset { state.copiedFromOrigin() @@ -1297,7 +1299,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch return false } - _, err = dstFd.WriteAt(buf, block.Offset) + _, err = f.limitedWriteAt(dstFd, buf, block.Offset) if err != nil { state.fail(errors.Wrap(err, "dst write")) } @@ -1446,7 +1448,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu } // Save the block data we got from the cluster - _, err = fd.WriteAt(buf, state.block.Offset) + _, err = f.limitedWriteAt(fd, buf, state.block.Offset) if err != nil { state.fail(errors.Wrap(err, "save")) } else { @@ -1936,6 +1938,14 @@ func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) er return inWritableDir(fn, f.fs, path, f.IgnorePerms) } +func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) (int, error) { + if err := f.writeLimiter.takeWithContext(f.ctx, 1); err != nil { + return 0, err + } + defer f.writeLimiter.give(1) + return fd.WriteAt(data, offset) +} + // A []FileError is sent as part of an event and will be JSON serialized. type FileError struct { Path string `json:"path"` diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 326d1f217..969669eea 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -107,6 +107,7 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol }, queue: newJobQueue(), + writeLimiter: newByteSemaphore(2), pullErrors: make(map[string]string), pullErrorsMut: sync.NewMutex(), }