diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index dda86c11e..37f1a378d 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -68,6 +68,9 @@ type Archiver struct { // be in the snapshot after saving. s contains some statistics about this // particular file/dir. // + // Once reading a file has completed successfully (but not saving it yet), + // CompleteItem will be called with current == nil. + // // CompleteItem may be called asynchronously from several different // goroutines! CompleteItem func(item string, previous, current *restic.Node, s ItemStats, d time.Duration) @@ -431,6 +434,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous // Save will close the file, we don't need to do that fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() { arch.StartFile(snPath) + }, func() { + arch.CompleteItem(snPath, nil, nil, ItemStats{}, 0) }, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) }) diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 86b5386be..60859857e 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -53,6 +53,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem } var ( + completeReadingCallback bool + completeCallbackNode *restic.Node completeCallbackStats ItemStats completeCallback bool @@ -60,6 +62,13 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem startCallback bool ) + completeReading := func() { + completeReadingCallback = true + if completeCallback { + t.Error("callbacks called in wrong order") + } + } + complete := func(node *restic.Node, stats ItemStats) { completeCallback = true completeCallbackNode = node @@ -80,7 +89,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(err) } - res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete) + res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, completeReading, complete) fnr := res.take(ctx) if fnr.err != nil { @@ -101,6 +110,10 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Errorf("start callback did not happen") } + if !completeReadingCallback { + t.Errorf("completeReading callback did not happen") + } + if !completeCallback { t.Errorf("complete callback did not happen") } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 1c9352ef2..c7303b929 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -67,17 +67,21 @@ func (s *FileSaver) TriggerShutdown() { type CompleteFunc func(*restic.Node, ItemStats) // Save stores the file f and returns the data once it has been completed. The -// file is closed by Save. -func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode { +// file is closed by Save. completeReading is only called if the file was read +// successfully. complete is always called. If completeReading is called, then +// this will always happen before calling complete. +func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), completeReading func(), complete CompleteFunc) FutureNode { fn, ch := newFutureNode() job := saveFileJob{ - snPath: snPath, - target: target, - file: file, - fi: fi, - start: start, - complete: complete, - ch: ch, + snPath: snPath, + target: target, + file: file, + fi: fi, + ch: ch, + + start: start, + completeReading: completeReading, + complete: complete, } select { @@ -92,17 +96,19 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file } type saveFileJob struct { - snPath string - target string - file fs.File - fi os.FileInfo - ch chan<- futureNodeResult - complete CompleteFunc - start func() + snPath string + target string + file fs.File + fi os.FileInfo + ch chan<- futureNodeResult + + start func() + completeReading func() + complete CompleteFunc } // saveFile stores the file f in the repo, then closes it. -func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finish func(res futureNodeResult)) { +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finishReading func(), finish func(res futureNodeResult)) { start() fnr := futureNodeResult{ @@ -227,6 +233,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat // after reaching the end of this method remaining += idx + 1 lock.Unlock() + finishReading() completeBlob() } @@ -246,7 +253,11 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { } } - s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func(res futureNodeResult) { + s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func() { + if job.completeReading != nil { + job.completeReading() + } + }, func(res futureNodeResult) { if job.complete != nil { job.complete(res.node, res.stats) } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index dde4356fc..e32250496 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -60,6 +60,7 @@ func TestFileSaver(t *testing.T) { defer cleanup() startFn := func() {} + completeReadingFn := func() {} completeFn := func(*restic.Node, ItemStats) {} testFs := fs.Local{} @@ -78,7 +79,7 @@ func TestFileSaver(t *testing.T) { t.Fatal(err) } - ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn) + ff := s.Save(ctx, filename, filename, f, fi, startFn, completeReadingFn, completeFn) results = append(results, ff) }