diff --git a/cmd/restic/delete.go b/cmd/restic/delete.go index 98dd91ece..d97b9e617 100644 --- a/cmd/restic/delete.go +++ b/cmd/restic/delete.go @@ -18,8 +18,6 @@ func DeleteFilesChecked(gopts GlobalOptions, repo restic.Repository, fileList re return deleteFiles(gopts, false, repo, fileList, fileType) } -const numDeleteWorkers = 8 - // deleteFiles deletes the given fileList of fileType in parallel // if ignoreError=true, it will print a warning if there was an error, else it will abort. func deleteFiles(gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error { @@ -40,7 +38,9 @@ func deleteFiles(gopts GlobalOptions, ignoreError bool, repo restic.Repository, bar := newProgressMax(!gopts.JSON && !gopts.Quiet, uint64(totalCount), "files deleted") defer bar.Done() - for i := 0; i < numDeleteWorkers; i++ { + // deleting files is IO-bound + workerCount := repo.Connections() + for i := 0; i < int(workerCount); i++ { wg.Go(func() error { for id := range fileChan { h := restic.Handle{Type: fileType, Name: id.String()} diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 2cb0f1e36..b972408f5 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "runtime" "sort" "sync" @@ -55,8 +56,6 @@ func New(repo restic.Repository, trackUnused bool) *Checker { return c } -const defaultParallelism = 5 - // ErrDuplicatePacks is returned when a pack is found in more than one index. type ErrDuplicatePacks struct { PackID restic.ID @@ -322,7 +321,9 @@ func (c *Checker) Structure(ctx context.Context, p *progress.Counter, errChan ch }, p) defer close(errChan) - for i := 0; i < defaultParallelism; i++ { + // The checkTree worker only processes already decoded trees and is thus CPU-bound + workerCount := runtime.GOMAXPROCS(0) + for i := 0; i < workerCount; i++ { wg.Go(func() error { c.checkTreeWorker(ctx, treeStream, errChan) return nil @@ -574,8 +575,10 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } ch := make(chan checkTask) + // as packs are streamed the concurrency is limited by IO + workerCount := int(c.repo.Connections()) // run workers - for i := 0; i < defaultParallelism; i++ { + for i := 0; i < workerCount; i++ { g.Go(func() error { // create a buffer that is large enough to be reused by repository.StreamPack // this ensures that we can read the pack header later on diff --git a/internal/repository/index_parallel.go b/internal/repository/index_parallel.go index 47aaba85a..779387385 100644 --- a/internal/repository/index_parallel.go +++ b/internal/repository/index_parallel.go @@ -2,6 +2,7 @@ package repository import ( "context" + "runtime" "sync" "github.com/restic/restic/internal/debug" @@ -9,8 +10,6 @@ import ( "golang.org/x/sync/errgroup" ) -const loadIndexParallelism = 5 - // ForAllIndexes loads all index files in parallel and calls the given callback. // It is guaranteed that the function is not run concurrently. If the callback // returns an error, this function is cancelled and also returns that error. @@ -68,8 +67,11 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository, return nil } + // decoding an index can take quite some time such that this can be both CPU- or IO-bound + // as the whole index is kept in memory anyways, a few workers too much don't matter + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) // run workers on ch - for i := 0; i < loadIndexParallelism; i++ { + for i := 0; i < workerCount; i++ { wg.Go(worker) } diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index d6f0962d3..955080e82 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "runtime" "sync" "github.com/restic/restic/internal/debug" @@ -291,8 +292,6 @@ func (mi *MasterIndex) MergeFinalIndexes() error { return nil } -const saveIndexParallelism = 4 - // Save saves all known indexes to index files, leaving out any // packs whose ID is contained in packBlacklist from finalized indexes. // The new index contains the IDs of all known indexes in the "supersedes" @@ -376,8 +375,10 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack return nil } + // encoding an index can take quite some time such that this can be both CPU- or IO-bound + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) // run workers on ch - for i := 0; i < saveIndexParallelism; i++ { + for i := 0; i < workerCount; i++ { wg.Go(worker) } err = wg.Wait() diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 7840e714a..0a50bfcf4 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -12,8 +12,6 @@ import ( "golang.org/x/sync/errgroup" ) -const numRepackWorkers = 8 - // Repack takes a list of packs together with a list of blobs contained in // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved // into a new pack. Returned is the list of obsolete packs which can then @@ -107,11 +105,10 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito return nil } - connectionLimit := dstRepo.Backend().Connections() - 1 - if connectionLimit > numRepackWorkers { - connectionLimit = numRepackWorkers - } - for i := 0; i < int(connectionLimit); i++ { + // as packs are streamed the concurrency is limited by IO + // reduce by one to ensure that uploading is always possible + repackWorkerCount := int(repo.Connections() - 1) + for i := 0; i < repackWorkerCount; i++ { wg.Go(worker) } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index a185032b5..8d3fd9414 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -558,6 +558,10 @@ func (r *Repository) Backend() restic.Backend { return r.be } +func (r *Repository) Connections() uint { + return r.be.Connections() +} + // Index returns the currently used MasterIndex. func (r *Repository) Index() restic.MasterIndex { return r.idx @@ -606,8 +610,6 @@ func (r *Repository) LoadIndex(ctx context.Context) error { return r.PrepareCache() } -const listPackParallelism = 10 - // CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). // The index is added to the MasterIndex but not marked as finalized. // Returned is the list of pack files which could not be read. @@ -656,8 +658,10 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest return nil } + // decoding the pack header is usually quite fast, thus we are primarily IO-bound + workerCount := int(r.Connections()) // run workers on ch - for i := 0; i < listPackParallelism; i++ { + for i := 0; i < workerCount; i++ { wg.Go(worker) } diff --git a/internal/restic/find.go b/internal/restic/find.go index c7406d750..695bbbb91 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -12,6 +12,7 @@ import ( type TreeLoader interface { LoadTree(context.Context, ID) (*Tree, error) LookupBlobSize(id ID, tpe BlobType) (uint, bool) + Connections() uint } // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index 4d9bc5a13..46f607299 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -170,6 +170,10 @@ func (r ForbiddenRepo) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, return 0, false } +func (r ForbiddenRepo) Connections() uint { + return 2 +} + func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() diff --git a/internal/restic/lock.go b/internal/restic/lock.go index 27daa34a6..8411d5ad5 100644 --- a/internal/restic/lock.go +++ b/internal/restic/lock.go @@ -287,8 +287,6 @@ func RemoveAllLocks(ctx context.Context, repo Repository) error { }) } -const loadLockParallelism = 5 - // ForAllLocks reads all locks in parallel and calls the given callback. // It is guaranteed that the function is not run concurrently. If the // callback returns an error, this function is cancelled and also returns that error. @@ -336,7 +334,8 @@ func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID return nil } - for i := 0; i < loadLockParallelism; i++ { + // For locks decoding is nearly for free, thus just assume were only limited by IO + for i := 0; i < int(repo.Connections()); i++ { wg.Go(worker) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 73f53adb1..1daeb0e4f 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -14,6 +14,8 @@ type Repository interface { // Backend returns the backend used by the repository Backend() Backend + // Connections returns the maximum number of concurrent backend operations + Connections() uint Key() *crypto.Key @@ -64,11 +66,15 @@ type Lister interface { // LoadJSONUnpackeder allows loading a JSON file not stored in a pack file type LoadJSONUnpackeder interface { + // Connections returns the maximum number of concurrent backend operations + Connections() uint LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error } // SaverUnpacked allows saving a blob not stored in a pack file type SaverUnpacked interface { + // Connections returns the maximum number of concurrent backend operations + Connections() uint SaveUnpacked(context.Context, FileType, []byte) (ID, error) } diff --git a/internal/restic/snapshot.go b/internal/restic/snapshot.go index a74893778..1652d85d8 100644 --- a/internal/restic/snapshot.go +++ b/internal/restic/snapshot.go @@ -69,8 +69,6 @@ func LoadSnapshot(ctx context.Context, loader LoadJSONUnpackeder, id ID) (*Snaps return sn, nil } -const loadSnapshotParallelism = 5 - // ForAllSnapshots reads all snapshots in parallel and calls the // given function. It is guaranteed that the function is not run concurrently. // If the called function returns an error, this function is cancelled and @@ -125,7 +123,8 @@ func ForAllSnapshots(ctx context.Context, be Lister, loader LoadJSONUnpackeder, return nil } - for i := 0; i < loadSnapshotParallelism; i++ { + // For most snapshots decoding is nearly for free, thus just assume were only limited by IO + for i := 0; i < int(loader.Connections()); i++ { wg.Go(worker) } diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index f6982efc2..39f1c6646 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -3,6 +3,7 @@ package restic import ( "context" "errors" + "runtime" "sync" "github.com/restic/restic/internal/debug" @@ -10,8 +11,6 @@ import ( "golang.org/x/sync/errgroup" ) -const streamTreeParallelism = 6 - // TreeItem is used to return either an error or the tree for a tree id type TreeItem struct { ID @@ -163,7 +162,10 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees var loadTreeWg sync.WaitGroup - for i := 0; i < streamTreeParallelism; i++ { + // decoding a tree can take quite some time such that this can be both CPU- or IO-bound + // one extra worker to handle huge tree blobs + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + 1 + for i := 0; i < workerCount; i++ { workerLoaderChan := loaderChan if i == 0 { workerLoaderChan = hugeTreeChan diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index d255dad15..362d821d2 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -20,8 +20,6 @@ import ( // con: each worker needs to keep one pack in memory const ( - workerCount = 8 - largeFileBlobCount = 25 ) @@ -51,6 +49,7 @@ type fileRestorer struct { idx func(restic.BlobHandle) []restic.PackedBlob packLoader repository.BackendLoadFn + workerCount int filesWriter *filesWriter dst string @@ -61,13 +60,18 @@ type fileRestorer struct { func newFileRestorer(dst string, packLoader repository.BackendLoadFn, key *crypto.Key, - idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer { + idx func(restic.BlobHandle) []restic.PackedBlob, + connections uint) *fileRestorer { + + // as packs are streamed the concurrency is limited by IO + workerCount := int(connections) return &fileRestorer{ key: key, idx: idx, packLoader: packLoader, filesWriter: newFilesWriter(workerCount), + workerCount: workerCount, dst: dst, Error: restorerAbortOnAllErrors, } @@ -150,7 +154,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { } return nil } - for i := 0; i < workerCount; i++ { + for i := 0; i < r.workerCount; i++ { wg.Go(worker) } diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index f5760f54a..b5b52778c 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -150,7 +150,7 @@ func newTestRepo(content []TestFile) *TestRepo { func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files map[string]bool) { repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup) + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2) if files == nil { r.files = repo.files @@ -264,7 +264,7 @@ func TestErrorRestoreFiles(t *testing.T) { return loadError } - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup) + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2) r.files = repo.files err := r.restoreFiles(context.TODO()) @@ -304,7 +304,7 @@ func testPartialDownloadError(t *testing.T, part int) { return loader(ctx, h, length, offset, fn) } - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup) + r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2) r.files = repo.files r.Error = func(s string, e error) error { // ignore errors as in the `restore` command diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index a1e5b3628..5a3f91368 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -219,7 +219,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { } idx := restic.NewHardlinkIndex() - filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup) + filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup, res.repo.Connections()) filerestorer.Error = res.Error debug.Log("first pass for %q", dst) diff --git a/internal/walker/walker_test.go b/internal/walker/walker_test.go index 7a939b1e2..eb4545d63 100644 --- a/internal/walker/walker_test.go +++ b/internal/walker/walker_test.go @@ -76,6 +76,10 @@ func (t TreeMap) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree, erro return tree, nil } +func (t TreeMap) Connections() uint { + return 2 +} + // checkFunc returns a function suitable for walking the tree to check // something, and a function which will check the final result. type checkFunc func(t testing.TB) (walker WalkFunc, final func(testing.TB))