From 4f97492d28126d80311736301fbbe1a98694f6b0 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 22:20:49 +0200 Subject: [PATCH 1/6] Backend: Expose connections parameter --- internal/backend/azure/azure.go | 6 +++++ internal/backend/b2/b2.go | 4 ++++ internal/backend/dryrun/dry_backend.go | 4 ++++ internal/backend/gs/gs.go | 18 ++++++++++----- internal/backend/mem/mem_backend.go | 4 ++++ internal/backend/rest/rest.go | 20 +++++++++++------ internal/backend/s3/s3.go | 4 ++++ internal/backend/swift/swift.go | 20 +++++++++++------ internal/mock/backend.go | 31 +++++++++++++++++--------- internal/restic/backend.go | 3 +++ 10 files changed, 83 insertions(+), 31 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index b20579f2c..243a1eaef 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -24,6 +24,7 @@ import ( type Backend struct { accountName string container *storage.Container + connections uint sem *backend.Semaphore prefix string listMaxItems int @@ -55,6 +56,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { be := &Backend{ container: service.GetContainerReference(cfg.Container), accountName: cfg.AccountName, + connections: cfg.Connections, sem: sem, prefix: cfg.Prefix, Layout: &backend.DefaultLayout{ @@ -109,6 +111,10 @@ func (be *Backend) Join(p ...string) string { return path.Join(p...) } +func (be *Backend) Connections() uint { + return be.connections +} + // Location returns this backend's location (the container name). func (be *Backend) Location() string { return be.Join(be.container.Name, be.prefix) diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 90aeca3b2..6108aaf5c 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -133,6 +133,10 @@ func (be *b2Backend) SetListMaxItems(i int) { be.listMaxItems = i } +func (be *b2Backend) Connections() uint { + return be.cfg.Connections +} + // Location returns the location for the backend. func (be *b2Backend) Location() string { return be.cfg.Bucket diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index 8412bd26a..44eee9a45 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -45,6 +45,10 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { return nil } +func (be *Backend) Connections() uint { + return be.b.Connections() +} + // Location returns the location of the backend. func (be *Backend) Location() string { return "DRY:" + be.b.Location() diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 443de70e5..c87211be3 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -34,6 +34,7 @@ import ( type Backend struct { gcsClient *storage.Client projectID string + connections uint sem *backend.Semaphore bucketName string bucket *storage.BucketHandle @@ -102,12 +103,13 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { } be := &Backend{ - gcsClient: gcsClient, - projectID: cfg.ProjectID, - sem: sem, - bucketName: cfg.Bucket, - bucket: gcsClient.Bucket(cfg.Bucket), - prefix: cfg.Prefix, + gcsClient: gcsClient, + projectID: cfg.ProjectID, + connections: cfg.Connections, + sem: sem, + bucketName: cfg.Bucket, + bucket: gcsClient.Bucket(cfg.Bucket), + prefix: cfg.Prefix, Layout: &backend.DefaultLayout{ Path: cfg.Prefix, Join: path.Join, @@ -185,6 +187,10 @@ func (be *Backend) Join(p ...string) string { return path.Join(p...) } +func (be *Backend) Connections() uint { + return be.connections +} + // Location returns this backend's location (the bucket name). func (be *Backend) Location() string { return be.Join(be.bucketName, be.prefix) diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 9e3cd0e74..8f3a52d02 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -229,6 +229,10 @@ func (be *MemoryBackend) List(ctx context.Context, t restic.FileType, fn func(re return ctx.Err() } +func (be *MemoryBackend) Connections() uint { + return 2 +} + // Location returns the location of the backend (RAM). func (be *MemoryBackend) Location() string { return "RAM" diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index c7675cba1..1e372229a 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -29,9 +29,10 @@ var _ restic.Backend = &Backend{} // Backend uses the REST protocol to access data stored on a server. type Backend struct { - url *url.URL - sem *backend.Semaphore - client *http.Client + url *url.URL + connections uint + sem *backend.Semaphore + client *http.Client backend.Layout } @@ -57,10 +58,11 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { } be := &Backend{ - url: cfg.URL, - client: client, - Layout: &backend.RESTLayout{URL: url, Join: path.Join}, - sem: sem, + url: cfg.URL, + client: client, + Layout: &backend.RESTLayout{URL: url, Join: path.Join}, + connections: cfg.Connections, + sem: sem, } return be, nil @@ -105,6 +107,10 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er return be, nil } +func (b *Backend) Connections() uint { + return b.connections +} + // Location returns this backend's location (the server's URL). func (b *Backend) Location() string { return b.url.String() diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 0d7c74bf4..1bdf2d795 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -255,6 +255,10 @@ func (be *Backend) ReadDir(ctx context.Context, dir string) (list []os.FileInfo, return list, nil } +func (be *Backend) Connections() uint { + return be.cfg.Connections +} + // Location returns this backend's location (the bucket name). func (be *Backend) Location() string { return be.Join(be.cfg.Bucket, be.cfg.Prefix) diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 8d82d90cb..6157002b5 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -24,10 +24,11 @@ import ( // beSwift is a backend which stores the data on a swift endpoint. type beSwift struct { - conn *swift.Connection - sem *backend.Semaphore - container string // Container name - prefix string // Prefix of object names in the container + conn *swift.Connection + connections uint + sem *backend.Semaphore + container string // Container name + prefix string // Prefix of object names in the container backend.Layout } @@ -68,9 +69,10 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Transport: rt, }, - sem: sem, - container: cfg.Container, - prefix: cfg.Prefix, + connections: cfg.Connections, + sem: sem, + container: cfg.Container, + prefix: cfg.Prefix, Layout: &backend.DefaultLayout{ Path: cfg.Prefix, Join: path.Join, @@ -113,6 +115,10 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error { return be.conn.ContainerCreate(ctx, be.container, h) } +func (be *beSwift) Connections() uint { + return be.connections +} + // Location returns this backend's location (the container name). func (be *beSwift) Location() string { return be.container diff --git a/internal/mock/backend.go b/internal/mock/backend.go index 9f6036fdb..05fe1dc6e 100644 --- a/internal/mock/backend.go +++ b/internal/mock/backend.go @@ -11,17 +11,18 @@ import ( // Backend implements a mock backend. type Backend struct { - CloseFn func() error - IsNotExistFn func(err error) bool - SaveFn func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error - OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) - StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) - ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error - RemoveFn func(ctx context.Context, h restic.Handle) error - TestFn func(ctx context.Context, h restic.Handle) (bool, error) - DeleteFn func(ctx context.Context) error - LocationFn func() string - HasherFn func() hash.Hash + CloseFn func() error + IsNotExistFn func(err error) bool + SaveFn func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error + OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) + StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) + ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error + RemoveFn func(ctx context.Context, h restic.Handle) error + TestFn func(ctx context.Context, h restic.Handle) (bool, error) + DeleteFn func(ctx context.Context) error + ConnectionsFn func() uint + LocationFn func() string + HasherFn func() hash.Hash } // NewBackend returns new mock Backend instance @@ -39,6 +40,14 @@ func (m *Backend) Close() error { return m.CloseFn() } +func (m *Backend) Connections() uint { + if m.ConnectionsFn == nil { + return 2 + } + + return m.ConnectionsFn() +} + // Location returns a location string. func (m *Backend) Location() string { if m.LocationFn == nil { diff --git a/internal/restic/backend.go b/internal/restic/backend.go index 41292470a..1203bf3d3 100644 --- a/internal/restic/backend.go +++ b/internal/restic/backend.go @@ -18,6 +18,9 @@ type Backend interface { // repository. Location() string + // Connections returns the maxmimum number of concurrent backend operations. + Connections() uint + // Hasher may return a hash function for calculating a content hash for the backend Hasher() hash.Hash From ee627cd832622377e7640333f08f9c3de66b419d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Apr 2022 11:22:00 +0200 Subject: [PATCH 2/6] backend/mem: Actually enforce connection limit This will allow tests to detect deadlocks related to the connections limit. --- internal/backend/mem/mem_backend.go | 35 ++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 8f3a52d02..69476b693 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -25,17 +25,26 @@ var _ restic.Backend = &MemoryBackend{} var errNotFound = errors.New("not found") +const connectionCount = 2 + // MemoryBackend is a mock backend that uses a map for storing all data in // memory. This should only be used for tests. type MemoryBackend struct { data memMap m sync.Mutex + sem *backend.Semaphore } // New returns a new backend that saves all data in a map in memory. func New() *MemoryBackend { + sem, err := backend.NewSemaphore(connectionCount) + if err != nil { + panic(err) + } + be := &MemoryBackend{ data: make(memMap), + sem: sem, } debug.Log("created new memory backend") @@ -45,6 +54,9 @@ func New() *MemoryBackend { // Test returns whether a file exists. func (be *MemoryBackend) Test(ctx context.Context, h restic.Handle) (bool, error) { + be.sem.GetToken() + defer be.sem.ReleaseToken() + be.m.Lock() defer be.m.Unlock() @@ -68,6 +80,9 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re return backoff.Permanent(err) } + be.sem.GetToken() + defer be.sem.ReleaseToken() + be.m.Lock() defer be.m.Unlock() @@ -120,6 +135,7 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length return nil, backoff.Permanent(err) } + be.sem.GetToken() be.m.Lock() defer be.m.Unlock() @@ -131,15 +147,18 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length debug.Log("Load %v offset %v len %v", h, offset, length) if offset < 0 { + be.sem.ReleaseToken() return nil, errors.New("offset is negative") } if _, ok := be.data[h]; !ok { + be.sem.ReleaseToken() return nil, errNotFound } buf := be.data[h] if offset > int64(len(buf)) { + be.sem.ReleaseToken() return nil, errors.New("offset beyond end of file") } @@ -148,18 +167,21 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length buf = buf[:length] } - return ioutil.NopCloser(bytes.NewReader(buf)), ctx.Err() + return be.sem.ReleaseTokenOnClose(ioutil.NopCloser(bytes.NewReader(buf)), nil), ctx.Err() } // Stat returns information about a file in the backend. func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - be.m.Lock() - defer be.m.Unlock() - if err := h.Valid(); err != nil { return restic.FileInfo{}, backoff.Permanent(err) } + be.sem.GetToken() + defer be.sem.ReleaseToken() + + be.m.Lock() + defer be.m.Unlock() + h.ContainedBlobType = restic.InvalidBlob if h.Type == restic.ConfigFile { h.Name = "" @@ -177,6 +199,9 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File // Remove deletes a file from the backend. func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { + be.sem.GetToken() + defer be.sem.ReleaseToken() + be.m.Lock() defer be.m.Unlock() @@ -230,7 +255,7 @@ func (be *MemoryBackend) List(ctx context.Context, t restic.FileType, fn func(re } func (be *MemoryBackend) Connections() uint { - return 2 + return connectionCount } // Location returns the location of the backend (RAM). From e597b99b55d0f00a23340f5b09961cdc269623e6 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Apr 2022 11:28:18 +0200 Subject: [PATCH 3/6] repository: Reduce repack workers to prevent deadlock As repack streams packs these occupy one backend connection. Uploading a new pack also requires a backend connection. To prevent a deadlock during repack when reaching the backend connections limit, simply limit the repackWorker count to always leave one connection for uploading. --- internal/repository/repack.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 14fef3f20..4d0ca8236 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" @@ -23,6 +24,10 @@ const numRepackWorkers = 8 func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) + if repo == dstRepo && dstRepo.Backend().Connections() < 2 { + return nil, errors.Fatal("repack step requires a backend connection limit of at least two") + } + var keepMutex sync.Mutex wg, wgCtx := errgroup.WithContext(ctx) @@ -86,7 +91,11 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito return nil } - for i := 0; i < numRepackWorkers; i++ { + connectionLimit := dstRepo.Backend().Connections() - 1 + if connectionLimit > numRepackWorkers { + connectionLimit = numRepackWorkers + } + for i := 0; i < int(connectionLimit); i++ { wg.Go(worker) } From f5609d1d3c3ec5e28e12c47e8eb1797931cacb1e Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Apr 2022 11:32:52 +0200 Subject: [PATCH 4/6] prune: Fail early if too few backend connections --- cmd/restic/cmd_prune.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 195b2554d..9447f8145 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -132,6 +132,10 @@ func runPrune(opts PruneOptions, gopts GlobalOptions) error { return err } + if repo.Backend().Connections() < 2 { + return errors.Fatal("prune requires a backend connection limit of at least two") + } + lock, err := lockRepoExclusive(gopts.ctx, repo) defer unlockRepo(lock) if err != nil { From 566ac11c655e01ddc112fd3c858dcd1c1eb46d62 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Apr 2022 11:37:00 +0200 Subject: [PATCH 5/6] fix changelog name --- changelog/unreleased/{pr-3680 => pull-3680} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/unreleased/{pr-3680 => pull-3680} (100%) diff --git a/changelog/unreleased/pr-3680 b/changelog/unreleased/pull-3680 similarity index 100% rename from changelog/unreleased/pr-3680 rename to changelog/unreleased/pull-3680 From 3b630d9998e77ac3ef24e45fd634a3492374bcf2 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 23 Apr 2022 11:43:45 +0200 Subject: [PATCH 6/6] add missing streamPacks changelog --- changelog/unreleased/pull-3484 | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 changelog/unreleased/pull-3484 diff --git a/changelog/unreleased/pull-3484 b/changelog/unreleased/pull-3484 new file mode 100644 index 000000000..be95e3283 --- /dev/null +++ b/changelog/unreleased/pull-3484 @@ -0,0 +1,11 @@ +Enhancement: Stream data in check and prune commands + +`check --read-data` and `prune` downloaded data files into temporary files +which can end up being written to disk. This could cause a large amount of data +being written to disk. The pack files are now streamed which no longer needs +temporary files. Please note that uploads during `backup` and `prune` still +require temporary files. + +https://github.com/restic/restic/pull/3484 +https://github.com/restic/restic/issues/3710 +https://github.com/restic/restic/pull/3717