2
2
mirror of https://github.com/octoleo/restic.git synced 2024-06-26 04:22:36 +00:00

Merge pull request #3773 from MichaelEischer/efficient-dir-json

Reduce memory usage for large directories/files
This commit is contained in:
MichaelEischer 2022-07-23 17:47:32 +02:00 committed by GitHub
commit 4ffd479ba4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 377 additions and 336 deletions

View File

@ -0,0 +1,7 @@
Enhancement: Optimize memory usage for directories with many files
Backing up a directory with hundred thousands or more files causes restic to
require large amounts of memory. We have optimized `backup` command such that
it requires up to 30% less memory.
https://github.com/restic/restic/pull/3773

View File

@ -647,7 +647,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
} }
errorHandler := func(item string, err error) error { errorHandler := func(item string, err error) error {
return progressReporter.Error(item, nil, err) return progressReporter.Error(item, err)
} }
messageHandler := func(msg string, args ...interface{}) { messageHandler := func(msg string, args ...interface{}) {
@ -690,9 +690,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
arch.Select = selectFilter arch.Select = selectFilter
arch.WithAtime = opts.WithAtime arch.WithAtime = opts.WithAtime
success := true success := true
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, err error) error {
success = false success = false
return progressReporter.Error(item, fi, err) return progressReporter.Error(item, err)
} }
arch.CompleteItem = progressReporter.CompleteItem arch.CompleteItem = progressReporter.CompleteItem
arch.StartFile = progressReporter.StartFile arch.StartFile = progressReporter.StartFile

View File

