diff --git a/lib/config/blockpullorder.go b/lib/config/blockpullorder.go new file mode 100644 index 000000000..03b26552a --- /dev/null +++ b/lib/config/blockpullorder.go @@ -0,0 +1,46 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package config + +type BlockPullOrder int + +const ( + BlockPullOrderStandard BlockPullOrder = iota // default is standard + BlockPullOrderRandom + BlockPullOrderInOrder +) + +func (o BlockPullOrder) String() string { + switch o { + case BlockPullOrderStandard: + return "standard" + case BlockPullOrderRandom: + return "random" + case BlockPullOrderInOrder: + return "inOrder" + default: + return "unknown" + } +} + +func (o BlockPullOrder) MarshalText() ([]byte, error) { + return []byte(o.String()), nil +} + +func (o *BlockPullOrder) UnmarshalText(bs []byte) error { + switch string(bs) { + case "standard": + *o = BlockPullOrderStandard + case "random": + *o = BlockPullOrderRandom + case "inOrder": + *o = BlockPullOrderInOrder + default: + *o = BlockPullOrderStandard + } + return nil +} diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go index f7c054899..5a594ff89 100644 --- a/lib/config/folderconfiguration.go +++ b/lib/config/folderconfiguration.go @@ -59,6 +59,7 @@ type FolderConfiguration struct { RawModTimeWindowS int `xml:"modTimeWindowS" json:"modTimeWindowS"` MaxConcurrentWrites int `xml:"maxConcurrentWrites" json:"maxConcurrentWrites" default:"2"` DisableFsync bool `xml:"disableFsync" json:"disableFsync"` + BlockPullOrder BlockPullOrder `xml:"blockPullOrder" json:"blockPullOrder"` cachedFilesystem fs.Filesystem cachedModTimeWindow time.Duration diff --git a/lib/model/blockpullreorderer.go b/lib/model/blockpullreorderer.go new file mode 100644 index 000000000..cb5ad9805 --- /dev/null +++ b/lib/model/blockpullreorderer.go @@ -0,0 +1,125 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/rand" + "sort" +) + +type blockPullReorderer interface { + Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo +} + +func newBlockPullReorderer(order config.BlockPullOrder, id protocol.DeviceID, otherDevices []protocol.DeviceID) blockPullReorderer { + switch order { + case config.BlockPullOrderRandom: + return randomOrderBlockPullReorderer{} + case config.BlockPullOrderInOrder: + return inOrderBlockPullReorderer{} + case config.BlockPullOrderStandard: + fallthrough + default: + return newStandardBlockPullReorderer(id, otherDevices) + } +} + +type inOrderBlockPullReorderer struct{} + +func (inOrderBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo { + return blocks +} + +type randomOrderBlockPullReorderer struct{} + +func (randomOrderBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo { + rand.Shuffle(blocks) + return blocks +} + +type standardBlockPullReorderer struct { + myIndex int + count int + shuffle func(interface{}) // Used for test +} + +func newStandardBlockPullReorderer(id protocol.DeviceID, otherDevices []protocol.DeviceID) *standardBlockPullReorderer { + allDevices := append(otherDevices, id) + sort.Slice(allDevices, func(i, j int) bool { + return allDevices[i].Compare(allDevices[j]) == -1 + }) + // Find our index + myIndex := -1 + for i, dev := range allDevices { + if dev == id { + myIndex = i + break + } + } + if myIndex < 0 { + panic("bug: could not find my own index") + } + return &standardBlockPullReorderer{ + myIndex: myIndex, + count: len(allDevices), + shuffle: rand.Shuffle, + } +} + +func (p *standardBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo { + if len(blocks) == 0 { + return blocks + } + + // Split the blocks into len(allDevices) chunks. Chunk count might be less than device count, if there are more + // devices than blocks. + chunks := chunk(blocks, p.count) + + newBlocks := make([]protocol.BlockInfo, 0, len(blocks)) + + // First add our own chunk. We might fall off the list if there are more devices than chunks... + if p.myIndex < len(chunks) { + newBlocks = append(newBlocks, chunks[p.myIndex]...) + } + + // The rest of the chunks we fetch in a random order in whole chunks. + // Generate chunk index slice and shuffle it + indexes := make([]int, 0, len(chunks)-1) + for i := 0; i < len(chunks); i++ { + if i != p.myIndex { + indexes = append(indexes, i) + } + } + + p.shuffle(indexes) + + // Append the chunks in the order of the index slices. + for _, idx := range indexes { + newBlocks = append(newBlocks, chunks[idx]...) + } + + return newBlocks +} + +func chunk(blocks []protocol.BlockInfo, partCount int) [][]protocol.BlockInfo { + if partCount == 0 { + return [][]protocol.BlockInfo{blocks} + } + count := len(blocks) + chunkSize := (count + partCount - 1) / partCount + parts := make([][]protocol.BlockInfo, 0, partCount) + for i := 0; i < count; i += chunkSize { + end := i + chunkSize + if end > count { + end = count + } + parts = append(parts, blocks[i:end]) + } + return parts +} diff --git a/lib/model/blockpullreorderer_test.go b/lib/model/blockpullreorderer_test.go new file mode 100644 index 000000000..f5cc7b4e5 --- /dev/null +++ b/lib/model/blockpullreorderer_test.go @@ -0,0 +1,105 @@ +// Copyright (C) 2020 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "github.com/syncthing/syncthing/lib/protocol" + "reflect" + "sort" + "testing" +) + +var ( + someBlocks = []protocol.BlockInfo{{Offset: 1}, {Offset: 2}, {Offset: 3}} +) + +func Test_chunk(t *testing.T) { + type args struct { + blocks []protocol.BlockInfo + partCount int + } + tests := []struct { + name string + args args + want [][]protocol.BlockInfo + }{ + {"one", args{someBlocks, 1}, [][]protocol.BlockInfo{someBlocks}}, + {"two", args{someBlocks, 2}, [][]protocol.BlockInfo{someBlocks[:2], someBlocks[2:]}}, + {"three", args{someBlocks, 3}, [][]protocol.BlockInfo{someBlocks[:1], someBlocks[1:2], someBlocks[2:]}}, + {"four", args{someBlocks, 4}, [][]protocol.BlockInfo{someBlocks[:1], someBlocks[1:2], someBlocks[2:]}}, + // Never happens as myIdx would be -1, so we'd return in order. + {"zero", args{someBlocks, 0}, [][]protocol.BlockInfo{someBlocks}}, + {"empty-one", args{nil, 1}, [][]protocol.BlockInfo{}}, + {"empty-zero", args{nil, 0}, [][]protocol.BlockInfo{nil}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := chunk(tt.args.blocks, tt.args.partCount); !reflect.DeepEqual(got, tt.want) { + t.Errorf("chunk() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_inOrderBlockPullReorderer_Reorder(t *testing.T) { + type args struct { + blocks []protocol.BlockInfo + } + tests := []struct { + name string + blocks []protocol.BlockInfo + want []protocol.BlockInfo + }{ + {"basic", someBlocks, someBlocks}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + in := inOrderBlockPullReorderer{} + if got := in.Reorder(tt.blocks); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Reorder() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_standardBlockPullReorderer_Reorder(t *testing.T) { + // Order the devices, so we know their ordering ahead of time. + devices := []protocol.DeviceID{myID, device1, device2} + sort.Slice(devices, func(i, j int) bool { + return devices[i].Compare(devices[j]) == -1 + }) + + blocks := func(i ...int) []protocol.BlockInfo { + b := make([]protocol.BlockInfo, 0, len(i)) + for _, v := range i { + b = append(b, protocol.BlockInfo{Offset: int64(v)}) + } + return b + } + tests := []struct { + name string + myId protocol.DeviceID + devices []protocol.DeviceID + blocks []protocol.BlockInfo + want []protocol.BlockInfo + }{ + {"front", devices[0], []protocol.DeviceID{devices[1], devices[2]}, blocks(1, 2, 3), blocks(1, 2, 3)}, + {"back", devices[2], []protocol.DeviceID{devices[0], devices[1]}, blocks(1, 2, 3), blocks(3, 1, 2)}, + {"few-blocks", devices[2], []protocol.DeviceID{devices[0], devices[1]}, blocks(1), blocks(1)}, + {"more-than-one-block", devices[1], []protocol.DeviceID{devices[0]}, blocks(1, 2, 3, 4), blocks(3, 4, 1, 2)}, + {"empty-blocks", devices[0], []protocol.DeviceID{devices[1]}, blocks(), blocks()}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := newStandardBlockPullReorderer(tt.myId, tt.devices) + p.shuffle = func(i interface{}) {} // Noop shuffle + if got := p.Reorder(tt.blocks); !reflect.DeepEqual(got, tt.want) { + t.Errorf("reorderBlocksForDevices() = %v, want %v (my idx: %d, count %d)", got, tt.want, p.myIndex, p.count) + } + }) + } +} diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 54bd7cf0a..c6a178773 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -25,7 +25,6 @@ import ( "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" - "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sync" @@ -104,8 +103,9 @@ type sendReceiveFolder struct { fs fs.Filesystem versioner versioner.Versioner - queue *jobQueue - writeLimiter *byteSemaphore + queue *jobQueue + blockPullReorderer blockPullReorderer + writeLimiter *byteSemaphore pullErrors map[string]string // errors for most recent/current iteration oldPullErrors map[string]string // errors from previous iterations for log filtering only @@ -114,12 +114,13 @@ type sendReceiveFolder struct { func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service { f := &sendReceiveFolder{ - folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter), - fs: fs, - versioner: ver, - queue: newJobQueue(), - writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites), - pullErrorsMut: sync.NewMutex(), + folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter), + fs: fs, + versioner: ver, + queue: newJobQueue(), + blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()), + writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites), + pullErrorsMut: sync.NewMutex(), } f.folder.puller = f f.folder.Service = util.AsService(f.serve, f.String()) @@ -1071,8 +1072,8 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot blocks = append(blocks, file.Blocks...) } - // Shuffle the blocks - rand.Shuffle(blocks) + // Reorder blocks + blocks = f.blockPullReorderer.Reorder(blocks) f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID,