From c71729dfc42baa905fe91fff79054b38a9357162 Mon Sep 17 00:00:00 2001 From: greatroar <61184462+greatroar@users.noreply.github.com> Date: Fri, 24 Sep 2021 23:51:51 +0200 Subject: [PATCH] Refactor internal/dump + concurrent load/write Package internal/dump has been reworked so its API consists of a single type Dumper that handles tar and zip formats. Tree loading and node writing happen concurrently. --- cmd/restic/cmd_dump.go | 32 +++-------- internal/dump/common.go | 103 ++++++++++++++++++++++------------- internal/dump/common_test.go | 7 ++- internal/dump/tar.go | 38 ++++++------- internal/dump/tar_test.go | 2 +- internal/dump/zip.go | 37 ++++++------- internal/dump/zip_test.go | 2 +- 7 files changed, 111 insertions(+), 110 deletions(-) diff --git a/cmd/restic/cmd_dump.go b/cmd/restic/cmd_dump.go index 4c8ed5b1d..4cb96053c 100644 --- a/cmd/restic/cmd_dump.go +++ b/cmd/restic/cmd_dump.go @@ -67,42 +67,31 @@ func splitPath(p string) []string { return append(s, f) } -func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repository, prefix string, pathComponents []string, writeDump dump.WriteDump) error { - if tree == nil { - return fmt.Errorf("called with a nil tree") - } - if repo == nil { - return fmt.Errorf("called with a nil repository") - } - l := len(pathComponents) - if l == 0 { - return fmt.Errorf("empty path components") - } - +func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repository, prefix string, pathComponents []string, d *dump.Dumper) error { // If we print / we need to assume that there are multiple nodes at that // level in the tree. if pathComponents[0] == "" { if err := checkStdoutArchive(); err != nil { return err } - return writeDump(ctx, repo, tree, "/", os.Stdout) + return d.DumpTree(ctx, tree, "/") } item := filepath.Join(prefix, pathComponents[0]) + l := len(pathComponents) for _, node := range tree.Nodes { // If dumping something in the highest level it will just take the // first item it finds and dump that according to the switch case below. if node.Name == pathComponents[0] { switch { case l == 1 && dump.IsFile(node): - cache := dump.NewCache() - return dump.WriteNodeData(ctx, os.Stdout, repo, node, cache) + return d.WriteNode(ctx, node) case l > 1 && dump.IsDir(node): subtree, err := repo.LoadTree(ctx, *node.Subtree) if err != nil { return errors.Wrapf(err, "cannot load subtree for %q", item) } - return printFromTree(ctx, subtree, repo, item, pathComponents[1:], writeDump) + return printFromTree(ctx, subtree, repo, item, pathComponents[1:], d) case dump.IsDir(node): if err := checkStdoutArchive(); err != nil { return err @@ -111,7 +100,7 @@ func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repositor if err != nil { return err } - return writeDump(ctx, repo, subtree, item, os.Stdout) + return d.DumpTree(ctx, subtree, item) case l > 1: return fmt.Errorf("%q should be a dir, but is a %q", item, node.Type) case !dump.IsFile(node): @@ -129,12 +118,8 @@ func runDump(opts DumpOptions, gopts GlobalOptions, args []string) error { return errors.Fatal("no file and no snapshot ID specified") } - var wd dump.WriteDump switch opts.Archive { - case "tar": - wd = dump.WriteTar - case "zip": - wd = dump.WriteZip + case "tar", "zip": default: return fmt.Errorf("unknown archive format %q", opts.Archive) } @@ -188,7 +173,8 @@ func runDump(opts DumpOptions, gopts GlobalOptions, args []string) error { Exitf(2, "loading tree for snapshot %q failed: %v", snapshotIDString, err) } - err = printFromTree(ctx, tree, repo, "/", splittedPath, wd) + d := dump.New(opts.Archive, repo, os.Stdout) + err = printFromTree(ctx, tree, repo, "/", splittedPath, d) if err != nil { Exitf(2, "cannot dump file: %v", err) } diff --git a/internal/dump/common.go b/internal/dump/common.go index 7ef0c93e4..c3ba69431 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -11,49 +11,66 @@ import ( "github.com/restic/restic/internal/walker" ) -// dumper implements saving node data. -type dumper interface { - io.Closer - dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error +// A Dumper writes trees and files from a repository to a Writer +// in an archive format. +type Dumper struct { + cache *bloblru.Cache + format string + repo restic.Repository + w io.Writer } -// WriteDump will write the contents of the given tree to the given destination. -// It will loop over all nodes in the tree and dump them recursively. -type WriteDump func(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error - -func NewCache() *bloblru.Cache { - return bloblru.New(64 << 20) +func New(format string, repo restic.Repository, w io.Writer) *Dumper { + return &Dumper{ + cache: bloblru.New(64 << 20), + format: format, + repo: repo, + w: w, + } } -func writeDump(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dmp dumper) error { - for _, rootNode := range tree.Nodes { - rootNode.Path = rootPath - err := dumpTree(ctx, repo, rootNode, rootPath, dmp) - if err != nil { - // ignore subsequent errors - _ = dmp.Close() +func (d *Dumper) DumpTree(ctx context.Context, tree *restic.Tree, rootPath string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() - return err + // ch is buffered to deal with variable download/write speeds. + ch := make(chan *restic.Node, 10) + go sendTrees(ctx, d.repo, tree, rootPath, ch) + + switch d.format { + case "tar": + return d.dumpTar(ctx, ch) + case "zip": + return d.dumpZip(ctx, ch) + default: + panic("unknown dump format") + } +} + +func sendTrees(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, ch chan *restic.Node) { + defer close(ch) + + for _, root := range tree.Nodes { + root.Path = path.Join(rootPath, root.Name) + if sendNodes(ctx, repo, root, ch) != nil { + break } } - - return dmp.Close() } -func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node, rootPath string, dmp dumper) error { - rootNode.Path = path.Join(rootNode.Path, rootNode.Name) - rootPath = rootNode.Path - - if err := dmp.dumpNode(ctx, rootNode, repo); err != nil { - return err +func sendNodes(ctx context.Context, repo restic.Repository, root *restic.Node, ch chan *restic.Node) error { + select { + case ch <- root: + case <-ctx.Done(): + return ctx.Err() } // If this is no directory we are finished - if !IsDir(rootNode) { + if !IsDir(root) { return nil } - err := walker.Walk(ctx, repo, *rootNode.Subtree, nil, func(_ restic.ID, nodepath string, node *restic.Node, err error) (bool, error) { + err := walker.Walk(ctx, repo, *root.Subtree, nil, func(_ restic.ID, nodepath string, node *restic.Node, err error) (bool, error) { if err != nil { return false, err } @@ -61,13 +78,16 @@ func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node return false, nil } - node.Path = path.Join(rootPath, nodepath) + node.Path = path.Join(root.Path, nodepath) - if IsFile(node) || IsLink(node) || IsDir(node) { - err := dmp.dumpNode(ctx, node, repo) - if err != nil { - return false, err - } + if !IsFile(node) && !IsDir(node) && !IsLink(node) { + return false, nil + } + + select { + case ch <- node: + case <-ctx.Done(): + return false, ctx.Err() } return false, nil @@ -76,21 +96,26 @@ func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node return err } -// WriteNodeData writes the contents of the node to the given Writer. -func WriteNodeData(ctx context.Context, w io.Writer, repo restic.Repository, node *restic.Node, cache *bloblru.Cache) error { +// WriteNode writes a file node's contents directly to d's Writer, +// without caring about d's format. +func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error { + return d.writeNode(ctx, d.w, node) +} + +func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error { var ( buf []byte err error ) for _, id := range node.Content { - blob, ok := cache.Get(id) + blob, ok := d.cache.Get(id) if !ok { - blob, err = repo.LoadBlob(ctx, restic.DataBlob, id, buf) + blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, id, buf) if err != nil { return err } - buf = cache.Add(id, blob) // Reuse evicted buffer. + buf = d.cache.Add(id, blob) // Reuse evicted buffer. } if _, err := w.Write(blob); err != nil { diff --git a/internal/dump/common_test.go b/internal/dump/common_test.go index e15659701..22d059751 100644 --- a/internal/dump/common_test.go +++ b/internal/dump/common_test.go @@ -28,7 +28,7 @@ func prepareTempdirRepoSrc(t testing.TB, src archiver.TestDir) (tempdir string, type CheckDump func(t *testing.T, testDir string, testDump *bytes.Buffer) error -func WriteTest(t *testing.T, wd WriteDump, cd CheckDump) { +func WriteTest(t *testing.T, format string, cd CheckDump) { tests := []struct { name string args archiver.TestDir @@ -92,8 +92,9 @@ func WriteTest(t *testing.T, wd WriteDump, cd CheckDump) { rtest.OK(t, err) dst := &bytes.Buffer{} - if err := wd(ctx, repo, tree, tt.target, dst); err != nil { - t.Fatalf("WriteDump() error = %v", err) + d := New(format, repo, dst) + if err := d.DumpTree(ctx, tree, tt.target); err != nil { + t.Fatalf("Dumper.Run error = %v", err) } if err := cd(t, tmpdir, dst); err != nil { t.Errorf("WriteDump() = does not match: %v", err) diff --git a/internal/dump/tar.go b/internal/dump/tar.go index 57225cf66..65b68ee5b 100644 --- a/internal/dump/tar.go +++ b/internal/dump/tar.go @@ -3,35 +3,30 @@ package dump import ( "archive/tar" "context" - "io" "os" "path/filepath" "strings" - "github.com/restic/restic/internal/bloblru" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) -type tarDumper struct { - cache *bloblru.Cache - w *tar.Writer -} +func (d *Dumper) dumpTar(ctx context.Context, ch <-chan *restic.Node) (err error) { + w := tar.NewWriter(d.w) -// Statically ensure that tarDumper implements dumper. -var _ dumper = &tarDumper{} + defer func() { + if err == nil { + err = w.Close() + err = errors.Wrap(err, "Close") + } + }() -// WriteTar will write the contents of the given tree, encoded as a tar to the given destination. -func WriteTar(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error { - dmp := &tarDumper{ - cache: NewCache(), - w: tar.NewWriter(dst), + for node := range ch { + if err := d.dumpNodeTar(ctx, node, w); err != nil { + return err + } } - return writeDump(ctx, repo, tree, rootPath, dmp) -} - -func (dmp *tarDumper) Close() error { - return dmp.w.Close() + return nil } // copied from archive/tar.FileInfoHeader @@ -43,7 +38,7 @@ const ( cISVTX = 0o1000 // Save text (sticky bit) ) -func (dmp *tarDumper) dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error { +func (d *Dumper) dumpNodeTar(ctx context.Context, node *restic.Node, w *tar.Writer) error { relPath, err := filepath.Rel("/", node.Path) if err != nil { return err @@ -88,13 +83,12 @@ func (dmp *tarDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest header.Name += "/" } - err = dmp.w.WriteHeader(header) - + err = w.WriteHeader(header) if err != nil { return errors.Wrap(err, "TarHeader") } - return WriteNodeData(ctx, dmp.w, repo, node, dmp.cache) + return d.writeNode(ctx, w, node) } func parseXattrs(xattrs []restic.ExtendedAttribute) map[string]string { diff --git a/internal/dump/tar_test.go b/internal/dump/tar_test.go index ecf9869ae..9f094ae44 100644 --- a/internal/dump/tar_test.go +++ b/internal/dump/tar_test.go @@ -16,7 +16,7 @@ import ( ) func TestWriteTar(t *testing.T) { - WriteTest(t, WriteTar, checkTar) + WriteTest(t, "tar", checkTar) } func checkTar(t *testing.T, testDir string, srcTar *bytes.Buffer) error { diff --git a/internal/dump/zip.go b/internal/dump/zip.go index 96e2c95b9..e5ef5c95b 100644 --- a/internal/dump/zip.go +++ b/internal/dump/zip.go @@ -3,36 +3,31 @@ package dump import ( "archive/zip" "context" - "io" "path/filepath" - "github.com/restic/restic/internal/bloblru" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) -type zipDumper struct { - cache *bloblru.Cache - w *zip.Writer -} +func (d *Dumper) dumpZip(ctx context.Context, ch <-chan *restic.Node) (err error) { + w := zip.NewWriter(d.w) -// Statically ensure that zipDumper implements dumper. -var _ dumper = &zipDumper{} + defer func() { + if err == nil { + err = w.Close() + err = errors.Wrap(err, "Close") + } + }() -// WriteZip will write the contents of the given tree, encoded as a zip to the given destination. -func WriteZip(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error { - dmp := &zipDumper{ - cache: NewCache(), - w: zip.NewWriter(dst), + for node := range ch { + if err := d.dumpNodeZip(ctx, node, w); err != nil { + return err + } } - return writeDump(ctx, repo, tree, rootPath, dmp) + return nil } -func (dmp *zipDumper) Close() error { - return dmp.w.Close() -} - -func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error { +func (d *Dumper) dumpNodeZip(ctx context.Context, node *restic.Node, zw *zip.Writer) error { relPath, err := filepath.Rel("/", node.Path) if err != nil { return err @@ -49,7 +44,7 @@ func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest header.Name += "/" } - w, err := dmp.w.CreateHeader(header) + w, err := zw.CreateHeader(header) if err != nil { return errors.Wrap(err, "ZipHeader") } @@ -62,5 +57,5 @@ func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest return nil } - return WriteNodeData(ctx, w, repo, node, dmp.cache) + return d.writeNode(ctx, w, node) } diff --git a/internal/dump/zip_test.go b/internal/dump/zip_test.go index 3b482244c..4d0cb3a51 100644 --- a/internal/dump/zip_test.go +++ b/internal/dump/zip_test.go @@ -15,7 +15,7 @@ import ( ) func TestWriteZip(t *testing.T) { - WriteTest(t, WriteZip, checkZip) + WriteTest(t, "zip", checkZip) } func readZipFile(f *zip.File) ([]byte, error) {