diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go new file mode 100644 index 000000000..143c81e34 --- /dev/null +++ b/internal/archiver/archiver.go @@ -0,0 +1,788 @@ +package archiver + +import ( + "context" + "encoding/json" + "os" + "path" + "runtime" + "sort" + "syscall" + "time" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/restic" +) + +// SelectFunc returns true for all items that should be included (files and +// dirs). If false is returned, files are ignored and dirs are not even walked. +type SelectFunc func(item string, fi os.FileInfo) bool + +// ErrorFunc is called when an error during archiving occurs. When nil is +// returned, the archiver continues, otherwise it aborts and passes the error +// up the call stack. +type ErrorFunc func(file string, fi os.FileInfo, err error) error + +// ItemStats collects some statistics about a particular file or directory. +type ItemStats struct { + DataBlobs int // number of new data blobs added for this item + DataSize uint64 // sum of the sizes of all new data blobs + TreeBlobs int // number of new tree blobs added for this item + TreeSize uint64 // sum of the sizes of all new tree blobs +} + +// Add adds other to the current ItemStats. +func (s *ItemStats) Add(other ItemStats) { + s.DataBlobs += other.DataBlobs + s.DataSize += other.DataSize + s.TreeBlobs += other.TreeBlobs + s.TreeSize += other.TreeSize +} + +// Archiver saves a directory structure to the repo. +type Archiver struct { + Repo restic.Repository + Select SelectFunc + FS fs.FS + Options Options + + blobSaver *BlobSaver + fileSaver *FileSaver + + // Error is called for all errors that occur during backup. + Error ErrorFunc + + // CompleteItem is called for all files and dirs once they have been + // processed successfully. The parameter item contains the path as it will + // be in the snapshot after saving. s contains some statistics about this + // particular file/dir. + // + // CompleteItem may be called asynchronously from several different + // goroutines! + CompleteItem func(item string, previous, current *restic.Node, s ItemStats, d time.Duration) + + // StartFile is called when a file is being processed by a worker. + StartFile func(filename string) + + // CompleteBlob is called for all saved blobs for files. + CompleteBlob func(filename string, bytes uint64) + + // WithAtime configures if the access time for files and directories should + // be saved. Enabling it may result in much metadata, so it's off by + // default. + WithAtime bool +} + +// Options is used to configure the archiver. +type Options struct { + // FileReadConcurrency sets how many files are read in concurrently. If + // it's set to zero, at most two files are read in concurrently (which + // turned out to be a good default for most situations). + FileReadConcurrency uint + + // SaveBlobConcurrency sets how many blobs are hashed and saved + // concurrently. If it's set to zero, the default is the number of CPUs + // available in the system. + SaveBlobConcurrency uint +} + +// ApplyDefaults returns a copy of o with the default options set for all unset +// fields. +func (o Options) ApplyDefaults() Options { + if o.FileReadConcurrency == 0 { + // two is a sweet spot for almost all situations. We've done some + // experiments documented here: + // https://github.com/borgbackup/borg/issues/3500 + o.FileReadConcurrency = 2 + } + + if o.SaveBlobConcurrency == 0 { + o.SaveBlobConcurrency = uint(runtime.NumCPU()) + } + + return o +} + +// New initializes a new archiver. +func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver { + arch := &Archiver{ + Repo: repo, + Select: func(string, os.FileInfo) bool { return true }, + FS: fs, + Options: opts.ApplyDefaults(), + + CompleteItem: func(string, *restic.Node, *restic.Node, ItemStats, time.Duration) {}, + StartFile: func(string) {}, + CompleteBlob: func(string, uint64) {}, + } + + return arch +} + +// Valid returns an error if anything is missing. +func (arch *Archiver) Valid() error { + if arch.blobSaver == nil { + return errors.New("blobSaver is nil") + } + + if arch.fileSaver == nil { + return errors.New("fileSaver is nil") + } + + if arch.Repo == nil { + return errors.New("repo is not set") + } + + if arch.Select == nil { + return errors.New("Select is not set") + } + + if arch.FS == nil { + return errors.New("FS is not set") + } + + return nil +} + +// error calls arch.Error if it is set. +func (arch *Archiver) error(item string, fi os.FileInfo, err error) error { + if arch.Error == nil || err == nil { + return err + } + + errf := arch.Error(item, fi, err) + if err != errf { + debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf) + } + return errf +} + +// saveTree stores a tree in the repo. It checks the index and the known blobs +// before saving anything. +func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID, ItemStats, error) { + var s ItemStats + buf, err := json.Marshal(t) + if err != nil { + return restic.ID{}, s, errors.Wrap(err, "MarshalJSON") + } + + // append a newline so that the data is always consistent (json.Encoder + // adds a newline after each object) + buf = append(buf, '\n') + + b := Buffer{Data: buf} + res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) + if res.Err() != nil { + return restic.ID{}, s, res.Err() + } + + if !res.Known() { + s.TreeBlobs++ + s.TreeSize += uint64(len(buf)) + } + return res.ID(), s, nil +} + +// nodeFromFileInfo returns the restic node from a os.FileInfo. +func (arch *Archiver) nodeFromFileInfo(filename string, fi os.FileInfo) (*restic.Node, error) { + node, err := restic.NodeFromFileInfo(filename, fi) + if !arch.WithAtime { + node.AccessTime = node.ModTime + } + return node, errors.Wrap(err, "NodeFromFileInfo") +} + +// loadSubtree tries to load the subtree referenced by node. In case of an error, nil is returned. +func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *restic.Tree { + if node == nil || node.Type != "dir" || node.Subtree == nil { + return nil + } + + tree, err := arch.Repo.LoadTree(ctx, *node.Subtree) + if err != nil { + debug.Log("unable to load tree %v: %v", node.Subtree.Str(), err) + // TODO: handle error + return nil + } + + return tree +} + +// SaveDir stores a directory in the repo and returns the node. snPath is the +// path within the current snapshot. +func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) { + debug.Log("%v %v", snPath, dir) + + var s ItemStats + + treeNode, err := arch.nodeFromFileInfo(dir, fi) + if err != nil { + return nil, s, err + } + + names, err := readdirnames(arch.FS, dir) + if err != nil { + return nil, s, err + } + + var futures []FutureNode + + tree := restic.NewTree() + + for _, name := range names { + pathname := arch.FS.Join(dir, name) + oldNode := previous.Find(name) + snItem := join(snPath, name) + fn, excluded, err := arch.Save(ctx, snItem, pathname, oldNode) + + // return error early if possible + if err != nil { + err = arch.error(pathname, fi, err) + if err == nil { + // ignore error + continue + } + + return nil, s, err + } + + if excluded { + continue + } + + futures = append(futures, fn) + } + + for _, fn := range futures { + fn.wait() + + // return the error if it wasn't ignored + if fn.err != nil { + fn.err = arch.error(fn.target, fn.fi, fn.err) + if fn.err == nil { + // ignore error + continue + } + + return nil, s, fn.err + } + + // when the error is ignored, the node could not be saved, so ignore it + if fn.node == nil { + debug.Log("%v excluded: %v", fn.snPath, fn.target) + continue + } + + err := tree.Insert(fn.node) + if err != nil { + return nil, s, err + } + } + + id, treeStats, err := arch.saveTree(ctx, tree) + if err != nil { + return nil, ItemStats{}, err + } + + s.Add(treeStats) + + treeNode.Subtree = &id + return treeNode, s, nil +} + +// FutureNode holds a reference to a node or a FutureFile. +type FutureNode struct { + snPath, target string + + // kept to call the error callback function + absTarget string + fi os.FileInfo + + node *restic.Node + stats ItemStats + err error + + isFile bool + file FutureFile +} + +func (fn *FutureNode) wait() { + if fn.isFile { + // wait for and collect the data for the file + fn.node = fn.file.Node() + fn.err = fn.file.Err() + fn.stats = fn.file.Stats() + } +} + +// Save saves a target (file or directory) to the repo. If the item is +// excluded,this function returns a nil node and error. +// +// Errors and completion is needs to be handled by the caller. +// +// snPath is the path within the current snapshot. +func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { + fn = FutureNode{ + snPath: snPath, + target: target, + } + + debug.Log("%v target %q, previous %v", snPath, target, previous) + abstarget, err := arch.FS.Abs(target) + if err != nil { + return FutureNode{}, false, err + } + + fn.absTarget = abstarget + + var fi os.FileInfo + var errFI error + + file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0) + if errOpen == nil { + fi, errFI = file.Stat() + } + + if !arch.Select(abstarget, fi) { + debug.Log("%v is excluded", target) + if file != nil { + _ = file.Close() + } + return FutureNode{}, true, nil + } + + if errOpen != nil { + debug.Log(" open error %#v", errOpen) + // test if the open failed because target is a symbolic link or a socket + if e, ok := errOpen.(*os.PathError); ok && (e.Err == syscall.ELOOP || e.Err == syscall.ENXIO) { + // in this case, redo the stat and carry on + fi, errFI = arch.FS.Lstat(target) + } else { + return FutureNode{}, false, errors.Wrap(errOpen, "OpenFile") + } + } + + if errFI != nil { + _ = file.Close() + return FutureNode{}, false, errors.Wrap(errFI, "Stat") + } + + switch { + case fs.IsRegularFile(fi): + debug.Log(" %v regular file", target) + start := time.Now() + + // use previous node if the file hasn't changed + if previous != nil && !fileChanged(fi, previous) { + debug.Log("%v hasn't changed, returning old node", target) + arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start)) + arch.CompleteBlob(snPath, previous.Size) + fn.node = previous + _ = file.Close() + return fn, false, nil + } + + fn.isFile = true + // Save will close the file, we don't need to do that + fn.file = arch.fileSaver.Save(ctx, snPath, file, fi, func() { + arch.StartFile(snPath) + }, func(node *restic.Node, stats ItemStats) { + arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) + }) + + file = nil + + case fi.IsDir(): + debug.Log(" %v dir", target) + + snItem := snPath + "/" + start := time.Now() + oldSubtree := arch.loadSubtree(ctx, previous) + fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree) + if err == nil { + arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start)) + } else { + _ = file.Close() + return FutureNode{}, false, err + } + + case fi.Mode()&os.ModeSocket > 0: + debug.Log(" %v is a socket, ignoring", target) + return FutureNode{}, true, nil + + default: + debug.Log(" %v other", target) + + fn.node, err = arch.nodeFromFileInfo(target, fi) + if err != nil { + _ = file.Close() + return FutureNode{}, false, err + } + } + + if file != nil { + err = file.Close() + if err != nil { + return fn, false, errors.Wrap(err, "Close") + } + } + + return fn, false, nil +} + +// fileChanged returns true if the file's content has changed since the node +// was created. +func fileChanged(fi os.FileInfo, node *restic.Node) bool { + if node == nil { + return true + } + + // check type change + if node.Type != "file" { + return true + } + + // check modification timestamp + if !fi.ModTime().Equal(node.ModTime) { + return true + } + + // check size + extFI := fs.ExtendedStat(fi) + if uint64(fi.Size()) != node.Size || uint64(extFI.Size) != node.Size { + return true + } + + // check inode + if node.Inode != extFI.Inode { + return true + } + + return false +} + +// join returns all elements separated with a forward slash. +func join(elem ...string) string { + return path.Join(elem...) +} + +// statDir returns the file info for the directory. Symbolic links are +// resolved. If the target directory is not a directory, an error is returned. +func (arch *Archiver) statDir(dir string) (os.FileInfo, error) { + fi, err := arch.FS.Stat(dir) + if err != nil { + return nil, errors.Wrap(err, "Lstat") + } + + tpe := fi.Mode() & (os.ModeType | os.ModeCharDevice) + if tpe != os.ModeDir { + return fi, errors.Errorf("path is not a directory: %v", dir) + } + + return fi, nil +} + +// SaveTree stores a Tree in the repo, returned is the tree. snPath is the path +// within the current snapshot. +func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, previous *restic.Tree) (*restic.Tree, error) { + debug.Log("%v (%v nodes), parent %v", snPath, len(atree.Nodes), previous) + + tree := restic.NewTree() + + futureNodes := make(map[string]FutureNode) + + for name, subatree := range atree.Nodes { + + // this is a leaf node + if subatree.Path != "" { + fn, excluded, err := arch.Save(ctx, join(snPath, name), subatree.Path, previous.Find(name)) + + if err != nil { + err = arch.error(subatree.Path, fn.fi, err) + if err == nil { + // ignore error + continue + } + return nil, err + } + + if err != nil { + return nil, err + } + + if !excluded { + futureNodes[name] = fn + } + continue + } + + snItem := join(snPath, name) + "/" + start := time.Now() + + oldNode := previous.Find(name) + oldSubtree := arch.loadSubtree(ctx, oldNode) + + // not a leaf node, archive subtree + subtree, err := arch.SaveTree(ctx, join(snPath, name), &subatree, oldSubtree) + if err != nil { + return nil, err + } + + id, nodeStats, err := arch.saveTree(ctx, subtree) + if err != nil { + return nil, err + } + + if subatree.FileInfoPath == "" { + return nil, errors.Errorf("FileInfoPath for %v/%v is empty", snPath, name) + } + + debug.Log("%v, saved subtree %v as %v", snPath, subtree, id.Str()) + + fi, err := arch.statDir(subatree.FileInfoPath) + if err != nil { + return nil, err + } + + debug.Log("%v, dir node data loaded from %v", snPath, subatree.FileInfoPath) + + node, err := arch.nodeFromFileInfo(subatree.FileInfoPath, fi) + if err != nil { + return nil, err + } + + node.Name = name + node.Subtree = &id + + err = tree.Insert(node) + if err != nil { + return nil, err + } + + arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start)) + } + + // process all futures + for name, fn := range futureNodes { + fn.wait() + + // return the error, or ignore it + if fn.err != nil { + fn.err = arch.error(fn.target, fn.fi, fn.err) + if fn.err == nil { + // ignore error + continue + } + + return nil, fn.err + } + + // when the error is ignored, the node could not be saved, so ignore it + if fn.node == nil { + debug.Log("%v excluded: %v", fn.snPath, fn.target) + continue + } + + fn.node.Name = name + + err := tree.Insert(fn.node) + if err != nil { + return nil, err + } + } + + return tree, nil +} + +type fileInfoSlice []os.FileInfo + +func (fi fileInfoSlice) Len() int { + return len(fi) +} + +func (fi fileInfoSlice) Swap(i, j int) { + fi[i], fi[j] = fi[j], fi[i] +} + +func (fi fileInfoSlice) Less(i, j int) bool { + return fi[i].Name() < fi[j].Name() +} + +func readdir(filesystem fs.FS, dir string) ([]os.FileInfo, error) { + f, err := filesystem.OpenFile(dir, fs.O_RDONLY|fs.O_NOFOLLOW, 0) + if err != nil { + return nil, errors.Wrap(err, "Open") + } + + entries, err := f.Readdir(-1) + if err != nil { + _ = f.Close() + return nil, errors.Wrap(err, "Readdir") + } + + err = f.Close() + if err != nil { + return nil, err + } + + sort.Sort(fileInfoSlice(entries)) + return entries, nil +} + +func readdirnames(filesystem fs.FS, dir string) ([]string, error) { + f, err := filesystem.OpenFile(dir, fs.O_RDONLY|fs.O_NOFOLLOW, 0) + if err != nil { + return nil, errors.Wrap(err, "Open") + } + + entries, err := f.Readdirnames(-1) + if err != nil { + _ = f.Close() + return nil, errors.Wrap(err, "Readdirnames") + } + + err = f.Close() + if err != nil { + return nil, err + } + + sort.Sort(sort.StringSlice(entries)) + return entries, nil +} + +// resolveRelativeTargets replaces targets that only contain relative +// directories ("." or "../../") with the contents of the directory. Each +// element of target is processed with fs.Clean(). +func resolveRelativeTargets(fs fs.FS, targets []string) ([]string, error) { + debug.Log("targets before resolving: %v", targets) + result := make([]string, 0, len(targets)) + for _, target := range targets { + target = fs.Clean(target) + pc, _ := pathComponents(fs, target, false) + if len(pc) > 0 { + result = append(result, target) + continue + } + + debug.Log("replacing %q with readdir(%q)", target, target) + entries, err := readdirnames(fs, target) + if err != nil { + return nil, err + } + + for _, name := range entries { + result = append(result, fs.Join(target, name)) + } + } + + debug.Log("targets after resolving: %v", result) + return result, nil +} + +// SnapshotOptions collect attributes for a new snapshot. +type SnapshotOptions struct { + Tags []string + Hostname string + Excludes []string + Time time.Time + ParentSnapshot restic.ID +} + +// loadParentTree loads a tree referenced by snapshot id. If id is null, nil is returned. +func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID) *restic.Tree { + if snapshotID.IsNull() { + return nil + } + + debug.Log("load parent snapshot %v", snapshotID) + sn, err := restic.LoadSnapshot(ctx, arch.Repo, snapshotID) + if err != nil { + debug.Log("unable to load snapshot %v: %v", snapshotID, err) + return nil + } + + if sn.Tree == nil { + debug.Log("snapshot %v has empty tree %v", snapshotID) + return nil + } + + debug.Log("load parent tree %v", *sn.Tree) + tree, err := arch.Repo.LoadTree(ctx, *sn.Tree) + if err != nil { + debug.Log("unable to load tree %v: %v", *sn.Tree, err) + return nil + } + return tree +} + +// runWorkers starts the worker pools, which are stopped when the context is cancelled. +func (arch *Archiver) runWorkers(ctx context.Context) { + arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency) + arch.fileSaver = NewFileSaver(ctx, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, arch.Options.FileReadConcurrency) + arch.fileSaver.CompleteBlob = arch.CompleteBlob + + arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo +} + +// Snapshot saves several targets and returns a snapshot. +func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts SnapshotOptions) (*restic.Snapshot, restic.ID, error) { + workerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + arch.runWorkers(workerCtx) + + err := arch.Valid() + if err != nil { + return nil, restic.ID{}, err + } + + cleanTargets, err := resolveRelativeTargets(arch.FS, targets) + if err != nil { + return nil, restic.ID{}, err + } + + atree, err := NewTree(arch.FS, cleanTargets) + if err != nil { + return nil, restic.ID{}, err + } + + start := time.Now() + tree, err := arch.SaveTree(ctx, "/", atree, arch.loadParentTree(ctx, opts.ParentSnapshot)) + if err != nil { + return nil, restic.ID{}, err + } + + rootTreeID, stats, err := arch.saveTree(ctx, tree) + if err != nil { + return nil, restic.ID{}, err + } + + arch.CompleteItem("/", nil, nil, stats, time.Since(start)) + + err = arch.Repo.Flush(ctx) + if err != nil { + return nil, restic.ID{}, err + } + + err = arch.Repo.SaveIndex(ctx) + if err != nil { + return nil, restic.ID{}, err + } + + sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time) + sn.Excludes = opts.Excludes + if !opts.ParentSnapshot.IsNull() { + id := opts.ParentSnapshot + sn.Parent = &id + } + sn.Tree = &rootTreeID + + id, err := arch.Repo.SaveJSONUnpacked(ctx, restic.SnapshotFile, sn) + if err != nil { + return nil, restic.ID{}, err + } + + return sn, id, nil +} diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go new file mode 100644 index 000000000..3f4daf5ec --- /dev/null +++ b/internal/archiver/archiver_test.go @@ -0,0 +1,1569 @@ +package archiver + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/restic/restic/internal/checker" + "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" + restictest "github.com/restic/restic/internal/test" +) + +func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo restic.Repository, cleanup func()) { + tempdir, removeTempdir := restictest.TempDir(t) + repo, removeRepository := repository.TestRepository(t) + + TestCreateFiles(t, tempdir, src) + + cleanup = func() { + removeRepository() + removeTempdir() + } + + return tempdir, repo, cleanup +} + +func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + arch := New(repo, filesystem, Options{}) + arch.runWorkers(ctx) + + var ( + completeCallbackNode *restic.Node + completeCallbackStats ItemStats + completeCallback bool + + startCallback bool + ) + + complete := func(node *restic.Node, stats ItemStats) { + completeCallback = true + completeCallbackNode = node + completeCallbackStats = stats + } + + start := func() { + startCallback = true + } + + file, err := arch.FS.OpenFile(filename, fs.O_RDONLY|fs.O_NOFOLLOW, 0) + if err != nil { + t.Fatal(err) + } + + fi, err := file.Stat() + if err != nil { + t.Fatal(err) + } + + res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete) + if res.Err() != nil { + t.Fatal(res.Err()) + } + + err = repo.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + err = repo.SaveIndex(ctx) + if err != nil { + t.Fatal(err) + } + + if !startCallback { + t.Errorf("start callback did not happen") + } + + if !completeCallback { + t.Errorf("complete callback did not happen") + } + + if completeCallbackNode == nil { + t.Errorf("no node returned for complete callback") + } + + if completeCallbackNode != nil && !res.Node().Equals(*completeCallbackNode) { + t.Errorf("different node returned for complete callback") + } + + if completeCallbackStats != res.Stats() { + t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", res.Stats(), completeCallbackStats) + } + + return res.Node(), res.Stats() +} + +func TestArchiverSaveFile(t *testing.T) { + var tests = []TestFile{ + TestFile{Content: ""}, + TestFile{Content: "foo"}, + TestFile{Content: string(restictest.Random(23, 12*1024*1024+1287898))}, + } + + for _, testfile := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, TestDir{"file": testfile}) + defer cleanup() + + node, stats := saveFile(t, repo, filepath.Join(tempdir, "file"), fs.Track{fs.Local{}}) + + TestEnsureFileContent(ctx, t, repo, "file", node, testfile) + if stats.DataSize != uint64(len(testfile.Content)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) + } + if stats.DataBlobs <= 0 && len(testfile.Content) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + } + if stats.TreeSize != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + }) + } +} + +func TestArchiverSaveFileReaderFS(t *testing.T) { + var tests = []struct { + Data string + }{ + {Data: ""}, + {Data: "foo"}, + {Data: string(restictest.Random(23, 12*1024*1024+1287898))}, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + repo, cleanup := repository.TestRepository(t) + defer cleanup() + + ts := time.Now() + filename := "xx" + readerFs := &fs.Reader{ + ModTime: ts, + Mode: 0123, + Name: filename, + ReadCloser: ioutil.NopCloser(strings.NewReader(test.Data)), + } + + node, stats := saveFile(t, repo, filename, readerFs) + + TestEnsureFileContent(ctx, t, repo, "file", node, TestFile{Content: test.Data}) + if stats.DataSize != uint64(len(test.Data)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) + } + if stats.DataBlobs <= 0 && len(test.Data) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + } + if stats.TreeSize != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + }) + } +} + +func BenchmarkArchiverSaveFileSmall(b *testing.B) { + const fileSize = 4 * 1024 + d := TestDir{"file": TestFile{ + Content: string(restictest.Random(23, fileSize)), + }} + + b.SetBytes(fileSize) + + for i := 0; i < b.N; i++ { + b.StopTimer() + tempdir, repo, cleanup := prepareTempdirRepoSrc(b, d) + b.StartTimer() + + _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{fs.Local{}}) + + b.StopTimer() + if stats.DataSize != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + } + if stats.DataBlobs <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + } + if stats.TreeSize != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + cleanup() + b.StartTimer() + } +} + +func BenchmarkArchiverSaveFileLarge(b *testing.B) { + const fileSize = 40*1024*1024 + 1287898 + d := TestDir{"file": TestFile{ + Content: string(restictest.Random(23, fileSize)), + }} + + b.SetBytes(fileSize) + + for i := 0; i < b.N; i++ { + b.StopTimer() + tempdir, repo, cleanup := prepareTempdirRepoSrc(b, d) + b.StartTimer() + + _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{fs.Local{}}) + + b.StopTimer() + if stats.DataSize != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + } + if stats.DataBlobs <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + } + if stats.TreeSize != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + cleanup() + b.StartTimer() + } +} + +type blobCountingRepo struct { + restic.Repository + + m sync.Mutex + saved map[restic.BlobHandle]uint +} + +func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { + id, err := repo.Repository.SaveBlob(ctx, t, buf, id) + h := restic.BlobHandle{ID: id, Type: t} + repo.m.Lock() + repo.saved[h]++ + repo.m.Unlock() + return id, err +} + +func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) { + id, err := repo.Repository.SaveTree(ctx, t) + h := restic.BlobHandle{ID: id, Type: restic.TreeBlob} + repo.m.Lock() + repo.saved[h]++ + repo.m.Unlock() + fmt.Printf("savetree %v", h) + return id, err +} + +func appendToFile(t testing.TB, filename string, data []byte) { + f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + + _, err = f.Write(data) + if err != nil { + _ = f.Close() + t.Fatal(err) + } + + err = f.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestArchiverSaveFileIncremental(t *testing.T) { + tempdir, removeTempdir := restictest.TempDir(t) + defer removeTempdir() + + testRepo, removeRepository := repository.TestRepository(t) + defer removeRepository() + + repo := &blobCountingRepo{ + Repository: testRepo, + saved: make(map[restic.BlobHandle]uint), + } + + data := restictest.Random(23, 512*1024+887898) + testfile := filepath.Join(tempdir, "testfile") + + for i := 0; i < 3; i++ { + appendToFile(t, testfile, data) + node, _ := saveFile(t, repo, testfile, fs.Track{fs.Local{}}) + + t.Logf("node blobs: %v", node.Content) + + for h, n := range repo.saved { + if n > 1 { + t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n) + } + } + } +} + +func save(t testing.TB, filename string, data []byte) { + f, err := os.Create(filename) + if err != nil { + t.Fatal(err) + } + + _, err = f.Write(data) + if err != nil { + t.Fatal(err) + } + + err = f.Sync() + if err != nil { + t.Fatal(err) + } + + err = f.Close() + if err != nil { + t.Fatal(err) + } +} + +func lstat(t testing.TB, name string) os.FileInfo { + fi, err := os.Lstat(name) + if err != nil { + t.Fatal(err) + } + + return fi +} + +func setTimestamp(t testing.TB, filename string, atime, mtime time.Time) { + var utimes = [...]syscall.Timespec{ + syscall.NsecToTimespec(atime.UnixNano()), + syscall.NsecToTimespec(mtime.UnixNano()), + } + + err := syscall.UtimesNano(filename, utimes[:]) + if err != nil { + t.Fatal(err) + } +} + +func remove(t testing.TB, filename string) { + err := os.Remove(filename) + if err != nil { + t.Fatal(err) + } +} + +func nodeFromFI(t testing.TB, filename string, fi os.FileInfo) *restic.Node { + node, err := restic.NodeFromFileInfo(filename, fi) + if err != nil { + t.Fatal(err) + } + + return node +} + +func TestFileChanged(t *testing.T) { + var defaultContent = []byte("foobar") + + var d = 50 * time.Millisecond + if runtime.GOOS == "darwin" { + // on older darwin instances the file system only supports one second + // granularity + d = time.Second + } + + sleep := func() { + time.Sleep(d) + } + + var tests = []struct { + Name string + Content []byte + Modify func(t testing.TB, filename string) + }{ + { + Name: "same-content-new-file", + Modify: func(t testing.TB, filename string) { + remove(t, filename) + sleep() + save(t, filename, defaultContent) + }, + }, + { + Name: "same-content-new-timestamp", + Modify: func(t testing.TB, filename string) { + sleep() + save(t, filename, defaultContent) + }, + }, + { + Name: "other-content", + Modify: func(t testing.TB, filename string) { + remove(t, filename) + sleep() + save(t, filename, []byte("xxxxxx")) + }, + }, + { + Name: "longer-content", + Modify: func(t testing.TB, filename string) { + save(t, filename, []byte("xxxxxxxxxxxxxxxxxxxxxx")) + }, + }, + { + Name: "new-file", + Modify: func(t testing.TB, filename string) { + remove(t, filename) + sleep() + save(t, filename, defaultContent) + }, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + tempdir, cleanup := restictest.TempDir(t) + defer cleanup() + + filename := filepath.Join(tempdir, "file") + content := defaultContent + if test.Content != nil { + content = test.Content + } + save(t, filename, content) + + fiBefore := lstat(t, filename) + node := nodeFromFI(t, filename, fiBefore) + + if fileChanged(fiBefore, node) { + t.Fatalf("unchanged file detected as changed") + } + + test.Modify(t, filename) + + fiAfter := lstat(t, filename) + if !fileChanged(fiAfter, node) { + t.Fatalf("modified file detected as unchanged") + } + }) + } +} + +func TestFilChangedSpecialCases(t *testing.T) { + tempdir, cleanup := restictest.TempDir(t) + defer cleanup() + + filename := filepath.Join(tempdir, "file") + content := []byte("foobar") + save(t, filename, content) + + t.Run("nil-node", func(t *testing.T) { + fi := lstat(t, filename) + if !fileChanged(fi, nil) { + t.Fatal("nil node detected as unchanged") + } + }) + + t.Run("type-change", func(t *testing.T) { + fi := lstat(t, filename) + node := nodeFromFI(t, filename, fi) + node.Type = "symlink" + if !fileChanged(fi, node) { + t.Fatal("node with changed type detected as unchanged") + } + }) +} + +func TestArchiverSaveDir(t *testing.T) { + const targetNodeName = "targetdir" + + var tests = []struct { + src TestDir + chdir string + target string + want TestDir + }{ + { + src: TestDir{ + "targetfile": TestFile{Content: string(restictest.Random(888, 2*1024*1024+5000))}, + }, + target: ".", + want: TestDir{ + "targetdir": TestDir{ + "targetfile": TestFile{Content: string(restictest.Random(888, 2*1024*1024+5000))}, + }, + }, + }, + { + src: TestDir{ + "targetdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "emptyfile": TestFile{Content: ""}, + "bar": TestFile{Content: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}, + "largefile": TestFile{Content: string(restictest.Random(888, 2*1024*1024+5000))}, + "largerfile": TestFile{Content: string(restictest.Random(234, 5*1024*1024+5000))}, + }, + }, + target: "targetdir", + }, + { + src: TestDir{ + "foo": TestFile{Content: "foo"}, + "emptyfile": TestFile{Content: ""}, + "bar": TestFile{Content: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}, + }, + target: ".", + want: TestDir{ + "targetdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "emptyfile": TestFile{Content: ""}, + "bar": TestFile{Content: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}, + }, + }, + }, + { + src: TestDir{ + "foo": TestDir{ + "subdir": TestDir{ + "x": TestFile{Content: "xxx"}, + "y": TestFile{Content: "yyyyyyyyyyyyyyyy"}, + "z": TestFile{Content: "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"}, + }, + "file": TestFile{Content: "just a test"}, + }, + }, + chdir: "foo/subdir", + target: "../../", + want: TestDir{ + "targetdir": TestDir{ + "foo": TestDir{ + "subdir": TestDir{ + "x": TestFile{Content: "xxx"}, + "y": TestFile{Content: "yyyyyyyyyyyyyyyy"}, + "z": TestFile{Content: "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"}, + }, + "file": TestFile{Content: "just a test"}, + }, + }, + }, + }, + { + src: TestDir{ + "foo": TestDir{ + "file": TestFile{Content: "just a test"}, + "file2": TestFile{Content: "again"}, + }, + }, + target: "./foo", + want: TestDir{ + "targetdir": TestDir{ + "file": TestFile{Content: "just a test"}, + "file2": TestFile{Content: "again"}, + }, + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + arch := New(repo, fs.Track{fs.Local{}}, Options{}) + arch.runWorkers(ctx) + + chdir := tempdir + if test.chdir != "" { + chdir = filepath.Join(chdir, test.chdir) + } + + back := fs.TestChdir(t, chdir) + defer back() + + fi, err := fs.Lstat(test.target) + if err != nil { + t.Fatal(err) + } + + node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil) + if err != nil { + t.Fatal(err) + } + + t.Logf("stats: %v", stats) + if stats.DataSize != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + } + if stats.DataBlobs != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + if stats.TreeSize <= 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + } + + node.Name = targetNodeName + tree := &restic.Tree{Nodes: []*restic.Node{node}} + treeID, err := repo.SaveTree(ctx, tree) + if err != nil { + t.Fatal(err) + } + + err = repo.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + err = repo.SaveIndex(ctx) + if err != nil { + t.Fatal(err) + } + + want := test.want + if want == nil { + want = test.src + } + TestEnsureTree(ctx, t, "/", repo, treeID, want) + }) + } +} + +func TestArchiverSaveDirIncremental(t *testing.T) { + tempdir, removeTempdir := restictest.TempDir(t) + defer removeTempdir() + + testRepo, removeRepository := repository.TestRepository(t) + defer removeRepository() + + repo := &blobCountingRepo{ + Repository: testRepo, + saved: make(map[restic.BlobHandle]uint), + } + + appendToFile(t, filepath.Join(tempdir, "testfile"), []byte("foobar")) + + // save the empty directory several times in a row, then have a look if the + // archiver did save the same tree several times + for i := 0; i < 5; i++ { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + arch := New(repo, fs.Track{fs.Local{}}, Options{}) + arch.runWorkers(ctx) + + fi, err := fs.Lstat(tempdir) + if err != nil { + t.Fatal(err) + } + + node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil) + if err != nil { + t.Fatal(err) + } + + if i == 0 { + // operation must have added new tree data + if stats.DataSize != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + } + if stats.DataBlobs != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + if stats.TreeSize <= 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + } + } else { + // operation must not have added any new data + if stats.DataSize != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + } + if stats.DataBlobs != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + } + if stats.TreeSize != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + } + if stats.TreeBlobs != 0 { + t.Errorf("wrong stats returned in TreeBlobs, want 0, got %d", stats.TreeBlobs) + } + } + + t.Logf("node subtree %v", node.Subtree) + + err = repo.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + err = repo.SaveIndex(ctx) + if err != nil { + t.Fatal(err) + } + + for h, n := range repo.saved { + if n > 1 { + t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n) + } + } + } +} + +func TestArchiverSaveTree(t *testing.T) { + symlink := func(from, to string) func(t testing.TB) { + return func(t testing.TB) { + err := os.Symlink(from, to) + if err != nil { + t.Fatal(err) + } + } + } + + var tests = []struct { + src TestDir + prepare func(t testing.TB) + targets []string + want TestDir + }{ + { + src: TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + targets: []string{"targetfile"}, + want: TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + }, + { + src: TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + prepare: symlink("targetfile", "filesymlink"), + targets: []string{"targetfile", "filesymlink"}, + want: TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + "filesymlink": TestSymlink{Target: "targetfile"}, + }, + }, + { + src: TestDir{ + "dir": TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + }, + "otherfile": TestFile{Content: string("xxx")}, + }, + }, + prepare: symlink("subdir", filepath.FromSlash("dir/symlink")), + targets: []string{filepath.FromSlash("dir/symlink")}, + want: TestDir{ + "dir": TestDir{ + "symlink": TestSymlink{Target: "subdir"}, + }, + }, + }, + { + src: TestDir{ + "dir": TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + }, + "otherfile": TestFile{Content: string("xxx")}, + }, + }, + prepare: symlink("subdir", filepath.FromSlash("dir/symlink")), + targets: []string{filepath.FromSlash("dir/symlink/subsubdir")}, + want: TestDir{ + "dir": TestDir{ + "symlink": TestDir{ + "subsubdir": TestDir{ + "targetfile": TestFile{Content: string("foobar")}, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + testFS := fs.Track{fs.Local{}} + + arch := New(repo, testFS, Options{}) + arch.runWorkers(ctx) + + back := fs.TestChdir(t, tempdir) + defer back() + + if test.prepare != nil { + test.prepare(t) + } + + atree, err := NewTree(testFS, test.targets) + if err != nil { + t.Fatal(err) + } + + tree, err := arch.SaveTree(ctx, "/", atree, nil) + if err != nil { + t.Fatal(err) + } + + treeID, err := repo.SaveTree(ctx, tree) + if err != nil { + t.Fatal(err) + } + + err = repo.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + err = repo.SaveIndex(ctx) + if err != nil { + t.Fatal(err) + } + + want := test.want + if want == nil { + want = test.src + } + TestEnsureTree(ctx, t, "/", repo, treeID, want) + }) + } +} + +func TestArchiverSnapshot(t *testing.T) { + var tests = []struct { + name string + src TestDir + want TestDir + chdir string + targets []string + }{ + { + name: "single-file", + src: TestDir{ + "foo": TestFile{Content: "foo"}, + }, + targets: []string{"foo"}, + }, + { + name: "file-current-dir", + src: TestDir{ + "foo": TestFile{Content: "foo"}, + }, + targets: []string{"./foo"}, + }, + { + name: "dir", + src: TestDir{ + "target": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + targets: []string{"target"}, + }, + { + name: "dir-current-dir", + src: TestDir{ + "target": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + targets: []string{"./target"}, + }, + { + name: "content-dir-current-dir", + src: TestDir{ + "target": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + targets: []string{"./target/."}, + }, + { + name: "current-dir", + src: TestDir{ + "target": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + targets: []string{"."}, + }, + { + name: "subdir", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + targets: []string{"subdir"}, + want: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + }, + }, + }, + { + name: "subsubdir", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + targets: []string{"subdir/subsubdir"}, + want: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + }, + }, + }, + { + name: "parent-dir", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + "other": TestFile{Content: "another file"}, + }, + chdir: "subdir", + targets: []string{".."}, + }, + { + name: "parent-parent-dir", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + "subsubdir": TestDir{ + "empty": TestFile{Content: ""}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + chdir: "subdir/subsubdir", + targets: []string{"../.."}, + }, + { + name: "parent-parent-dir-slash", + src: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + chdir: "subdir/subsubdir", + targets: []string{"../../"}, + want: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + }, + { + name: "parent-subdir", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + "other": TestFile{Content: "another file"}, + }, + chdir: "subdir", + targets: []string{"../subdir"}, + want: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + }, + { + name: "parent-parent-dir-subdir", + src: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + chdir: "subdir/subsubdir", + targets: []string{"../../subdir/subsubdir"}, + want: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + }, + }, + }, + { + name: "included-multiple1", + src: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + "other": TestFile{Content: "another file"}, + }, + }, + targets: []string{"subdir", "subdir/subsubdir"}, + }, + { + name: "included-multiple2", + src: TestDir{ + "subdir": TestDir{ + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo"}, + }, + "other": TestFile{Content: "another file"}, + }, + }, + targets: []string{"subdir/subsubdir", "subdir"}, + }, + { + name: "collision", + src: TestDir{ + "subdir": TestDir{ + "foo": TestFile{Content: "foo in subdir"}, + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + }, + "foo": TestFile{Content: "another file"}, + }, + chdir: "subdir", + targets: []string{".", "../foo"}, + want: TestDir{ + + "foo": TestFile{Content: "foo in subdir"}, + "subsubdir": TestDir{ + "foo": TestFile{Content: "foo in subsubdir"}, + }, + "foo-1": TestFile{Content: "another file"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + arch := New(repo, fs.Track{fs.Local{}}, Options{}) + + chdir := tempdir + if test.chdir != "" { + chdir = filepath.Join(chdir, filepath.FromSlash(test.chdir)) + } + + back := fs.TestChdir(t, chdir) + defer back() + + var targets []string + for _, target := range test.targets { + targets = append(targets, os.ExpandEnv(target)) + } + + t.Logf("targets: %v", targets) + sn, snapshotID, err := arch.Snapshot(ctx, targets, SnapshotOptions{Time: time.Now()}) + if err != nil { + t.Fatal(err) + } + + t.Logf("saved as %v", snapshotID.Str()) + + want := test.want + if want == nil { + want = test.src + } + TestEnsureSnapshot(t, repo, snapshotID, want) + + checker.TestCheckRepo(t, repo) + + // check that the snapshot contains the targets with absolute paths + for i, target := range sn.Paths { + atarget, err := filepath.Abs(test.targets[i]) + if err != nil { + t.Fatal(err) + } + + if target != atarget { + t.Errorf("wrong path in snapshot: want %v, got %v", atarget, target) + } + } + }) + } +} + +func TestArchiverSnapshotSelect(t *testing.T) { + var tests = []struct { + name string + src TestDir + want TestDir + selFn SelectFunc + }{ + { + name: "include-all", + src: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + selFn: func(item string, fi os.FileInfo) bool { + return true + }, + }, + { + name: "exclude-all", + src: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + selFn: func(item string, fi os.FileInfo) bool { + return false + }, + want: TestDir{}, + }, + { + name: "exclude-txt-files", + src: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + want: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + selFn: func(item string, fi os.FileInfo) bool { + if filepath.Ext(item) == ".txt" { + return false + } + return true + }, + }, + { + name: "exclude-dir", + src: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + "other": TestFile{Content: "another file"}, + }, + want: TestDir{ + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + }, + "other": TestFile{Content: "another file"}, + }, + selFn: func(item string, fi os.FileInfo) bool { + if filepath.Base(item) == "subdir" { + return false + } + return true + }, + }, + { + name: "select-absolute-paths", + src: TestDir{ + "foo": TestFile{Content: "foo"}, + }, + selFn: func(item string, fi os.FileInfo) bool { + return filepath.IsAbs(item) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + arch := New(repo, fs.Track{fs.Local{}}, Options{}) + arch.Select = test.selFn + + back := fs.TestChdir(t, tempdir) + defer back() + + targets := []string{"."} + _, snapshotID, err := arch.Snapshot(ctx, targets, SnapshotOptions{Time: time.Now()}) + if err != nil { + t.Fatal(err) + } + + t.Logf("saved as %v", snapshotID.Str()) + + want := test.want + if want == nil { + want = test.src + } + TestEnsureSnapshot(t, repo, snapshotID, want) + + checker.TestCheckRepo(t, repo) + }) + } +} + +// MockFS keeps track which files are read. +type MockFS struct { + fs.FS + + m sync.Mutex + bytesRead map[string]int // tracks bytes read from all opened files +} + +func (m *MockFS) Open(name string) (fs.File, error) { + f, err := m.FS.Open(name) + if err != nil { + return f, err + } + + return MockFile{File: f, fs: m, filename: name}, nil +} + +func (m *MockFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, error) { + f, err := m.FS.OpenFile(name, flag, perm) + if err != nil { + return f, err + } + + return MockFile{File: f, fs: m, filename: name}, nil +} + +type MockFile struct { + fs.File + filename string + + fs *MockFS +} + +func (f MockFile) Read(p []byte) (int, error) { + n, err := f.File.Read(p) + if n > 0 { + f.fs.m.Lock() + f.fs.bytesRead[f.filename] += n + f.fs.m.Unlock() + } + return n, err +} + +func TestArchiverParent(t *testing.T) { + var tests = []struct { + src TestDir + read map[string]int // tracks number of times a file must have been read + }{ + { + src: TestDir{ + "targetfile": TestFile{Content: string(restictest.Random(888, 2*1024*1024+5000))}, + }, + read: map[string]int{ + "targetfile": 1, + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + testFS := &MockFS{ + FS: fs.Track{fs.Local{}}, + bytesRead: make(map[string]int), + } + + arch := New(repo, testFS, Options{}) + + back := fs.TestChdir(t, tempdir) + defer back() + + _, firstSnapshotID, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()}) + if err != nil { + t.Fatal(err) + } + + t.Logf("first backup saved as %v", firstSnapshotID.Str()) + t.Logf("testfs: %v", testFS) + + // check that all files have been read exactly once + TestWalkFiles(t, ".", test.src, func(filename string, item interface{}) error { + file, ok := item.(TestFile) + if !ok { + return nil + } + + n, ok := testFS.bytesRead[filename] + if !ok { + t.Fatalf("file %v was not read at all", filename) + } + + if n != len(file.Content) { + t.Fatalf("file %v: read %v bytes, wanted %v bytes", filename, n, len(file.Content)) + } + return nil + }) + + opts := SnapshotOptions{ + Time: time.Now(), + ParentSnapshot: firstSnapshotID, + } + _, secondSnapshotID, err := arch.Snapshot(ctx, []string{"."}, opts) + if err != nil { + t.Fatal(err) + } + + // check that all files still been read exactly once + TestWalkFiles(t, ".", test.src, func(filename string, item interface{}) error { + file, ok := item.(TestFile) + if !ok { + return nil + } + + n, ok := testFS.bytesRead[filename] + if !ok { + t.Fatalf("file %v was not read at all", filename) + } + + if n != len(file.Content) { + t.Fatalf("file %v: read %v bytes, wanted %v bytes", filename, n, len(file.Content)) + } + return nil + }) + + t.Logf("second backup saved as %v", secondSnapshotID.Str()) + t.Logf("testfs: %v", testFS) + + checker.TestCheckRepo(t, repo) + }) + } +} + +func TestArchiverErrorReporting(t *testing.T) { + ignoreErrorForBasename := func(basename string) ErrorFunc { + return func(item string, fi os.FileInfo, err error) error { + if filepath.Base(item) == "targetfile" { + t.Logf("ignoring error for targetfile: %v", err) + return nil + } + + t.Errorf("error handler called for unexpected file %v: %v", item, err) + return err + } + } + + chmodUnreadable := func(filename string) func(testing.TB) { + return func(t testing.TB) { + if runtime.GOOS == "windows" { + t.Skip("Skipping this test for windows") + } + + err := os.Chmod(filepath.FromSlash(filename), 0004) + if err != nil { + t.Fatal(err) + } + } + } + + var tests = []struct { + name string + src TestDir + want TestDir + prepare func(t testing.TB) + errFn ErrorFunc + mustError bool + }{ + { + name: "no-error", + src: TestDir{ + "targetfile": TestFile{Content: "foobar"}, + }, + }, + { + name: "file-unreadable", + src: TestDir{ + "targetfile": TestFile{Content: "foobar"}, + }, + prepare: chmodUnreadable("targetfile"), + mustError: true, + }, + { + name: "file-unreadable-ignore-error", + src: TestDir{ + "targetfile": TestFile{Content: "foobar"}, + "other": TestFile{Content: "xxx"}, + }, + want: TestDir{ + "other": TestFile{Content: "xxx"}, + }, + prepare: chmodUnreadable("targetfile"), + errFn: ignoreErrorForBasename("targetfile"), + }, + { + name: "file-subdir-unreadable", + src: TestDir{ + "subdir": TestDir{ + "targetfile": TestFile{Content: "foobar"}, + }, + }, + prepare: chmodUnreadable("subdir/targetfile"), + mustError: true, + }, + { + name: "file-subdir-unreadable-ignore-error", + src: TestDir{ + "subdir": TestDir{ + "targetfile": TestFile{Content: "foobar"}, + "other": TestFile{Content: "xxx"}, + }, + }, + want: TestDir{ + "subdir": TestDir{ + "other": TestFile{Content: "xxx"}, + }, + }, + prepare: chmodUnreadable("subdir/targetfile"), + errFn: ignoreErrorForBasename("targetfile"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) + defer cleanup() + + back := fs.TestChdir(t, tempdir) + defer back() + + if test.prepare != nil { + test.prepare(t) + } + + arch := New(repo, fs.Track{fs.Local{}}, Options{}) + arch.Error = test.errFn + + _, snapshotID, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()}) + if test.mustError { + if err != nil { + t.Logf("found expected error (%v), skipping further checks", err) + return + } + + t.Fatalf("expected error not returned by archiver") + return + } + + if err != nil { + t.Fatalf("unexpected error of type %T found: %v", err, err) + } + + t.Logf("saved as %v", snapshotID.Str()) + + want := test.want + if want == nil { + want = test.src + } + TestEnsureSnapshot(t, repo, snapshotID, want) + + checker.TestCheckRepo(t, repo) + }) + } +} diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go new file mode 100644 index 000000000..5e45d7175 --- /dev/null +++ b/internal/archiver/blob_saver.go @@ -0,0 +1,158 @@ +package archiver + +import ( + "context" + "sync" + + "github.com/restic/restic/internal/restic" +) + +// Saver allows saving a blob. +type Saver interface { + SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error) + Index() restic.Index +} + +// BlobSaver concurrently saves incoming blobs to the repo. +type BlobSaver struct { + repo Saver + + m sync.Mutex + knownBlobs restic.BlobSet + + ch chan<- saveBlobJob + wg sync.WaitGroup +} + +// NewBlobSaver returns a new blob. A worker pool is started, it is stopped +// when ctx is cancelled. +func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { + ch := make(chan saveBlobJob, 2*int(workers)) + s := &BlobSaver{ + repo: repo, + knownBlobs: restic.NewBlobSet(), + ch: ch, + } + + for i := uint(0); i < workers; i++ { + s.wg.Add(1) + go s.worker(ctx, &s.wg, ch) + } + + return s +} + +// Save stores a blob in the repo. It checks the index and the known blobs +// before saving anything. The second return parameter is true if the blob was +// previously unknown. +func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob { + ch := make(chan saveBlobResponse, 1) + s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch} + + return FutureBlob{ch: ch, length: len(buf.Data)} +} + +// FutureBlob is returned by SaveBlob and will return the data once it has been processed. +type FutureBlob struct { + ch <-chan saveBlobResponse + length int + res saveBlobResponse +} + +func (s *FutureBlob) wait() { + res, ok := <-s.ch + if ok { + s.res = res + } +} + +// ID returns the ID of the blob after it has been saved. +func (s *FutureBlob) ID() restic.ID { + s.wait() + return s.res.id +} + +// Known returns whether or not the blob was already known. +func (s *FutureBlob) Known() bool { + s.wait() + return s.res.known +} + +// Err returns the error which may have occurred during save. +func (s *FutureBlob) Err() error { + s.wait() + return s.res.err +} + +// Length returns the length of the blob. +func (s *FutureBlob) Length() int { + return s.length +} + +type saveBlobJob struct { + restic.BlobType + buf Buffer + ch chan<- saveBlobResponse +} + +type saveBlobResponse struct { + id restic.ID + known bool + err error +} + +func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) saveBlobResponse { + id := restic.Hash(buf) + h := restic.BlobHandle{ID: id, Type: t} + + // check if another goroutine has already saved this blob + known := false + s.m.Lock() + if s.knownBlobs.Has(h) { + known = true + } else { + s.knownBlobs.Insert(h) + known = false + } + s.m.Unlock() + + // blob is already known, nothing to do + if known { + return saveBlobResponse{ + id: id, + known: true, + } + } + + // check if the repo knows this blob + if s.repo.Index().Has(id, t) { + return saveBlobResponse{ + id: id, + known: true, + } + } + + // otherwise we're responsible for saving it + _, err := s.repo.SaveBlob(ctx, t, buf, id) + return saveBlobResponse{ + id: id, + known: false, + err: err, + } +} + +func (s *BlobSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveBlobJob) { + defer wg.Done() + for { + var job saveBlobJob + select { + case <-ctx.Done(): + return + case job = <-jobs: + } + + job.ch <- s.saveBlob(ctx, job.BlobType, job.buf.Data) + close(job.ch) + job.buf.Release() + } +} diff --git a/internal/archiver/buffer.go b/internal/archiver/buffer.go new file mode 100644 index 000000000..c97d990cf --- /dev/null +++ b/internal/archiver/buffer.go @@ -0,0 +1,90 @@ +package archiver + +import ( + "context" + "sync" +) + +// Buffer is a reusable buffer. After the buffer has been used, Release should +// be called so the underlying slice is put back into the pool. +type Buffer struct { + Data []byte + Put func([]byte) +} + +// Release puts the buffer back into the pool it came from. +func (b Buffer) Release() { + if b.Put != nil { + b.Put(b.Data) + } +} + +// BufferPool implements a limited set of reusable buffers. +type BufferPool struct { + ch chan []byte + chM sync.Mutex + defaultSize int + clearOnce sync.Once +} + +// NewBufferPool initializes a new buffer pool. When the context is cancelled, +// all buffers are released. The pool stores at most max items. New buffers are +// created with defaultSize, buffers that are larger are released and not put +// back. +func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool { + b := &BufferPool{ + ch: make(chan []byte, max), + defaultSize: defaultSize, + } + go func() { + <-ctx.Done() + b.clear() + }() + return b +} + +// Get returns a new buffer, either from the pool or newly allocated. +func (pool *BufferPool) Get() Buffer { + b := Buffer{Put: pool.put} + + pool.chM.Lock() + defer pool.chM.Unlock() + select { + case buf := <-pool.ch: + b.Data = buf + default: + b.Data = make([]byte, pool.defaultSize) + } + + return b +} + +func (pool *BufferPool) put(b []byte) { + pool.chM.Lock() + defer pool.chM.Unlock() + select { + case pool.ch <- b: + default: + } +} + +// Put returns a buffer to the pool for reuse. +func (pool *BufferPool) Put(b Buffer) { + if cap(b.Data) > pool.defaultSize { + return + } + pool.put(b.Data) +} + +// clear empties the buffer so that all items can be garbage collected. +func (pool *BufferPool) clear() { + pool.clearOnce.Do(func() { + ch := pool.ch + pool.chM.Lock() + pool.ch = nil + pool.chM.Unlock() + close(ch) + for range ch { + } + }) +} diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go new file mode 100644 index 000000000..9a923c6c7 --- /dev/null +++ b/internal/archiver/file_saver.go @@ -0,0 +1,228 @@ +package archiver + +import ( + "context" + "io" + "os" + "sync" + + "github.com/restic/chunker" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/restic" +) + +// FutureFile is returned by SaveFile and will return the data once it +// has been processed. +type FutureFile struct { + ch <-chan saveFileResponse + res saveFileResponse +} + +func (s *FutureFile) wait() { + res, ok := <-s.ch + if ok { + s.res = res + } +} + +// Node returns the node once it is available. +func (s *FutureFile) Node() *restic.Node { + s.wait() + return s.res.node +} + +// Stats returns the stats for the file once they are available. +func (s *FutureFile) Stats() ItemStats { + s.wait() + return s.res.stats +} + +// Err returns the error in case an error occurred. +func (s *FutureFile) Err() error { + s.wait() + return s.res.err +} + +// FileSaver concurrently saves incoming files to the repo. +type FileSaver struct { + fs fs.FS + blobSaver *BlobSaver + saveFilePool *BufferPool + + pol chunker.Pol + + ch chan<- saveFileJob + wg sync.WaitGroup + + CompleteBlob func(filename string, bytes uint64) + + NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error) +} + +// NewFileSaver returns a new file saver. A worker pool with workers is +// started, it is stopped when ctx is cancelled. +func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, workers uint) *FileSaver { + ch := make(chan saveFileJob, workers) + + s := &FileSaver{ + fs: fs, + blobSaver: blobSaver, + saveFilePool: NewBufferPool(ctx, 3*int(workers), chunker.MaxSize/4), + pol: pol, + ch: ch, + + CompleteBlob: func(string, uint64) {}, + } + + for i := uint(0); i < workers; i++ { + s.wg.Add(1) + go s.worker(ctx, &s.wg, ch) + } + + return s +} + +// CompleteFunc is called when the file has been saved. +type CompleteFunc func(*restic.Node, ItemStats) + +// Save stores the file f and returns the data once it has been completed. The +// file is closed by Save. +func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { + ch := make(chan saveFileResponse, 1) + s.ch <- saveFileJob{ + snPath: snPath, + file: file, + fi: fi, + start: start, + complete: complete, + ch: ch, + } + + return FutureFile{ch: ch} +} + +type saveFileJob struct { + snPath string + file fs.File + fi os.FileInfo + ch chan<- saveFileResponse + complete CompleteFunc + start func() +} + +type saveFileResponse struct { + node *restic.Node + stats ItemStats + err error +} + +// saveFile stores the file f in the repo, then closes it. +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse { + start() + + stats := ItemStats{} + + debug.Log("%v", snPath) + + node, err := s.NodeFromFileInfo(f.Name(), fi) + if err != nil { + _ = f.Close() + return saveFileResponse{err: err} + } + + if node.Type != "file" { + _ = f.Close() + return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)} + } + + // reuse the chunker + chnker.Reset(f, s.pol) + + var results []FutureBlob + + node.Content = []restic.ID{} + var size uint64 + for { + buf := s.saveFilePool.Get() + chunk, err := chnker.Next(buf.Data) + if errors.Cause(err) == io.EOF { + buf.Release() + break + } + buf.Data = chunk.Data + + size += uint64(chunk.Length) + + if err != nil { + _ = f.Close() + return saveFileResponse{err: err} + } + + // test if the context has been cancelled, return the error + if ctx.Err() != nil { + _ = f.Close() + return saveFileResponse{err: ctx.Err()} + } + + res := s.blobSaver.Save(ctx, restic.DataBlob, buf) + results = append(results, res) + + // test if the context has been cancelled, return the error + if ctx.Err() != nil { + _ = f.Close() + return saveFileResponse{err: ctx.Err()} + } + + s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) + } + + err = f.Close() + if err != nil { + return saveFileResponse{err: err} + } + + for _, res := range results { + // test if the context has been cancelled, return the error + if res.Err() != nil { + return saveFileResponse{err: ctx.Err()} + } + + if !res.Known() { + stats.DataBlobs++ + stats.DataSize += uint64(res.Length()) + } + + node.Content = append(node.Content, res.ID()) + } + + node.Size = size + + return saveFileResponse{ + node: node, + stats: stats, + } +} + +func (s *FileSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveFileJob) { + // a worker has one chunker which is reused for each file (because it contains a rather large buffer) + chnker := chunker.New(nil, s.pol) + + defer wg.Done() + for { + var job saveFileJob + select { + case <-ctx.Done(): + return + case job = <-jobs: + } + + res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start) + if job.complete != nil { + job.complete(res.node, res.stats) + } + job.ch <- res + close(job.ch) + } +} diff --git a/internal/archiver/index_uploader.go b/internal/archiver/index_uploader.go new file mode 100644 index 000000000..c6edb7a01 --- /dev/null +++ b/internal/archiver/index_uploader.go @@ -0,0 +1,53 @@ +package archiver + +import ( + "context" + "time" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" +) + +// IndexUploader polls the repo for full indexes and uploads them. +type IndexUploader struct { + restic.Repository + + // Start is called when an index is to be uploaded. + Start func() + + // Complete is called when uploading an index has finished. + Complete func(id restic.ID) +} + +// Upload periodically uploads full indexes to the repo. When shutdown is +// cancelled, the last index upload will finish and then Upload returns. +func (u IndexUploader) Upload(ctx, shutdown context.Context, interval time.Duration) error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-shutdown.Done(): + return nil + case <-ticker.C: + full := u.Repository.Index().(*repository.MasterIndex).FullIndexes() + for _, idx := range full { + if u.Start != nil { + u.Start() + } + + id, err := repository.SaveIndex(ctx, u.Repository, idx) + if err != nil { + debug.Log("save indexes returned an error: %v", err) + return err + } + if u.Complete != nil { + u.Complete(id) + } + } + } + } +} diff --git a/internal/archiver/scanner.go b/internal/archiver/scanner.go new file mode 100644 index 000000000..000d2d875 --- /dev/null +++ b/internal/archiver/scanner.go @@ -0,0 +1,112 @@ +package archiver + +import ( + "context" + "os" + "path/filepath" + + "github.com/restic/restic/internal/fs" +) + +// Scanner traverses the targets and calls the function Result with cumulated +// stats concerning the files and folders found. Select is used to decide which +// items should be included. Error is called when an error occurs. +type Scanner struct { + FS fs.FS + Select SelectFunc + Error ErrorFunc + Result func(item string, s ScanStats) +} + +// NewScanner initializes a new Scanner. +func NewScanner(fs fs.FS) *Scanner { + return &Scanner{ + FS: fs, + Select: func(item string, fi os.FileInfo) bool { + return true + }, + Error: func(item string, fi os.FileInfo, err error) error { + return err + }, + Result: func(item string, s ScanStats) {}, + } +} + +// ScanStats collect statistics. +type ScanStats struct { + Files, Dirs, Others uint + Bytes uint64 +} + +// Scan traverses the targets. The function Result is called for each new item +// found, the complete result is also returned by Scan. +func (s *Scanner) Scan(ctx context.Context, targets []string) error { + var stats ScanStats + for _, target := range targets { + abstarget, err := s.FS.Abs(target) + if err != nil { + return err + } + + stats, err = s.scan(ctx, stats, abstarget) + if err != nil { + return err + } + + if ctx.Err() != nil { + return ctx.Err() + } + } + + s.Result("", stats) + return nil +} + +func (s *Scanner) scan(ctx context.Context, stats ScanStats, target string) (ScanStats, error) { + if ctx.Err() != nil { + return stats, ctx.Err() + } + + fi, err := s.FS.Lstat(target) + if err != nil { + // ignore error if the target is to be excluded anyway + if !s.Select(target, nil) { + return stats, nil + } + + // else return filtered error + return stats, s.Error(target, fi, err) + } + + if !s.Select(target, fi) { + return stats, nil + } + + switch { + case fi.Mode().IsRegular(): + stats.Files++ + stats.Bytes += uint64(fi.Size()) + case fi.Mode().IsDir(): + if ctx.Err() != nil { + return stats, ctx.Err() + } + + names, err := readdirnames(s.FS, target) + if err != nil { + return stats, s.Error(target, fi, err) + } + + for _, name := range names { + stats, err = s.scan(ctx, stats, filepath.Join(target, name)) + if err != nil { + return stats, err + } + } + stats.Dirs++ + default: + stats.Others++ + } + + s.Result(target, stats) + return stats, nil +} diff --git a/internal/archiver/scanner_test.go b/internal/archiver/scanner_test.go new file mode 100644 index 000000000..91b8d7f63 --- /dev/null +++ b/internal/archiver/scanner_test.go @@ -0,0 +1,333 @@ +package archiver + +import ( + "context" + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/restic/restic/internal/fs" + restictest "github.com/restic/restic/internal/test" +) + +func TestScanner(t *testing.T) { + var tests = []struct { + name string + src TestDir + want map[string]ScanStats + selFn SelectFunc + }{ + { + name: "include-all", + src: TestDir{ + "other": TestFile{Content: "another file"}, + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + }, + want: map[string]ScanStats{ + filepath.FromSlash("other"): ScanStats{Files: 1, Bytes: 12}, + filepath.FromSlash("work/foo"): ScanStats{Files: 2, Bytes: 15}, + filepath.FromSlash("work/foo.txt"): ScanStats{Files: 3, Bytes: 28}, + filepath.FromSlash("work/subdir/bar.txt"): ScanStats{Files: 4, Bytes: 45}, + filepath.FromSlash("work/subdir/other"): ScanStats{Files: 5, Bytes: 60}, + filepath.FromSlash("work/subdir"): ScanStats{Files: 5, Dirs: 1, Bytes: 60}, + filepath.FromSlash("work"): ScanStats{Files: 5, Dirs: 2, Bytes: 60}, + filepath.FromSlash("."): ScanStats{Files: 5, Dirs: 3, Bytes: 60}, + filepath.FromSlash(""): ScanStats{Files: 5, Dirs: 3, Bytes: 60}, + }, + }, + { + name: "select-txt", + src: TestDir{ + "other": TestFile{Content: "another file"}, + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + }, + selFn: func(item string, fi os.FileInfo) bool { + if fi.IsDir() { + return true + } + + if filepath.Ext(item) == ".txt" { + return true + } + return false + }, + want: map[string]ScanStats{ + filepath.FromSlash("work/foo.txt"): ScanStats{Files: 1, Bytes: 13}, + filepath.FromSlash("work/subdir/bar.txt"): ScanStats{Files: 2, Bytes: 30}, + filepath.FromSlash("work/subdir"): ScanStats{Files: 2, Dirs: 1, Bytes: 30}, + filepath.FromSlash("work"): ScanStats{Files: 2, Dirs: 2, Bytes: 30}, + filepath.FromSlash("."): ScanStats{Files: 2, Dirs: 3, Bytes: 30}, + filepath.FromSlash(""): ScanStats{Files: 2, Dirs: 3, Bytes: 30}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, cleanup := restictest.TempDir(t) + defer cleanup() + + TestCreateFiles(t, tempdir, test.src) + + back := fs.TestChdir(t, tempdir) + defer back() + + cur, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + + sc := NewScanner(fs.Track{fs.Local{}}) + if test.selFn != nil { + sc.Select = test.selFn + } + + results := make(map[string]ScanStats) + sc.Result = func(item string, s ScanStats) { + var p string + var err error + + if item != "" { + p, err = filepath.Rel(cur, item) + if err != nil { + panic(err) + } + } + + results[p] = s + } + + err = sc.Scan(ctx, []string{"."}) + if err != nil { + t.Fatal(err) + } + + if !cmp.Equal(test.want, results) { + t.Error(cmp.Diff(test.want, results)) + } + }) + } +} + +func TestScannerError(t *testing.T) { + var tests = []struct { + name string + unix bool + src TestDir + result ScanStats + selFn SelectFunc + errFn func(t testing.TB, item string, fi os.FileInfo, err error) error + resFn func(t testing.TB, item string, s ScanStats) + prepare func(t testing.TB) + }{ + { + name: "no-error", + src: TestDir{ + "other": TestFile{Content: "another file"}, + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + }, + result: ScanStats{Files: 5, Dirs: 3, Bytes: 60}, + }, + { + name: "unreadable-dir", + unix: true, + src: TestDir{ + "other": TestFile{Content: "another file"}, + "work": TestDir{ + "foo": TestFile{Content: "foo"}, + "foo.txt": TestFile{Content: "foo text file"}, + "subdir": TestDir{ + "other": TestFile{Content: "other in subdir"}, + "bar.txt": TestFile{Content: "bar.txt in subdir"}, + }, + }, + }, + result: ScanStats{Files: 3, Dirs: 2, Bytes: 28}, + prepare: func(t testing.TB) { + err := os.Chmod(filepath.Join("work", "subdir"), 0000) + if err != nil { + t.Fatal(err) + } + }, + errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { + if item == filepath.FromSlash("work/subdir") { + return nil + } + + return err + }, + }, + { + name: "removed-item", + src: TestDir{ + "bar": TestFile{Content: "bar"}, + "baz": TestFile{Content: "baz"}, + "foo": TestFile{Content: "foo"}, + "other": TestFile{Content: "other"}, + }, + result: ScanStats{Files: 3, Dirs: 1, Bytes: 11}, + resFn: func(t testing.TB, item string, s ScanStats) { + if item == "bar" { + err := os.Remove("foo") + if err != nil { + t.Fatal(err) + } + } + }, + errFn: func(t testing.TB, item string, fi os.FileInfo, err error) error { + if item == "foo" { + t.Logf("ignoring error for %v: %v", item, err) + return nil + } + + return err + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.unix && runtime.GOOS == "windows" { + t.Skipf("skip on windows") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, cleanup := restictest.TempDir(t) + defer cleanup() + + TestCreateFiles(t, tempdir, test.src) + + back := fs.TestChdir(t, tempdir) + defer back() + + cur, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + + if test.prepare != nil { + test.prepare(t) + } + + sc := NewScanner(fs.Track{fs.Local{}}) + if test.selFn != nil { + sc.Select = test.selFn + } + + var stats ScanStats + + sc.Result = func(item string, s ScanStats) { + if item == "" { + stats = s + return + } + + if test.resFn != nil { + p, relErr := filepath.Rel(cur, item) + if relErr != nil { + panic(relErr) + } + test.resFn(t, p, s) + } + } + if test.errFn != nil { + sc.Error = func(item string, fi os.FileInfo, err error) error { + p, relErr := filepath.Rel(cur, item) + if relErr != nil { + panic(relErr) + } + + return test.errFn(t, p, fi, err) + } + } + + err = sc.Scan(ctx, []string{"."}) + if err != nil { + t.Fatal(err) + } + + if stats != test.result { + t.Errorf("wrong final result, want\n %#v\ngot:\n %#v", test.result, stats) + } + }) + } +} + +func TestScannerCancel(t *testing.T) { + src := TestDir{ + "bar": TestFile{Content: "bar"}, + "baz": TestFile{Content: "baz"}, + "foo": TestFile{Content: "foo"}, + "other": TestFile{Content: "other"}, + } + + result := ScanStats{Files: 2, Bytes: 6} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempdir, cleanup := restictest.TempDir(t) + defer cleanup() + + TestCreateFiles(t, tempdir, src) + + back := fs.TestChdir(t, tempdir) + defer back() + + cur, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + + sc := NewScanner(fs.Track{fs.Local{}}) + var lastStats ScanStats + sc.Result = func(item string, s ScanStats) { + lastStats = s + + if item == filepath.Join(cur, "baz") { + t.Logf("found baz") + cancel() + } + } + + err = sc.Scan(ctx, []string{"."}) + if err == nil { + t.Errorf("did not find expected error") + } + + if err != context.Canceled { + t.Errorf("unexpected error found, want %v, got %v", context.Canceled, err) + } + + if lastStats != result { + t.Errorf("wrong final result, want\n %#v\ngot:\n %#v", result, lastStats) + } +} diff --git a/internal/archiver/testing.go b/internal/archiver/testing.go index d700135b4..e4bace51a 100644 --- a/internal/archiver/testing.go +++ b/internal/archiver/testing.go @@ -2,10 +2,19 @@ package archiver import ( "context" + "io/ioutil" + "os" + "path" + "path/filepath" + "runtime" + "strings" "testing" "time" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" ) // TestSnapshot creates a new snapshot of path. @@ -17,3 +26,310 @@ func TestSnapshot(t testing.TB, repo restic.Repository, path string, parent *res } return sn } + +// TestDir describes a directory structure to create for a test. +type TestDir map[string]interface{} + +func (d TestDir) String() string { + return "