@ -2,7 +2,6 @@ package archiver
import ( import (
"context" "context"
"encoding/json"
"os" "os"
"path" "path"
"runtime" "runtime"
@ -27,7 +26,7 @@ type SelectFunc func(item string, fi os.FileInfo) bool
// ErrorFunc is called when an error during archiving occurs. When nil is // ErrorFunc is called when an error during archiving occurs. When nil is
// returned, the archiver continues, otherwise it aborts and passes the error // returned, the archiver continues, otherwise it aborts and passes the error
// up the call stack. // up the call stack.
type ErrorFunc func(file string, fi os.FileInfo, err error) error type ErrorFunc func(file string, err error) error
// ItemStats collects some statistics about a particular file or directory. // ItemStats collects some statistics about a particular file or directory.
type ItemStats struct { type ItemStats struct {
@ -157,7 +156,7 @@ func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver {
} }
// error calls arch.Error if it is set and the error is different from context.Canceled. // error calls arch.Error if it is set and the error is different from context.Canceled.
func (arch *Archiver) error(item string, fi os.FileInfo, err error) error { func (arch *Archiver) error(item string, err error) error {
if arch.Error == nil || err == nil { if arch.Error == nil || err == nil {
return err return err
} }
@ -166,7 +165,7 @@ func (arch *Archiver) error(item string, fi os.FileInfo, err error) error {
return err return err
} }
errf := arch.Error(item, fi, err) errf := arch.Error(item, err)
if err != errf { if err != errf {
debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf) debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf)
} }
@ -175,31 +174,27 @@ func (arch *Archiver) error(item string, fi os.FileInfo, err error) error {
// saveTree stores a tree in the repo. It checks the index and the known blobs // saveTree stores a tree in the repo. It checks the index and the known blobs
// before saving anything. // before saving anything.
func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID, ItemStats, error) { func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
var s ItemStats var s ItemStats
buf, err := json.Marshal(t) buf, err := t.Finalize()
if err != nil { if err != nil {
return restic.ID{}, s, errors.Wrap(err, "MarshalJSON") return restic.ID{}, s, err
} }
// append a newline so that the data is always consistent (json.Encoder
// adds a newline after each object)
buf = append(buf, '\n')
b := &Buffer{Data: buf} b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
res.Wait(ctx) sbr := res.Take(ctx)
if !res.Known() { if !sbr.known {
s.TreeBlobs++ s.TreeBlobs++
s.TreeSize += uint64(res.Length()) s.TreeSize += uint64(sbr.length)
s.TreeSizeInRepo += uint64(res.SizeInRepo()) s.TreeSizeInRepo += uint64(sbr.sizeInRepo)
} }
// The context was canceled in the meantime, res.ID() might be invalid // The context was canceled in the meantime, id might be invalid
if ctx.Err() != nil { if ctx.Err() != nil {
return restic.ID{}, s, ctx.Err() return restic.ID{}, s, ctx.Err()
} }
return res.ID(), s, nil return sbr.id, s, nil
} }
// nodeFromFileInfo returns the restic node from an os.FileInfo. // nodeFromFileInfo returns the restic node from an os.FileInfo.
@ -239,17 +234,17 @@ func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error {
// SaveDir stores a directory in the repo and returns the node. snPath is the // SaveDir stores a directory in the repo and returns the node. snPath is the
// path within the current snapshot. // path within the current snapshot.
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree, complete CompleteFunc) (d FutureTree, err error) { func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) {
debug.Log("%v %v", snPath, dir) debug.Log("%v %v", snPath, dir)
treeNode, err := arch.nodeFromFileInfo(dir, fi) treeNode, err := arch.nodeFromFileInfo(dir, fi)
if err != nil { if err != nil {
return FutureTree{}, err return FutureNode{}, err
} }
names, err := readdirnames(arch.FS, dir, fs.O_NOFOLLOW) names, err := readdirnames(arch.FS, dir, fs.O_NOFOLLOW)
if err != nil { if err != nil {
return FutureTree{}, err return FutureNode{}, err
} }
sort.Strings(names) sort.Strings(names)
@ -259,7 +254,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
// test if context has been cancelled // test if context has been cancelled
if ctx.Err() != nil { if ctx.Err() != nil {
debug.Log("context has been cancelled, aborting") debug.Log("context has been cancelled, aborting")
return FutureTree{}, ctx.Err() return FutureNode{}, ctx.Err()
} }
pathname := arch.FS.Join(dir, name) pathname := arch.FS.Join(dir, name)
@ -269,13 +264,13 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
// return error early if possible // return error early if possible
if err != nil { if err != nil {
err = arch.error(pathname, fi, err) err = arch.error(pathname, err)
if err == nil { if err == nil {
// ignore error // ignore error
continue continue
} }
return FutureTree{}, err return FutureNode{}, err
} }
if excluded { if excluded {
@ -285,54 +280,58 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
nodes = append(nodes, fn) nodes = append(nodes, fn)
} }
ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes, complete) fn := arch.treeSaver.Save(ctx, snPath, dir, treeNode, nodes, complete)
return ft, nil return fn, nil
} }
// FutureNode holds a reference to a node, FutureFile, or FutureTree. // FutureNode holds a reference to a channel that returns a FutureNodeResult
// or a reference to an already existing result. If the result is available
// immediatelly, then storing a reference directly requires less memory than
// using the indirection via a channel.
type FutureNode struct { type FutureNode struct {
snPath, target string ch <-chan futureNodeResult
res *futureNodeResult
}
// kept to call the error callback function type futureNodeResult struct {
absTarget string snPath, target string
fi os.FileInfo
node *restic.Node node *restic.Node
stats ItemStats stats ItemStats
err error err error
isFile bool
file FutureFile
isTree bool
tree FutureTree
} }
func (fn *FutureNode) wait(ctx context.Context) { func newFutureNode() (FutureNode, chan<- futureNodeResult) {
switch { ch := make(chan futureNodeResult, 1)
case fn.isFile: return FutureNode{ch: ch}, ch
// wait for and collect the data for the file }
fn.file.Wait(ctx)
fn.node = fn.file.Node()
fn.err = fn.file.Err()
fn.stats = fn.file.Stats()
// ensure the other stuff can be garbage-collected func newFutureNodeWithResult(res futureNodeResult) FutureNode {
fn.file = FutureFile{} return FutureNode{
fn.isFile = false res: &res,
case fn.isTree:
// wait for and collect the data for the dir
fn.tree.Wait(ctx)
fn.node = fn.tree.Node()
fn.stats = fn.tree.Stats()
// ensure the other stuff can be garbage-collected
fn.tree = FutureTree{}
fn.isTree = false
} }
} }
func (fn *FutureNode) take(ctx context.Context) futureNodeResult {
if fn.res != nil {
res := fn.res
// free result
fn.res = nil
return *res
}
select {
case res, ok := <-fn.ch:
if ok {
// free channel
fn.ch = nil
return res
}
case <-ctx.Done():
}
return futureNodeResult{}
}
// allBlobsPresent checks if all blobs (contents) of the given node are // allBlobsPresent checks if all blobs (contents) of the given node are
// present in the index. // present in the index.
func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool {
@ -355,19 +354,12 @@ func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool {
func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) {
start := time.Now() start := time.Now()
fn = FutureNode{
snPath: snPath,
target: target,
}
debug.Log("%v target %q, previous %v", snPath, target, previous) debug.Log("%v target %q, previous %v", snPath, target, previous)
abstarget, err := arch.FS.Abs(target) abstarget, err := arch.FS.Abs(target)
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
fn.absTarget = abstarget
// exclude files by path before running Lstat to reduce number of lstat calls // exclude files by path before running Lstat to reduce number of lstat calls
if !arch.SelectByName(abstarget) { if !arch.SelectByName(abstarget) {
debug.Log("%v is excluded by path", target) debug.Log("%v is excluded by path", target)
@ -378,7 +370,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
fi, err := arch.FS.Lstat(target) fi, err := arch.FS.Lstat(target)
if err != nil { if err != nil {
debug.Log("lstat() for %v returned error: %v", target, err) debug.Log("lstat() for %v returned error: %v", target, err)
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
if err != nil { if err != nil {
return FutureNode{}, false, errors.Wrap(err, "Lstat") return FutureNode{}, false, errors.Wrap(err, "Lstat")
} }
@ -401,21 +393,26 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
debug.Log("%v hasn't changed, using old list of blobs", target) debug.Log("%v hasn't changed, using old list of blobs", target)
arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start)) arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start))
arch.CompleteBlob(snPath, previous.Size) arch.CompleteBlob(snPath, previous.Size)
fn.node, err = arch.nodeFromFileInfo(target, fi) node, err := arch.nodeFromFileInfo(target, fi)
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
// copy list of blobs // copy list of blobs
fn.node.Content = previous.Content node.Content = previous.Content
fn = newFutureNodeWithResult(futureNodeResult{
snPath: snPath,
target: target,
node: node,
})
return fn, false, nil return fn, false, nil
} }
debug.Log("%v hasn't changed, but contents are missing!", target) debug.Log("%v hasn't changed, but contents are missing!", target)
// There are contents missing - inform user! // There are contents missing - inform user!
err := errors.Errorf("parts of %v not found in the repository index; storing the file again", target) err := errors.Errorf("parts of %v not found in the repository index; storing the file again", target)
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
@ -426,7 +423,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
file, err := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0) file, err := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0)
if err != nil { if err != nil {
debug.Log("Openfile() for %v returned error: %v", target, err) debug.Log("Openfile() for %v returned error: %v", target, err)
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
if err != nil { if err != nil {
return FutureNode{}, false, errors.Wrap(err, "Lstat") return FutureNode{}, false, errors.Wrap(err, "Lstat")
} }
@ -437,7 +434,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
if err != nil { if err != nil {
debug.Log("stat() on opened file %v returned error: %v", target, err) debug.Log("stat() on opened file %v returned error: %v", target, err)
_ = file.Close() _ = file.Close()
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
if err != nil { if err != nil {
return FutureNode{}, false, errors.Wrap(err, "Lstat") return FutureNode{}, false, errors.Wrap(err, "Lstat")
} }
@ -448,16 +445,15 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
if !fs.IsRegularFile(fi) { if !fs.IsRegularFile(fi) {
err = errors.Errorf("file %v changed type, refusing to archive") err = errors.Errorf("file %v changed type, refusing to archive")
_ = file.Close() _ = file.Close()
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
return FutureNode{}, true, nil return FutureNode{}, true, nil
} }
fn.isFile = true
// Save will close the file, we don't need to do that // Save will close the file, we don't need to do that
fn.file = arch.fileSaver.Save(ctx, snPath, file, fi, func() { fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() {
arch.StartFile(snPath) arch.StartFile(snPath)
}, func(node *restic.Node, stats ItemStats) { }, func(node *restic.Node, stats ItemStats) {
arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) arch.CompleteItem(snPath, previous, node, stats, time.Since(start))
@ -470,14 +466,13 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
start := time.Now() start := time.Now()
oldSubtree, err := arch.loadSubtree(ctx, previous) oldSubtree, err := arch.loadSubtree(ctx, previous)
if err != nil { if err != nil {
err = arch.error(abstarget, fi, err) err = arch.error(abstarget, err)
} }
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
fn.isTree = true fn, err = arch.SaveDir(ctx, snPath, target, fi, oldSubtree,
fn.tree, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree,
func(node *restic.Node, stats ItemStats) { func(node *restic.Node, stats ItemStats) {
arch.CompleteItem(snItem, previous, node, stats, time.Since(start)) arch.CompleteItem(snItem, previous, node, stats, time.Since(start))
}) })
@ -493,10 +488,15 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
default: default:
debug.Log(" %v other", target) debug.Log(" %v other", target)
fn.node, err = arch.nodeFromFileInfo(target, fi) node, err := arch.nodeFromFileInfo(target, fi)
if err != nil { if err != nil {
return FutureNode{}, false, err return FutureNode{}, false, err
} }
fn = newFutureNodeWithResult(futureNodeResult{
snPath: snPath,
target: target,
node: node,
})
} }
debug.Log("return after %.3f", time.Since(start).Seconds()) debug.Log("return after %.3f", time.Since(start).Seconds())
@ -579,7 +579,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
fn, excluded, err := arch.Save(ctx, join(snPath, name), subatree.Path, previous.Find(name)) fn, excluded, err := arch.Save(ctx, join(snPath, name), subatree.Path, previous.Find(name))
if err != nil { if err != nil {
err = arch.error(subatree.Path, fn.fi, err) err = arch.error(subatree.Path, err)
if err == nil { if err == nil {
// ignore error // ignore error
continue continue
@ -603,7 +603,7 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
oldNode := previous.Find(name) oldNode := previous.Find(name)
oldSubtree, err := arch.loadSubtree(ctx, oldNode) oldSubtree, err := arch.loadSubtree(ctx, oldNode)
if err != nil { if err != nil {
err = arch.error(join(snPath, name), nil, err) err = arch.error(join(snPath, name), err)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -615,7 +615,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
return nil, err return nil, err
} }
id, nodeStats, err := arch.saveTree(ctx, subtree) tb, err := restic.TreeToBuilder(subtree)
if err != nil {
return nil, err
}
id, nodeStats, err := arch.saveTree(ctx, tb)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -653,28 +657,28 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
// process all futures // process all futures
for name, fn := range futureNodes { for name, fn := range futureNodes {
fn.wait(ctx) fnr := fn.take(ctx)
// return the error, or ignore it // return the error, or ignore it
if fn.err != nil { if fnr.err != nil {
fn.err = arch.error(fn.target, fn.fi, fn.err) fnr.err = arch.error(fnr.target, fnr.err)
if fn.err == nil { if fnr.err == nil {
// ignore error // ignore error
continue continue
} }
return nil, fn.err return nil, fnr.err
} }
// when the error is ignored, the node could not be saved, so ignore it // when the error is ignored, the node could not be saved, so ignore it
if fn.node == nil { if fnr.node == nil {
debug.Log("%v excluded: %v", fn.snPath, fn.target) debug.Log("%v excluded: %v", fnr.snPath, fnr.target)
continue continue
} }
fn.node.Name = name fnr.node.Name = name
err := tree.Insert(fn.node) err := tree.Insert(fnr.node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -765,7 +769,7 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
tree, err := restic.LoadTree(ctx, arch.Repo, *sn.Tree) tree, err := restic.LoadTree(ctx, arch.Repo, *sn.Tree)
if err != nil { if err != nil {
debug.Log("unable to load tree %v: %v", *sn.Tree, err) debug.Log("unable to load tree %v: %v", *sn.Tree, err)
_ = arch.error("/", nil, arch.wrapLoadTreeError(*sn.Tree, err)) _ = arch.error("/", arch.wrapLoadTreeError(*sn.Tree, err))
return nil return nil
} }
return tree return tree
@ -829,7 +833,11 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
return errors.New("snapshot is empty") return errors.New("snapshot is empty")
} }
rootTreeID, stats, err = arch.saveTree(wgCtx, tree) tb, err := restic.TreeToBuilder(tree)
if err != nil {
return err
}
rootTreeID, stats, err = arch.saveTree(wgCtx, tb)
arch.stopWorkers() arch.stopWorkers()
return err return err
}) })

View File

@ -47,7 +47,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
arch := New(repo, filesystem, Options{}) arch := New(repo, filesystem, Options{})
arch.runWorkers(ctx, wg) arch.runWorkers(ctx, wg)
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, err error) error {
t.Errorf("archiver error for %v: %v", item, err) t.Errorf("archiver error for %v: %v", item, err)
return err return err
} }
@ -80,11 +80,11 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
t.Fatal(err) t.Fatal(err)
} }
res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete) res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete)
res.Wait(ctx) fnr := res.take(ctx)
if res.Err() != nil { if fnr.err != nil {
t.Fatal(res.Err()) t.Fatal(fnr.err)
} }
arch.stopWorkers() arch.stopWorkers()
@ -109,15 +109,15 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
t.Errorf("no node returned for complete callback") t.Errorf("no node returned for complete callback")
} }
if completeCallbackNode != nil && !res.Node().Equals(*completeCallbackNode) { if completeCallbackNode != nil && !fnr.node.Equals(*completeCallbackNode) {
t.Errorf("different node returned for complete callback") t.Errorf("different node returned for complete callback")
} }
if completeCallbackStats != res.Stats() { if completeCallbackStats != fnr.stats {
t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", res.Stats(), completeCallbackStats) t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", fnr.stats, completeCallbackStats)
} }
return res.Node(), res.Stats() return fnr.node, fnr.stats
} }
func TestArchiverSaveFile(t *testing.T) { func TestArchiverSaveFile(t *testing.T) {
@ -217,7 +217,7 @@ func TestArchiverSave(t *testing.T) {
repo.StartPackUploader(ctx, wg) repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, err error) error {
t.Errorf("archiver error for %v: %v", item, err) t.Errorf("archiver error for %v: %v", item, err)
return err return err
} }
@ -232,16 +232,16 @@ func TestArchiverSave(t *testing.T) {
t.Errorf("Save() excluded the node, that's unexpected") t.Errorf("Save() excluded the node, that's unexpected")
} }
node.wait(ctx) fnr := node.take(ctx)
if node.err != nil { if fnr.err != nil {
t.Fatal(node.err) t.Fatal(fnr.err)
} }
if node.node == nil { if fnr.node == nil {
t.Fatalf("returned node is nil") t.Fatalf("returned node is nil")
} }
stats := node.stats stats := fnr.stats
arch.stopWorkers() arch.stopWorkers()
err = repo.Flush(ctx) err = repo.Flush(ctx)
@ -249,7 +249,7 @@ func TestArchiverSave(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
TestEnsureFileContent(ctx, t, repo, "file", node.node, testfile) TestEnsureFileContent(ctx, t, repo, "file", fnr.node, testfile)
if stats.DataSize != uint64(len(testfile.Content)) { if stats.DataSize != uint64(len(testfile.Content)) {
t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize)
} }
@ -295,7 +295,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
} }
arch := New(repo, readerFs, Options{}) arch := New(repo, readerFs, Options{})
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, err error) error {
t.Errorf("archiver error for %v: %v", item, err) t.Errorf("archiver error for %v: %v", item, err)
return err return err
} }
@ -311,16 +311,16 @@ func TestArchiverSaveReaderFS(t *testing.T) {
t.Errorf("Save() excluded the node, that's unexpected") t.Errorf("Save() excluded the node, that's unexpected")
} }
node.wait(ctx) fnr := node.take(ctx)
if node.err != nil { if fnr.err != nil {
t.Fatal(node.err) t.Fatal(fnr.err)
} }
if node.node == nil { if fnr.node == nil {
t.Fatalf("returned node is nil") t.Fatalf("returned node is nil")
} }
stats := node.stats stats := fnr.stats
arch.stopWorkers() arch.stopWorkers()
err = repo.Flush(ctx) err = repo.Flush(ctx)
@ -328,7 +328,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
TestEnsureFileContent(ctx, t, repo, "file", node.node, TestFile{Content: test.Data}) TestEnsureFileContent(ctx, t, repo, "file", fnr.node, TestFile{Content: test.Data})
if stats.DataSize != uint64(len(test.Data)) { if stats.DataSize != uint64(len(test.Data)) {
t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize)
} }
@ -851,13 +851,13 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil, nil) ft, err := arch.SaveDir(ctx, "/", test.target, fi, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ft.Wait(ctx) fnr := ft.take(ctx)
node, stats := ft.Node(), ft.Stats() node, stats := fnr.node, fnr.stats
t.Logf("stats: %v", stats) t.Logf("stats: %v", stats)
if stats.DataSize != 0 { if stats.DataSize != 0 {
@ -928,13 +928,13 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil, nil) ft, err := arch.SaveDir(ctx, "/", tempdir, fi, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ft.Wait(ctx) fnr := ft.take(ctx)
node, stats := ft.Node(), ft.Stats() node, stats := fnr.node, fnr.stats
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1723,7 +1723,7 @@ func TestArchiverParent(t *testing.T) {
func TestArchiverErrorReporting(t *testing.T) { func TestArchiverErrorReporting(t *testing.T) {
ignoreErrorForBasename := func(basename string) ErrorFunc { ignoreErrorForBasename := func(basename string) ErrorFunc {
return func(item string, fi os.FileInfo, err error) error { return func(item string, err error) error {
if filepath.Base(item) == "targetfile" { if filepath.Base(item) == "targetfile" {
t.Logf("ignoring error for targetfile: %v", err) t.Logf("ignoring error for targetfile: %v", err)
return nil return nil
@ -2248,7 +2248,7 @@ func TestRacyFileSwap(t *testing.T) {
repo.StartPackUploader(ctx, wg) repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: statfs}, Options{}) arch := New(repo, fs.Track{FS: statfs}, Options{})
arch.Error = func(item string, fi os.FileInfo, err error) error { arch.Error = func(item string, err error) error {
t.Logf("archiver error as expected for %v: %v", item, err) t.Logf("archiver error as expected for %v: %v", item, err)
return err return err
} }

View File

@ -11,7 +11,6 @@ import (
// Saver allows saving a blob. // Saver allows saving a blob.
type Saver interface { type Saver interface {
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error)
Index() restic.MasterIndex
} }
// BlobSaver concurrently saves incoming blobs to the repo. // BlobSaver concurrently saves incoming blobs to the repo.
@ -45,9 +44,7 @@ func (s *BlobSaver) TriggerShutdown() {
// Save stores a blob in the repo. It checks the index and the known blobs // Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in. // before saving anything. It takes ownership of the buffer passed in.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
// buf might be freed once the job was submitted, thus calculate the length now ch := make(chan SaveBlobResponse, 1)
length := len(buf.Data)
ch := make(chan saveBlobResponse, 1)
select { select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
case <-ctx.Done(): case <-ctx.Done():
@ -56,72 +53,62 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
return FutureBlob{ch: ch} return FutureBlob{ch: ch}
} }
return FutureBlob{ch: ch, length: length} return FutureBlob{ch: ch}
} }
// FutureBlob is returned by SaveBlob and will return the data once it has been processed. // FutureBlob is returned by SaveBlob and will return the data once it has been processed.
type FutureBlob struct { type FutureBlob struct {
ch <-chan saveBlobResponse ch <-chan SaveBlobResponse
length int
res saveBlobResponse
} }
// Wait blocks until the result is available or the context is cancelled. func (s *FutureBlob) Poll() *SaveBlobResponse {
func (s *FutureBlob) Wait(ctx context.Context) {
select { select {
case <-ctx.Done():
return
case res, ok := <-s.ch: case res, ok := <-s.ch:
if ok { if ok {
s.res = res return &res
} }
default:
} }
return nil
} }
// ID returns the ID of the blob after it has been saved. // Take blocks until the result is available or the context is cancelled.
func (s *FutureBlob) ID() restic.ID { func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
return s.res.id select {
} case res, ok := <-s.ch:
if ok {
// Known returns whether or not the blob was already known. return res
func (s *FutureBlob) Known() bool { }
return s.res.known case <-ctx.Done():
} }
return SaveBlobResponse{}
// Length returns the raw length of the blob.
func (s *FutureBlob) Length() int {
return s.length
}
// SizeInRepo returns the number of bytes added to the repo (including
// compression and crypto overhead).
func (s *FutureBlob) SizeInRepo() int {
return s.res.size
} }
type saveBlobJob struct { type saveBlobJob struct {
restic.BlobType restic.BlobType
buf *Buffer buf *Buffer
ch chan<- saveBlobResponse ch chan<- SaveBlobResponse
} }
type saveBlobResponse struct { type SaveBlobResponse struct {
id restic.ID id restic.ID
known bool length int
size int sizeInRepo int
known bool
} }
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (SaveBlobResponse, error) {
id, known, size, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
if err != nil { if err != nil {
return saveBlobResponse{}, err return SaveBlobResponse{}, err
} }
return saveBlobResponse{ return SaveBlobResponse{
id: id, id: id,
known: known, length: len(buf),
size: size, sizeInRepo: sizeInRepo,
known: known,
}, nil }, nil
} }

View File

@ -54,8 +54,8 @@ func TestBlobSaver(t *testing.T) {
} }
for i, blob := range results { for i, blob := range results {
blob.Wait(ctx) sbr := blob.Take(ctx)
if blob.Known() { if sbr.known {
t.Errorf("blob %v is known, that should not be the case", i) t.Errorf("blob %v is known, that should not be the case", i)
} }
} }

View File

@ -13,41 +13,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// FutureFile is returned by Save and will return the data once it
// has been processed.
type FutureFile struct {
ch <-chan saveFileResponse
res saveFileResponse
}
// Wait blocks until the result of the save operation is received or ctx is
// cancelled.
func (s *FutureFile) Wait(ctx context.Context) {
select {
case res, ok := <-s.ch:
if ok {
s.res = res
}
case <-ctx.Done():
return
}
}
// Node returns the node once it is available.
func (s *FutureFile) Node() *restic.Node {
return s.res.node
}
// Stats returns the stats for the file once they are available.
func (s *FutureFile) Stats() ItemStats {
return s.res.stats
}
// Err returns the error in case an error occurred.
func (s *FutureFile) Err() error {
return s.res.err
}
// SaveBlobFn saves a blob to a repo. // SaveBlobFn saves a blob to a repo.
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
@ -102,10 +67,11 @@ type CompleteFunc func(*restic.Node, ItemStats)
// Save stores the file f and returns the data once it has been completed. The // Save stores the file f and returns the data once it has been completed. The
// file is closed by Save. // file is closed by Save.
func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode {
ch := make(chan saveFileResponse, 1) fn, ch := newFutureNode()
job := saveFileJob{ job := saveFileJob{
snPath: snPath, snPath: snPath,
target: target,
file: file, file: file,
fi: fi, fi: fi,
start: start, start: start,
@ -121,47 +87,57 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
close(ch) close(ch)
} }
return FutureFile{ch: ch} return fn
} }
type saveFileJob struct { type saveFileJob struct {
snPath string snPath string
target string
file fs.File file fs.File
fi os.FileInfo fi os.FileInfo
ch chan<- saveFileResponse ch chan<- futureNodeResult
complete CompleteFunc complete CompleteFunc
start func() start func()
} }
type saveFileResponse struct {
node *restic.Node
stats ItemStats
err error
}
// saveFile stores the file f in the repo, then closes it. // saveFile stores the file f in the repo, then closes it.
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse { func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
start() start()
stats := ItemStats{} stats := ItemStats{}
fnr := futureNodeResult{
snPath: snPath,
target: target,
}
debug.Log("%v", snPath) debug.Log("%v", snPath)
node, err := s.NodeFromFileInfo(f.Name(), fi) node, err := s.NodeFromFileInfo(f.Name(), fi)
if err != nil { if err != nil {
_ = f.Close() _ = f.Close()
return saveFileResponse{err: err} fnr.err = err
return fnr
} }
if node.Type != "file" { if node.Type != "file" {
_ = f.Close() _ = f.Close()
return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)} fnr.err = errors.Errorf("node type %q is wrong", node.Type)
return fnr
} }
// reuse the chunker // reuse the chunker
chnker.Reset(f, s.pol) chnker.Reset(f, s.pol)
var results []FutureBlob var results []FutureBlob
complete := func(sbr SaveBlobResponse) {
if !sbr.known {
stats.DataBlobs++
stats.DataSize += uint64(sbr.length)
stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
}
node.Content = append(node.Content, sbr.id)
}
node.Content = []restic.ID{} node.Content = []restic.ID{}
var size uint64 var size uint64
@ -179,13 +155,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
if err != nil { if err != nil {
_ = f.Close() _ = f.Close()
return saveFileResponse{err: err} fnr.err = err
return fnr
} }
// test if the context has been cancelled, return the error // test if the context has been cancelled, return the error
if ctx.Err() != nil { if ctx.Err() != nil {
_ = f.Close() _ = f.Close()
return saveFileResponse{err: ctx.Err()} fnr.err = ctx.Err()
return fnr
} }
res := s.saveBlob(ctx, restic.DataBlob, buf) res := s.saveBlob(ctx, restic.DataBlob, buf)
@ -194,34 +172,40 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
// test if the context has been cancelled, return the error // test if the context has been cancelled, return the error
if ctx.Err() != nil { if ctx.Err() != nil {
_ = f.Close() _ = f.Close()
return saveFileResponse{err: ctx.Err()} fnr.err = ctx.Err()
return fnr
} }
s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) s.CompleteBlob(f.Name(), uint64(len(chunk.Data)))
// collect already completed blobs
for len(results) > 0 {
sbr := results[0].Poll()
if sbr == nil {
break
}
results[0] = FutureBlob{}
results = results[1:]
complete(*sbr)
}
} }
err = f.Close() err = f.Close()
if err != nil { if err != nil {
return saveFileResponse{err: err} fnr.err = err
return fnr
} }
for _, res := range results { for i, res := range results {
res.Wait(ctx) results[i] = FutureBlob{}
if !res.Known() { sbr := res.Take(ctx)
stats.DataBlobs++ complete(sbr)
stats.DataSize += uint64(res.Length())
stats.DataSizeInRepo += uint64(res.SizeInRepo())
}
node.Content = append(node.Content, res.ID())
} }
node.Size = size node.Size = size
fnr.node = node
return saveFileResponse{ fnr.stats = stats
node: node, return fnr
stats: stats,
}
} }
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
@ -239,7 +223,8 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
return return
} }
} }
res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start)
res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
if job.complete != nil { if job.complete != nil {
job.complete(res.node, res.stats) job.complete(res.node, res.stats)
} }

