diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 3dae576d9..4fcc8e30c 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -184,17 +184,17 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) ( b := &Buffer{Data: buf} res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) - res.Wait(ctx) - if !res.Known() { + sbr := res.Take(ctx) + if !sbr.known { s.TreeBlobs++ - s.TreeSize += uint64(res.Length()) - s.TreeSizeInRepo += uint64(res.SizeInRepo()) + s.TreeSize += uint64(sbr.length) + s.TreeSizeInRepo += uint64(sbr.sizeInRepo) } - // The context was canceled in the meantime, res.ID() might be invalid + // The context was canceled in the meantime, id might be invalid if ctx.Err() != nil { return restic.ID{}, s, ctx.Err() } - return res.ID(), s, nil + return sbr.id, s, nil } // nodeFromFileInfo returns the restic node from an os.FileInfo. diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 0bd7d1fd9..b2b5e59bb 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -44,9 +44,7 @@ func (s *BlobSaver) TriggerShutdown() { // Save stores a blob in the repo. It checks the index and the known blobs // before saving anything. It takes ownership of the buffer passed in. func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { - // buf might be freed once the job was submitted, thus calculate the length now - length := len(buf.Data) - ch := make(chan saveBlobResponse, 1) + ch := make(chan SaveBlobResponse, 1) select { case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: case <-ctx.Done(): @@ -55,72 +53,62 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu return FutureBlob{ch: ch} } - return FutureBlob{ch: ch, length: length} + return FutureBlob{ch: ch} } // FutureBlob is returned by SaveBlob and will return the data once it has been processed. type FutureBlob struct { - ch <-chan saveBlobResponse - length int - res saveBlobResponse + ch <-chan SaveBlobResponse } -// Wait blocks until the result is available or the context is cancelled. -func (s *FutureBlob) Wait(ctx context.Context) { +func (s *FutureBlob) Poll() *SaveBlobResponse { select { - case <-ctx.Done(): - return case res, ok := <-s.ch: if ok { - s.res = res + return &res } + default: } + return nil } -// ID returns the ID of the blob after it has been saved. -func (s *FutureBlob) ID() restic.ID { - return s.res.id -} - -// Known returns whether or not the blob was already known. -func (s *FutureBlob) Known() bool { - return s.res.known -} - -// Length returns the raw length of the blob. -func (s *FutureBlob) Length() int { - return s.length -} - -// SizeInRepo returns the number of bytes added to the repo (including -// compression and crypto overhead). -func (s *FutureBlob) SizeInRepo() int { - return s.res.size +// Take blocks until the result is available or the context is cancelled. +func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse { + select { + case res, ok := <-s.ch: + if ok { + return res + } + case <-ctx.Done(): + } + return SaveBlobResponse{} } type saveBlobJob struct { restic.BlobType buf *Buffer - ch chan<- saveBlobResponse + ch chan<- SaveBlobResponse } -type saveBlobResponse struct { - id restic.ID - known bool - size int +type SaveBlobResponse struct { + id restic.ID + length int + sizeInRepo int + known bool } -func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { - id, known, size, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) +func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (SaveBlobResponse, error) { + id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) if err != nil { - return saveBlobResponse{}, err + return SaveBlobResponse{}, err } - return saveBlobResponse{ - id: id, - known: known, - size: size, + return SaveBlobResponse{ + id: id, + length: len(buf), + sizeInRepo: sizeInRepo, + known: known, }, nil } diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index 69cd4c2e2..481139a3f 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -54,8 +54,8 @@ func TestBlobSaver(t *testing.T) { } for i, blob := range results { - blob.Wait(ctx) - if blob.Known() { + sbr := blob.Take(ctx) + if sbr.known { t.Errorf("blob %v is known, that should not be the case", i) } } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 1e6eea979..52dd59113 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -129,6 +129,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat chnker.Reset(f, s.pol) var results []FutureBlob + complete := func(sbr SaveBlobResponse) { + if !sbr.known { + stats.DataBlobs++ + stats.DataSize += uint64(sbr.length) + stats.DataSizeInRepo += uint64(sbr.sizeInRepo) + } + + node.Content = append(node.Content, sbr.id) + } node.Content = []restic.ID{} var size uint64 @@ -168,6 +177,17 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) + + // collect already completed blobs + for len(results) > 0 { + sbr := results[0].Poll() + if sbr == nil { + break + } + results[0] = FutureBlob{} + results = results[1:] + complete(*sbr) + } } err = f.Close() @@ -176,15 +196,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat return fnr } - for _, res := range results { - res.Wait(ctx) - if !res.Known() { - stats.DataBlobs++ - stats.DataSize += uint64(res.Length()) - stats.DataSizeInRepo += uint64(res.SizeInRepo()) - } - - node.Content = append(node.Content, res.ID()) + for i, res := range results { + results[i] = FutureBlob{} + sbr := res.Take(ctx) + complete(sbr) } node.Size = size diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 0bdb8ad50..e4d1dcdb8 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -34,7 +34,7 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont wg, ctx := errgroup.WithContext(ctx) saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob { - ch := make(chan saveBlobResponse) + ch := make(chan SaveBlobResponse) close(ch) return FutureBlob{ch: ch} }