2016-08-01 18:55:07 +02:00
|
|
|
package repository
|
|
|
|
|
|
|
|
import (
|
2017-06-04 11:16:55 +02:00
|
|
|
"context"
|
2020-09-20 00:45:11 +02:00
|
|
|
"sync"
|
2016-08-29 19:18:57 +02:00
|
|
|
|
2017-07-23 14:21:03 +02:00
|
|
|
"github.com/restic/restic/internal/debug"
|
2022-04-23 11:28:18 +02:00
|
|
|
"github.com/restic/restic/internal/errors"
|
2017-07-24 17:42:25 +02:00
|
|
|
"github.com/restic/restic/internal/restic"
|
2020-11-04 14:11:29 +01:00
|
|
|
"github.com/restic/restic/internal/ui/progress"
|
|
|
|
|
2020-09-20 00:45:11 +02:00
|
|
|
"golang.org/x/sync/errgroup"
|
2016-08-01 18:55:07 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// Repack takes a list of packs together with a list of blobs contained in
|
|
|
|
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
|
2017-06-15 14:40:34 +02:00
|
|
|
// into a new pack. Returned is the list of obsolete packs which can then
|
|
|
|
// be removed.
|
2020-11-05 10:33:38 +01:00
|
|
|
//
|
|
|
|
// The map keepBlobs is modified by Repack, it is used to keep track of which
|
|
|
|
// blobs have been processed.
|
2021-09-12 00:03:41 +02:00
|
|
|
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
2016-09-27 22:35:08 +02:00
|
|
|
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
2016-08-01 18:55:07 +02:00
|
|
|
|
2022-04-23 11:28:18 +02:00
|
|
|
if repo == dstRepo && dstRepo.Backend().Connections() < 2 {
|
|
|
|
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
|
|
|
|
}
|
|
|
|
|
2020-11-05 17:04:42 +01:00
|
|
|
wg, wgCtx := errgroup.WithContext(ctx)
|
2017-01-23 17:05:30 +01:00
|
|
|
|
2021-08-07 22:52:05 +02:00
|
|
|
dstRepo.StartPackUploader(wgCtx, wg)
|
|
|
|
wg.Go(func() error {
|
|
|
|
var err error
|
|
|
|
obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
|
|
|
|
if err := wg.Wait(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return obsoletePacks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
|
|
|
wg, wgCtx := errgroup.WithContext(ctx)
|
|
|
|
|
|
|
|
var keepMutex sync.Mutex
|
2021-08-20 10:10:35 +02:00
|
|
|
downloadQueue := make(chan restic.PackBlobs)
|
2020-09-20 00:45:11 +02:00
|
|
|
wg.Go(func() error {
|
|
|
|
defer close(downloadQueue)
|
2021-09-12 00:03:41 +02:00
|
|
|
for pbs := range repo.Index().ListPacks(wgCtx, packs) {
|
2021-08-20 10:10:35 +02:00
|
|
|
var packBlobs []restic.Blob
|
|
|
|
keepMutex.Lock()
|
|
|
|
// filter out unnecessary blobs
|
|
|
|
for _, entry := range pbs.Blobs {
|
|
|
|
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
|
|
|
|
if keepBlobs.Has(h) {
|
|
|
|
packBlobs = append(packBlobs, entry)
|
|
|
|
}
|
2016-08-01 18:55:07 +02:00
|
|
|
}
|
2021-08-20 10:10:35 +02:00
|
|
|
keepMutex.Unlock()
|
2016-08-01 18:55:07 +02:00
|
|
|
|
2020-09-20 00:45:11 +02:00
|
|
|
select {
|
2021-08-20 10:10:35 +02:00
|
|
|
case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
|
2020-11-05 17:04:42 +01:00
|
|
|
case <-wgCtx.Done():
|
|
|
|
return wgCtx.Err()
|
2017-01-23 17:05:30 +01:00
|
|
|
}
|
2020-09-20 00:45:11 +02:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
worker := func() error {
|
2021-08-20 10:10:35 +02:00
|
|
|
for t := range downloadQueue {
|
|
|
|
err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
2020-09-20 00:45:11 +02:00
|
|
|
if err != nil {
|
2022-07-17 12:11:54 +02:00
|
|
|
var ierr error
|
|
|
|
// check whether we can get a valid copy somewhere else
|
|
|
|
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
|
|
|
if ierr != nil {
|
|
|
|
// no luck, return the original error
|
|
|
|
return err
|
|
|
|
}
|
2020-09-20 00:45:11 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
keepMutex.Lock()
|
|
|
|
// recheck whether some other worker was faster
|
2021-08-20 10:10:35 +02:00
|
|
|
shouldKeep := keepBlobs.Has(blob)
|
2020-09-20 00:45:11 +02:00
|
|
|
if shouldKeep {
|
2021-08-20 10:10:35 +02:00
|
|
|
keepBlobs.Delete(blob)
|
2020-09-20 00:45:11 +02:00
|
|
|
}
|
|
|
|
keepMutex.Unlock()
|
|
|
|
|
|
|
|
if !shouldKeep {
|
2021-08-20 10:10:35 +02:00
|
|
|
return nil
|
2020-09-20 00:45:11 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// We do want to save already saved blobs!
|
2022-05-01 14:26:57 +02:00
|
|
|
_, _, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
|
2020-09-20 00:45:11 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-08-20 10:10:35 +02:00
|
|
|
debug.Log(" saved blob %v", blob.ID)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2020-09-20 00:45:11 +02:00
|
|
|
}
|
2020-11-04 14:11:29 +01:00
|
|
|
p.Add(1)
|
2016-08-01 18:55:07 +02:00
|
|
|
}
|
2020-09-20 00:45:11 +02:00
|
|
|
return nil
|
|
|
|
}
|
2017-01-23 17:05:30 +01:00
|
|
|
|
2021-08-08 00:38:17 +02:00
|
|
|
// as packs are streamed the concurrency is limited by IO
|
|
|
|
// reduce by one to ensure that uploading is always possible
|
|
|
|
repackWorkerCount := int(repo.Connections() - 1)
|
|
|
|
for i := 0; i < repackWorkerCount; i++ {
|
2020-09-20 00:45:11 +02:00
|
|
|
wg.Go(worker)
|
|
|
|
}
|
2017-01-23 17:05:30 +01:00
|
|
|
|
2020-09-20 00:45:11 +02:00
|
|
|
if err := wg.Wait(); err != nil {
|
|
|
|
return nil, err
|
2016-08-01 18:55:07 +02:00
|
|
|
}
|
|
|
|
|
2021-09-12 00:03:41 +02:00
|
|
|
if err := dstRepo.Flush(ctx); err != nil {
|
2017-06-15 14:40:34 +02:00
|
|
|
return nil, err
|
2016-08-01 18:55:07 +02:00
|
|
|
}
|
|
|
|
|
2017-06-15 14:40:34 +02:00
|
|
|
return packs, nil
|
2016-08-01 18:55:07 +02:00
|
|
|
}
|