View File

@ -34,7 +34,7 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob { saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse) ch := make(chan SaveBlobResponse)
close(ch) close(ch)
return FutureBlob{ch: ch} return FutureBlob{ch: ch}
} }
@ -64,7 +64,7 @@ func TestFileSaver(t *testing.T) {
testFs := fs.Local{} testFs := fs.Local{}
s, ctx, wg := startFileSaver(ctx, t) s, ctx, wg := startFileSaver(ctx, t)
var results []FutureFile var results []FutureNode
for _, filename := range files { for _, filename := range files {
f, err := testFs.Open(filename) f, err := testFs.Open(filename)
@ -77,14 +77,14 @@ func TestFileSaver(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
ff := s.Save(ctx, filename, f, fi, startFn, completeFn) ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn)
results = append(results, ff) results = append(results, ff)
} }
for _, file := range results { for _, file := range results {
file.Wait(ctx) fnr := file.take(ctx)
if file.Err() != nil { if fnr.err != nil {
t.Errorf("unable to save file: %v", file.Err()) t.Errorf("unable to save file: %v", fnr.err)
} }
} }

View File

@ -27,7 +27,7 @@ func NewScanner(fs fs.FS) *Scanner {
FS: fs, FS: fs,
SelectByName: func(item string) bool { return true }, SelectByName: func(item string) bool { return true },
Select: func(item string, fi os.FileInfo) bool { return true }, Select: func(item string, fi os.FileInfo) bool { return true },
Error: func(item string, fi os.FileInfo, err error) error { return err }, Error: func(item string, err error) error { return err },
Result: func(item string, s ScanStats) {}, Result: func(item string, s ScanStats) {},
} }
} }
@ -111,7 +111,7 @@ func (s *Scanner) scan(ctx context.Context, stats ScanStats, target string) (Sca
// get file information // get file information
fi, err := s.FS.Lstat(target) fi, err := s.FS.Lstat(target)
if err != nil { if err != nil {
return stats, s.Error(target, fi, err) return stats, s.Error(target, err)
} }
// run remaining select functions that require file information // run remaining select functions that require file information
@ -126,7 +126,7 @@ func (s *Scanner) scan(ctx context.Context, stats ScanStats, target string) (Sca
case fi.Mode().IsDir(): case fi.Mode().IsDir():
names, err := readdirnames(s.FS, target, fs.O_NOFOLLOW) names, err := readdirnames(s.FS, target, fs.O_NOFOLLOW)
if err != nil { if err != nil {
return stats, s.Error(target, fi, err) return stats, s.Error(target, err)
} }
sort.Strings(names) sort.Strings(names)

View File

@ -133,7 +133,7 @@ func TestScannerError(t *testing.T) {
src TestDir src TestDir
result ScanStats result ScanStats
selFn SelectFunc selFn SelectFunc
errFn func(t testing.TB, item string, fi os.FileInfo, err error) error errFn func(t testing.TB, item string, err error) error
resFn func(t testing.TB, item string, s ScanStats) resFn func(t testing.TB, item string, s ScanStats)
prepare func(t testing.TB) prepare func(t testing.TB)
}{ }{
@ -173,7 +173,7 @@ func TestScannerError(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
}, },
errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { errFn: func(t testing.TB, item string, err error) error {
if item == filepath.FromSlash("work/subdir") { if item == filepath.FromSlash("work/subdir") {
return nil return nil
} }
@ -198,7 +198,7 @@ func TestScannerError(t *testing.T) {
} }
} }
}, },
errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { errFn: func(t testing.TB, item string, err error) error {
if item == "foo" { if item == "foo" {
t.Logf("ignoring error for %v: %v", item, err) t.Logf("ignoring error for %v: %v", item, err)
return nil return nil
@ -257,13 +257,13 @@ func TestScannerError(t *testing.T) {
} }
} }
if test.errFn != nil { if test.errFn != nil {
sc.Error = func(item string, fi os.FileInfo, err error) error { sc.Error = func(item string, err error) error {
p, relErr := filepath.Rel(cur, item) p, relErr := filepath.Rel(cur, item)
if relErr != nil { if relErr != nil {
panic(relErr) panic(relErr)
} }
return test.errFn(t, p, fi, err) return test.errFn(t, p, err)
} }
} }

View File

@ -8,38 +8,9 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// FutureTree is returned by Save and will return the data once it
// has been processed.
type FutureTree struct {
ch <-chan saveTreeResponse
res saveTreeResponse
}
// Wait blocks until the data has been received or ctx is cancelled.
func (s *FutureTree) Wait(ctx context.Context) {
select {
case <-ctx.Done():
return
case res, ok := <-s.ch:
if ok {
s.res = res
}
}
}
// Node returns the node.
func (s *FutureTree) Node() *restic.Node {
return s.res.node
}
// Stats returns the stats for the file.
func (s *FutureTree) Stats() ItemStats {
return s.res.stats
}
// TreeSaver concurrently saves incoming trees to the repo. // TreeSaver concurrently saves incoming trees to the repo.
type TreeSaver struct { type TreeSaver struct {
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error)
errFn ErrorFunc errFn ErrorFunc
ch chan<- saveTreeJob ch chan<- saveTreeJob
@ -47,7 +18,7 @@ type TreeSaver struct {
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled. // started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
ch := make(chan saveTreeJob) ch := make(chan saveTreeJob)
s := &TreeSaver{ s := &TreeSaver{
@ -70,10 +41,11 @@ func (s *TreeSaver) TriggerShutdown() {
} }
// Save stores the dir d and returns the data once it has been completed. // Save stores the dir d and returns the data once it has been completed.
func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureTree { func (s *TreeSaver) Save(ctx context.Context, snPath string, target string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureNode {
ch := make(chan saveTreeResponse, 1) fn, ch := newFutureNode()
job := saveTreeJob{ job := saveTreeJob{
snPath: snPath, snPath: snPath,
target: target,
node: node, node: node,
nodes: nodes, nodes: nodes,
ch: ch, ch: ch,
@ -86,57 +58,59 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
close(ch) close(ch)
} }
return FutureTree{ch: ch} return fn
} }
type saveTreeJob struct { type saveTreeJob struct {
snPath string snPath string
nodes []FutureNode target string
node *restic.Node node *restic.Node
ch chan<- saveTreeResponse nodes []FutureNode
ch chan<- futureNodeResult
complete CompleteFunc complete CompleteFunc
} }
type saveTreeResponse struct {
node *restic.Node
stats ItemStats
}
// save stores the nodes as a tree in the repo. // save stores the nodes as a tree in the repo.
func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) { func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, ItemStats, error) {
var stats ItemStats var stats ItemStats
node := job.node
nodes := job.nodes
// allow GC of nodes array once the loop is finished
job.nodes = nil
tree := restic.NewTree(len(nodes)) builder := restic.NewTreeJSONBuilder()
for _, fn := range nodes { for i, fn := range nodes {
fn.wait(ctx) // fn is a copy, so clear the original value explicitly
nodes[i] = FutureNode{}
fnr := fn.take(ctx)
// return the error if it wasn't ignored // return the error if it wasn't ignored
if fn.err != nil { if fnr.err != nil {
debug.Log("err for %v: %v", fn.snPath, fn.err) debug.Log("err for %v: %v", fnr.snPath, fnr.err)
fn.err = s.errFn(fn.target, fn.fi, fn.err) fnr.err = s.errFn(fnr.target, fnr.err)
if fn.err == nil { if fnr.err == nil {
// ignore error // ignore error
continue continue
} }
return nil, stats, fn.err return nil, stats, fnr.err
} }
// when the error is ignored, the node could not be saved, so ignore it // when the error is ignored, the node could not be saved, so ignore it
if fn.node == nil { if fnr.node == nil {
debug.Log("%v excluded: %v", fn.snPath, fn.target) debug.Log("%v excluded: %v", fnr.snPath, fnr.target)
continue continue
} }
debug.Log("insert %v", fn.node.Name) debug.Log("insert %v", fnr.node.Name)
err := tree.Insert(fn.node) err := builder.AddNode(fnr.node)
if err != nil { if err != nil {
return nil, stats, err return nil, stats, err
} }
} }
id, treeStats, err := s.saveTree(ctx, tree) id, treeStats, err := s.saveTree(ctx, builder)
stats.Add(treeStats) stats.Add(treeStats)
if err != nil { if err != nil {
return nil, stats, err return nil, stats, err
@ -158,7 +132,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
return nil return nil
} }
} }
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
node, stats, err := s.save(ctx, &job)
if err != nil { if err != nil {
debug.Log("error saving tree blob: %v", err) debug.Log("error saving tree blob: %v", err)
close(job.ch) close(job.ch)
@ -168,9 +143,11 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
if job.complete != nil { if job.complete != nil {
job.complete(node, stats) job.complete(node, stats)
} }
job.ch <- saveTreeResponse{ job.ch <- futureNodeResult{
node: node, snPath: job.snPath,
stats: stats, target: job.target,
node: node,
stats: stats,
} }
close(job.ch) close(job.ch)
} }

View File

@ -3,7 +3,6 @@ package archiver
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"runtime" "runtime"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -19,29 +18,29 @@ func TestTreeSaver(t *testing.T) {
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
} }
errFn := func(snPath string, fi os.FileInfo, err error) error { errFn := func(snPath string, err error) error {
return nil return nil
} }
b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn)
var results []FutureTree var results []FutureNode
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
node := &restic.Node{ node := &restic.Node{
Name: fmt.Sprintf("file-%d", i), Name: fmt.Sprintf("file-%d", i),
} }
fb := b.Save(ctx, "/", node, nil, nil) fb := b.Save(ctx, "/", node.Name, node, nil, nil)
results = append(results, fb) results = append(results, fb)
} }
for _, tree := range results { for _, tree := range results {
tree.Wait(ctx) tree.take(ctx)
} }
b.TriggerShutdown() b.TriggerShutdown()
@ -74,7 +73,7 @@ func TestTreeSaverError(t *testing.T) {
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
var num int32 var num int32
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
val := atomic.AddInt32(&num, 1) val := atomic.AddInt32(&num, 1)
if val == test.failAt { if val == test.failAt {
t.Logf("sending error for request %v\n", test.failAt) t.Logf("sending error for request %v\n", test.failAt)
@ -83,26 +82,26 @@ func TestTreeSaverError(t *testing.T) {
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
} }
errFn := func(snPath string, fi os.FileInfo, err error) error { errFn := func(snPath string, err error) error {
t.Logf("ignoring error %v\n", err) t.Logf("ignoring error %v\n", err)
return nil return nil
} }
b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn)
var results []FutureTree var results []FutureNode
for i := 0; i < test.trees; i++ { for i := 0; i < test.trees; i++ {
node := &restic.Node{ node := &restic.Node{
Name: fmt.Sprintf("file-%d", i), Name: fmt.Sprintf("file-%d", i),
} }
fb := b.Save(ctx, "/", node, nil, nil) fb := b.Save(ctx, "/", node.Name, node, nil, nil)
results = append(results, fb) results = append(results, fb)
} }
for _, tree := range results { for _, tree := range results {
tree.Wait(ctx) tree.take(ctx)
} }
b.TriggerShutdown() b.TriggerShutdown()

View File

@ -1,6 +1,7 @@
package restic package restic
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -143,3 +144,52 @@ func SaveTree(ctx context.Context, r BlobSaver, t *Tree) (ID, error) {
id, _, _, err := r.SaveBlob(ctx, TreeBlob, buf, ID{}, false) id, _, _, err := r.SaveBlob(ctx, TreeBlob, buf, ID{}, false)
return id, err return id, err
} }
type TreeJSONBuilder struct {
buf bytes.Buffer
lastName string
}
func NewTreeJSONBuilder() *TreeJSONBuilder {
tb := &TreeJSONBuilder{}
_, _ = tb.buf.WriteString(`{"nodes":[`)
return tb
}
func (builder *TreeJSONBuilder) AddNode(node *Node) error {
if node.Name <= builder.lastName {
return errors.Errorf("nodes are not ordered got %q, last %q", node.Name, builder.lastName)
}
if builder.lastName != "" {
_ = builder.buf.WriteByte(',')
}
builder.lastName = node.Name
val, err := json.Marshal(node)
if err != nil {
return err
}
_, _ = builder.buf.Write(val)
return nil
}
func (builder *TreeJSONBuilder) Finalize() ([]byte, error) {
// append a newline so that the data is always consistent (json.Encoder
// adds a newline after each object)
_, _ = builder.buf.WriteString("]}\n")
buf := builder.buf.Bytes()
// drop reference to buffer
builder.buf = bytes.Buffer{}
return buf, nil
}
func TreeToBuilder(t *Tree) (*TreeJSONBuilder, error) {
builder := NewTreeJSONBuilder()
for _, node := range t.Nodes {
err := builder.AddNode(node)
if err != nil {
return nil, err
}
}
return builder, nil
}

View File

@ -119,6 +119,37 @@ func TestEmptyLoadTree(t *testing.T) {
tree, tree2) tree, tree2)
} }
func TestTreeEqualSerialization(t *testing.T) {
files := []string{"node.go", "tree.go", "tree_test.go"}
for i := 1; i <= len(files); i++ {
tree := restic.NewTree(i)
builder := restic.NewTreeJSONBuilder()
for _, fn := range files[:i] {
fi, err := os.Lstat(fn)
rtest.OK(t, err)
node, err := restic.NodeFromFileInfo(fn, fi)
rtest.OK(t, err)
rtest.OK(t, tree.Insert(node))
rtest.OK(t, builder.AddNode(node))
rtest.Assert(t, tree.Insert(node) != nil, "no error on duplicate node")
rtest.Assert(t, builder.AddNode(node) != nil, "no error on duplicate node")
}
treeBytes, err := json.Marshal(tree)
treeBytes = append(treeBytes, '\n')
rtest.OK(t, err)
stiBytes, err := builder.Finalize()
rtest.OK(t, err)
// compare serialization of an individual node and the SaveTreeIterator
rtest.Equals(t, treeBytes, stiBytes)
}
}
func BenchmarkBuildTree(b *testing.B) { func BenchmarkBuildTree(b *testing.B) {
const size = 100 // Directories of this size are not uncommon. const size = 100 // Directories of this size are not uncommon.

View File

@ -3,7 +3,6 @@ package backup
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"os"
"sort" "sort"
"time" "time"
@ -79,7 +78,7 @@ func (b *JSONProgress) Update(total, processed Counter, errors uint, currentFile
// ScannerError is the error callback function for the scanner, it prints the // ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil. // error in verbose mode and returns nil.
func (b *JSONProgress) ScannerError(item string, fi os.FileInfo, err error) error { func (b *JSONProgress) ScannerError(item string, err error) error {
b.error(errorUpdate{ b.error(errorUpdate{
MessageType: "error", MessageType: "error",
Error: err, Error: err,
@ -90,7 +89,7 @@ func (b *JSONProgress) ScannerError(item string, fi os.FileInfo, err error) erro
} }
// Error is the error callback function for the archiver, it prints the error and returns nil. // Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *JSONProgress) Error(item string, fi os.FileInfo, err error) error { func (b *JSONProgress) Error(item string, err error) error {
b.error(errorUpdate{ b.error(errorUpdate{
MessageType: "error", MessageType: "error",
Error: err, Error: err,

View File

@ -3,7 +3,6 @@ package backup
import ( import (
"context" "context"
"io" "io"
"os"
"sync" "sync"
"time" "time"
@ -14,8 +13,8 @@ import (
type ProgressPrinter interface { type ProgressPrinter interface {
Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64)
Error(item string, fi os.FileInfo, err error) error Error(item string, err error) error
ScannerError(item string, fi os.FileInfo, err error) error ScannerError(item string, err error) error
CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
ReportTotal(item string, start time.Time, s archiver.ScanStats) ReportTotal(item string, start time.Time, s archiver.ScanStats)
Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool) Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool)
@ -44,11 +43,11 @@ type ProgressReporter interface {
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
StartFile(filename string) StartFile(filename string)
CompleteBlob(filename string, bytes uint64) CompleteBlob(filename string, bytes uint64)
ScannerError(item string, fi os.FileInfo, err error) error ScannerError(item string, err error) error
ReportTotal(item string, s archiver.ScanStats) ReportTotal(item string, s archiver.ScanStats)
SetMinUpdatePause(d time.Duration) SetMinUpdatePause(d time.Duration)
Run(ctx context.Context) error Run(ctx context.Context) error
Error(item string, fi os.FileInfo, err error) error Error(item string, err error) error
Finish(snapshotID restic.ID) Finish(snapshotID restic.ID)
} }
@ -173,13 +172,13 @@ func (p *Progress) Run(ctx context.Context) error {
// ScannerError is the error callback function for the scanner, it prints the // ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil. // error in verbose mode and returns nil.
func (p *Progress) ScannerError(item string, fi os.FileInfo, err error) error { func (p *Progress) ScannerError(item string, err error) error {
return p.printer.ScannerError(item, fi, err) return p.printer.ScannerError(item, err)
} }
// Error is the error callback function for the archiver, it prints the error and returns nil. // Error is the error callback function for the archiver, it prints the error and returns nil.
func (p *Progress) Error(item string, fi os.FileInfo, err error) error { func (p *Progress) Error(item string, err error) error {
cbErr := p.printer.Error(item, fi, err) cbErr := p.printer.Error(item, err)
select { select {
case p.errCh <- struct{}{}: case p.errCh <- struct{}{}:

View File

@ -2,7 +2,6 @@ package backup
import ( import (
"fmt" "fmt"
"os"
"sort" "sort"
"time" "time"
@ -75,13 +74,13 @@ func (b *TextProgress) Update(total, processed Counter, errors uint, currentFile
// ScannerError is the error callback function for the scanner, it prints the // ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil. // error in verbose mode and returns nil.
func (b *TextProgress) ScannerError(item string, fi os.FileInfo, err error) error { func (b *TextProgress) ScannerError(item string, err error) error {
b.V("scan: %v\n", err) b.V("scan: %v\n", err)
return nil return nil
} }
// Error is the error callback function for the archiver, it prints the error and returns nil. // Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *TextProgress) Error(item string, fi os.FileInfo, err error) error { func (b *TextProgress) Error(item string, err error) error {
b.E("error: %v\n", err) b.E("error: %v\n", err)
return nil return nil
} }