diff --git a/archiver.go b/archiver.go index 983d6f168..c850b11fe 100644 --- a/archiver.go +++ b/archiver.go @@ -14,6 +14,7 @@ import ( "github.com/restic/restic/backend" "github.com/restic/restic/chunker" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/pipe" "github.com/restic/restic/server" ) @@ -29,8 +30,6 @@ const ( type Archiver struct { s *server.Server - m *Map - c *Cache blobToken chan struct{} @@ -50,15 +49,6 @@ func NewArchiver(s *server.Server) (*Archiver, error) { arch.blobToken <- struct{}{} } - // create new map to store all blobs in - arch.m = NewMap() - - // init cache - arch.c, err = NewCache(s) - if err != nil { - return nil, err - } - // abort on all errors arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files @@ -67,119 +57,59 @@ func NewArchiver(s *server.Server) (*Archiver, error) { return arch, nil } -// Cache returns the current cache for the Archiver. -func (arch *Archiver) Cache() *Cache { - return arch.c -} - -// Preload loads all blobs for all cached snapshots. -func (arch *Archiver) Preload() error { - done := make(chan struct{}) - defer close(done) - - // list snapshots - // TODO: track seen tree ids, load trees that aren't in the set - snapshots := 0 - for name := range arch.s.List(backend.Snapshot, done) { - id, err := backend.ParseID(name) - if err != nil { - debug.Log("Archiver.Preload", "unable to parse name %v as id: %v", name, err) - continue - } - - m, err := arch.c.LoadMap(arch.s, id) - if err != nil { - debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err) - continue - } - - arch.m.Merge(m) - debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str()) - snapshots++ - } - - debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), snapshots) - return nil -} - -func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (server.Blob, error) { +func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) // test if this blob is already known - blob, err := arch.m.FindID(id) - if err == nil { - debug.Log("Archiver.Save", "Save(%v, %v): reusing %v\n", t, id.Str(), blob.Storage.Str()) - return blob, nil + if arch.s.Index().Has(id) { + debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str()) + return nil } - // else encrypt and save data - blob, err = arch.s.SaveFrom(t, id, length, rd) - - // store blob in storage map - smapblob := arch.m.Insert(blob) - - // if the map has a different storage id for this plaintext blob, use that - // one and remove the other. This happens if the same plaintext blob was - // stored concurrently and finished earlier than this blob. 
- if blob.Storage.Compare(smapblob.Storage) != 0 { - debug.Log("Archiver.Save", "using other block, removing %v\n", blob.Storage.Str()) - - // remove the blob again - // TODO: implement a list of blobs in transport, so this doesn't happen so often - err = arch.s.Remove(t, blob.Storage.String()) - if err != nil { - return server.Blob{}, err - } + // otherwise save blob + err := arch.s.SaveFrom(t, id, length, rd) + if err != nil { + debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err) + return err } - debug.Log("Archiver.Save", "Save(%v, %v): new blob %v\n", t, id.Str(), blob) - - return smapblob, nil + debug.Log("Archiver.Save", "Save(%v, %v): new blob\n", t, id.Str()) + return nil } -func (arch *Archiver) SaveTreeJSON(item interface{}) (server.Blob, error) { +func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // convert to json data, err := json.Marshal(item) // append newline data = append(data, '\n') if err != nil { - return server.Blob{}, err + return nil, err } // check if tree has been saved before id := backend.Hash(data) - blob, err := arch.m.FindID(id) - // return the blob if we found it - if err == nil { - return blob, nil + if arch.s.Index().Has(id) { + return id, nil } // otherwise save the data - blob, err = arch.s.SaveJSON(backend.Tree, item) - if err != nil { - return server.Blob{}, err - } - - // store blob in storage map - arch.m.Insert(blob) - - return blob, nil + return arch.s.SaveJSON(pack.Tree, item) } // SaveFile stores the content of the file on the backend as a Blob by calling // Save for each chunk. -func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { +func (arch *Archiver) SaveFile(p *Progress, node *Node) error { file, err := node.OpenForReading() defer file.Close() if err != nil { - return nil, err + return err } // check file again fi, err := file.Stat() if err != nil { - return nil, err + return err } if fi.ModTime() != node.ModTime { @@ -190,7 +120,7 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { n, err := NodeFromFileInfo(node.path, fi) if err != nil { debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err) - return nil, err + return err } // copy node @@ -198,12 +128,15 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { } } - var blobs server.Blobs + type result struct { + id backend.ID + bytes uint64 + } // store all chunks chnker := GetChunker("archiver.SaveFile") chnker.Reset(file, arch.s.ChunkerPolynomial()) - chans := [](<-chan server.Blob){} + chans := [](<-chan result){} defer FreeChunker("archiver.SaveFile", chnker) chunks := 0 @@ -215,75 +148,71 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { } if err != nil { - return nil, arrar.Annotate(err, "SaveFile() chunker.Next()") + return arrar.Annotate(err, "SaveFile() chunker.Next()") } chunks++ // acquire token, start goroutine to save chunk token := <-arch.blobToken - resCh := make(chan server.Blob, 1) + resCh := make(chan result, 1) - go func(ch chan<- server.Blob) { - blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file)) + go func(ch chan<- result) { + err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file)) // TODO handle error if err != nil { panic(err) } - p.Report(Stat{Bytes: blob.Size}) + p.Report(Stat{Bytes: uint64(chunk.Length)}) arch.blobToken <- token - ch <- blob + ch <- result{id: backend.ID(chunk.Digest), bytes: 
uint64(chunk.Length)} }(resCh) chans = append(chans, resCh) } - blobs = []server.Blob{} + results := []result{} for _, ch := range chans { - blobs = append(blobs, <-ch) + results = append(results, <-ch) } - if len(blobs) != chunks { - return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs)) + if len(results) != chunks { + return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results)) } var bytes uint64 - node.Content = make([]backend.ID, len(blobs)) + node.Content = make([]backend.ID, len(results)) debug.Log("Archiver.Save", "checking size for file %s", node.path) - for i, blob := range blobs { - node.Content[i] = blob.ID - bytes += blob.Size + for i, b := range results { + node.Content[i] = b.id + bytes += b.bytes - debug.Log("Archiver.Save", " adding blob %s", blob) + debug.Log("Archiver.Save", " adding blob %s, %d bytes", b.id.Str(), b.bytes) } if bytes != node.Size { - return nil, fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size) + return fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size) } - debug.Log("Archiver.SaveFile", "SaveFile(%q): %v\n", node.path, blobs) + debug.Log("Archiver.SaveFile", "SaveFile(%q): %v blobs\n", node.path, len(results)) - return blobs, nil + return nil } -func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { +func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) { debug.Log("Archiver.saveTree", "saveTree(%v)\n", t) var wg sync.WaitGroup - // add all blobs to global map - arch.m.Merge(t.Map) - // TODO: do all this in parallel for _, node := range t.Nodes { if node.tree != nil { - b, err := arch.saveTree(p, node.tree) + id, err := arch.saveTree(p, node.tree) if err != nil { - return server.Blob{}, err + return nil, err } - node.Subtree = b.ID - t.Map.Insert(b) + node.Subtree = id p.Report(Stat{Dirs: 1}) } else if node.Type == "file" { if len(node.Content) > 0 { @@ -291,22 +220,18 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { // check content for _, id := range node.Content { - blob, err := t.Map.FindID(id) + packID, _, _, _, err := arch.s.Index().Lookup(id) if err != nil { - debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v", id.Str()) - arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v", id.Str())) + debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v: %v", id.Str(), err) + arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v: %v", id.Str(), err)) removeContent = true - t.Map.DeleteID(id) - arch.m.DeleteID(id) continue } - if ok, err := arch.s.Test(backend.Data, blob.Storage.String()); !ok || err != nil { - debug.Log("Archiver.saveTree", "blob %v not in repository (error is %v)", blob, err) - arch.Error(node.path, nil, fmt.Errorf("blob %v not in repository (error is %v)", blob.Storage.Str(), err)) + if ok, err := arch.s.Test(backend.Data, packID.String()); !ok || err != nil { + debug.Log("Archiver.saveTree", "pack %v of blob %v not in repository (error is %v)", packID, id, err) + arch.Error(node.path, nil, fmt.Errorf("pack %v of blob %v not in repository (error is %v)", packID, id, err)) removeContent = true - t.Map.DeleteID(id) - arch.m.DeleteID(id) } } @@ -322,12 +247,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { go func(n *Node) { defer wg.Done() - var blobs server.Blobs - 
blobs, n.err = arch.SaveFile(p, n) - for _, b := range blobs { - t.Map.Insert(b) - } - + n.err = arch.SaveFile(p, n) p.Report(Stat{Files: 1}) }(node) } @@ -341,7 +261,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { // check for invalid file nodes for _, node := range t.Nodes { if node.Type == "file" && node.Content == nil && node.err == nil { - return server.Blob{}, fmt.Errorf("node %v has empty content", node.Name) + return nil, fmt.Errorf("node %v has empty content", node.Name) } // remember used hashes @@ -358,7 +278,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { if node.err != nil { err := arch.Error(node.path, nil, node.err) if err != nil { - return server.Blob{}, err + return nil, err } // save error message in node @@ -366,20 +286,12 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { } } - before := len(t.Map.IDs()) - t.Map.Prune(usedIDs) - after := len(t.Map.IDs()) - - if before != after { - debug.Log("Archiver.saveTree", "pruned %d ids from map for tree %v\n", before-after, t) - } - - blob, err := arch.SaveTreeJSON(t) + id, err := arch.SaveTreeJSON(t) if err != nil { - return server.Blob{}, err + return nil, err } - return blob, nil + return id, nil } func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) { @@ -444,7 +356,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st // otherwise read file normally if node.Type == "file" && len(node.Content) == 0 { debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content) - node.blobs, err = arch.SaveFile(p, node) + err = arch.SaveFile(p, node) if err != nil { // TODO: integrate error reporting fmt.Fprintf(os.Stderr, "error for %v: %v\n", node.path, err) @@ -501,11 +413,6 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str if node.Type == "dir" { debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs) } - - // also store blob in tree map - for _, blob := range node.blobs { - tree.Map.Insert(blob) - } } var ( @@ -525,14 +432,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - blob, err := arch.SaveTreeJSON(tree) + id, err := arch.SaveTreeJSON(tree) if err != nil { panic(err) } - debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob) + debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), id.Str()) - node.Subtree = blob.ID - node.blobs = server.Blobs{blob} + node.Subtree = id dir.Result() <- node if dir.Path() != "" { @@ -792,30 +698,35 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn // receive the top-level tree root := (<-resCh).(*Node) - blob := root.blobs[0] - debug.Log("Archiver.Snapshot", "root node received: %v", blob) - sn.Tree = blob + debug.Log("Archiver.Snapshot", "root node received: %v", root.Subtree.Str()) + sn.Tree = root.Subtree // save snapshot - blob, err = arch.s.SaveJSON(backend.Snapshot, sn) + id, err := arch.s.SaveJSONUnpacked(backend.Snapshot, sn) if err != nil { return nil, nil, err } // store ID in snapshot struct - sn.id = blob.Storage + sn.id = id + debug.Log("Archiver.Snapshot", "saved snapshot %v", id.Str()) - debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str()) - - // cache blobs - err = arch.c.StoreMap(sn.id, arch.m) + // flush server + err = arch.s.Flush() if err != nil { - debug.Log("Archiver.Snapshot", "unable 
to cache blobs for snapshot %v: %v", blob.Storage.Str(), err) - fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err) - return sn, blob.Storage, nil + return nil, nil, err } - return sn, blob.Storage, nil + // save index + indexID, err := arch.s.SaveIndex() + if err != nil { + debug.Log("Archiver.Snapshot", "error saving index: %v", err) + return nil, nil, err + } + + debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str()) + + return sn, id, nil } func isFile(fi os.FileInfo) bool { diff --git a/archiver_test.go b/archiver_test.go index 77e10f17f..8cd45a8ca 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic" "github.com/restic/restic/backend" "github.com/restic/restic/chunker" + "github.com/restic/restic/pack" "github.com/restic/restic/server" . "github.com/restic/restic/test" ) @@ -114,11 +115,7 @@ func BenchmarkChunkEncryptParallel(b *testing.B) { restic.FreeChunkBuf("BenchmarkChunkEncryptParallel", buf) } -func BenchmarkArchiveDirectory(b *testing.B) { - if *benchArchiveDirectory == "" { - b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory") - } - +func archiveDirectory(b testing.TB) { server := SetupBackend(b) defer TeardownBackend(b, server) key := SetupKey(b, server, "geheim") @@ -132,13 +129,25 @@ func BenchmarkArchiveDirectory(b *testing.B) { b.Logf("snapshot archived as %v", id) } -func countBlobs(t testing.TB, server *server.Server) (trees int, data int) { - return server.Count(backend.Tree), server.Count(backend.Data) +func TestArchiveDirectory(t *testing.T) { + if *benchArchiveDirectory == "" { + t.Skip("benchdir not set, skipping TestArchiveDirectory") + } + + archiveDirectory(t) } -func archiveWithPreload(t testing.TB) { +func BenchmarkArchiveDirectory(b *testing.B) { if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") + b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory") + } + + archiveDirectory(b) +} + +func archiveWithDedup(t testing.TB) { + if *benchArchiveDirectory == "" { + t.Skip("benchdir not set, skipping TestArchiverDedup") } server := SetupBackend(t) @@ -146,78 +155,81 @@ func archiveWithPreload(t testing.TB) { key := SetupKey(t, server, "geheim") server.SetKey(key) + var cnt struct { + before, after, after2 struct { + packs, dataBlobs, treeBlobs uint + } + } + // archive a few files sn := SnapshotDir(t, server, *benchArchiveDirectory, nil) t.Logf("archived snapshot %v", sn.ID().Str()) // get archive stats - beforeTrees, beforeData := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", beforeTrees, beforeData) + cnt.before.packs = server.Count(backend.Data) + cnt.before.dataBlobs = server.Index().Count(pack.Data) + cnt.before.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.before.packs, cnt.before.dataBlobs, cnt.before.treeBlobs) // archive the same files again, without parent snapshot sn2 := SnapshotDir(t, server, *benchArchiveDirectory, nil) t.Logf("archived snapshot %v", sn2.ID().Str()) - // get archive stats - afterTrees2, afterData2 := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", afterTrees2, afterData2) + // get archive stats again + cnt.after.packs = server.Count(backend.Data) + cnt.after.dataBlobs = server.Index().Count(pack.Data) + cnt.after.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.after.packs, cnt.after.dataBlobs, cnt.after.treeBlobs) - // if there are 
more blobs, something is wrong - if afterData2 > beforeData { - t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d", - beforeData, afterData2) + // if there are more packs or blobs, something is wrong + if cnt.after.packs > cnt.before.packs { + t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d", + cnt.before.packs, cnt.after.packs) + } + if cnt.after.dataBlobs > cnt.before.dataBlobs { + t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d", + cnt.before.dataBlobs, cnt.after.dataBlobs) + } + if cnt.after.treeBlobs > cnt.before.treeBlobs { + t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d", + cnt.before.treeBlobs, cnt.after.treeBlobs) } // archive the same files again, with a parent snapshot sn3 := SnapshotDir(t, server, *benchArchiveDirectory, sn2.ID()) t.Logf("archived snapshot %v, parent %v", sn3.ID().Str(), sn2.ID().Str()) - // get archive stats - afterTrees3, afterData3 := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", afterTrees3, afterData3) + // get archive stats again + cnt.after2.packs = server.Count(backend.Data) + cnt.after2.dataBlobs = server.Index().Count(pack.Data) + cnt.after2.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.after2.packs, cnt.after2.dataBlobs, cnt.after2.treeBlobs) - // if there are more blobs, something is wrong - if afterData3 > beforeData { - t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d", - beforeData, afterData3) + // if there are more packs or blobs, something is wrong + if cnt.after2.packs > cnt.before.packs { + t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d", + cnt.before.packs, cnt.after2.packs) + } + if cnt.after2.dataBlobs > cnt.before.dataBlobs { + t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d", + cnt.before.dataBlobs, cnt.after2.dataBlobs) + } + if cnt.after2.treeBlobs > cnt.before.treeBlobs { + t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d", + cnt.before.treeBlobs, cnt.after2.treeBlobs) } } -func TestArchivePreload(t *testing.T) { - archiveWithPreload(t) -} - -func BenchmarkPreload(t *testing.B) { - if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") - } - - server := SetupBackend(t) - defer TeardownBackend(t, server) - key := SetupKey(t, server, "geheim") - server.SetKey(key) - - // archive a few files - arch, err := restic.NewArchiver(server) - OK(t, err) - sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) - OK(t, err) - t.Logf("archived snapshot %v", sn.ID()) - - // start benchmark - t.ResetTimer() - - for i := 0; i < t.N; i++ { - // create new archiver and preload - arch2, err := restic.NewArchiver(server) - OK(t, err) - OK(t, arch2.Preload()) - } +func TestArchiveDedup(t *testing.T) { + archiveWithDedup(t) } func BenchmarkLoadTree(t *testing.B) { if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") + t.Skip("benchdir not set, skipping TestArchiverDedup") } s := SetupBackend(t) @@ -254,7 +266,7 @@ func BenchmarkLoadTree(t *testing.B) { for i := 0; i < t.N; i++ { for _, id := range list { - _, err := restic.LoadTree(s, server.Blob{Storage: id}) + _, err := restic.LoadTree(s, id) OK(t, err) } } diff --git a/backend/generic.go b/backend/generic.go index 173f4e1b1..0720809bd 100644 --- 
a/backend/generic.go
+++ b/backend/generic.go
@@ -3,6 +3,7 @@ package backend
 import (
 	"crypto/sha256"
 	"errors"
+	"io"
 )
 
 const (
@@ -96,3 +97,34 @@ outer:
 
 	return IDSize, nil
 }
+
+// blobReader wraps an io.LimitedReader and adds a Close method, implementing io.ReadCloser.
+type blobReader struct {
+	f      io.Closer
+	rd     io.Reader
+	closed bool
+}
+
+func (l *blobReader) Read(p []byte) (int, error) {
+	n, err := l.rd.Read(p)
+	if err == io.EOF {
+		l.Close()
+	}
+
+	return n, err
+}
+
+func (l *blobReader) Close() error {
+	if !l.closed {
+		err := l.f.Close()
+		l.closed = true
+		return err
+	}
+
+	return nil
+}
+
+// LimitReader returns a blobReader that reads at most n bytes from f and closes f at EOF or on Close.
+func LimitReader(f io.ReadCloser, n int64) *blobReader {
+	return &blobReader{f: f, rd: io.LimitReader(f, n)}
+}
diff --git a/backend/interface.go b/backend/interface.go
index 2256212a8..31cef66d0 100644
--- a/backend/interface.go
+++ b/backend/interface.go
@@ -18,7 +18,7 @@ const (
 	Version = 1
 )
 
-// A Backend manages blobs of data.
+// A Backend manages data stored somewhere.
 type Backend interface {
 	// Location returns a string that specifies the location of the repository,
 	// like a URL.
@@ -31,6 +31,10 @@ type Backend interface {
 	// Get returns an io.ReadCloser for the Blob with the given name of type t.
 	Get(t Type, name string) (io.ReadCloser, error)
 
+	// GetReader returns an io.ReadCloser for the Blob with the given name of
+	// type t at offset and length.
+	GetReader(t Type, name string, offset, length uint) (io.ReadCloser, error)
+
 	// Test a boolean value whether a Blob with the name and type exists.
 	Test(t Type, name string) (bool, error)
 
diff --git a/backend/local/local.go b/backend/local/local.go
index ee1430639..442a5ffcf 100644
--- a/backend/local/local.go
+++ b/backend/local/local.go
@@ -302,6 +302,26 @@ func (b *Local) Get(t backend.Type, name string) (io.ReadCloser, error) {
 	return os.Open(filename(b.p, t, name))
 }
 
+// GetReader returns an io.ReadCloser for the Blob with the given name of
+// type t at offset and length. If length is 0, the reader reads until EOF.
+func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
+	f, err := os.Open(filename(b.p, t, name))
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = f.Seek(int64(offset), 0)
+	if err != nil {
+		return nil, err
+	}
+
+	if length == 0 {
+		return f, nil
+	}
+
+	return backend.LimitReader(f, int64(length)), nil
+}
+
 // Test returns true if a blob of the given type and name exists in the backend.
 func (b *Local) Test(t backend.Type, name string) (bool, error) {
 	_, err := os.Stat(filename(b.p, t, name))
diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go
index 5a942be50..115d32ef7 100644
--- a/backend/sftp/sftp.go
+++ b/backend/sftp/sftp.go
@@ -436,6 +436,26 @@ func (r *SFTP) Get(t backend.Type, name string) (io.ReadCloser, error) {
 	return file, nil
 }
 
+// GetReader returns an io.ReadCloser for the Blob with the given name of
+// type t at offset and length. If length is 0, the reader reads until EOF.
+func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
+	f, err := r.c.Open(r.filename(t, name))
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = f.Seek(int64(offset), 0)
+	if err != nil {
+		return nil, err
+	}
+
+	if length == 0 {
+		return f, nil
+	}
+
+	return backend.LimitReader(f, int64(length)), nil
+}
+
 // Test returns true if a blob of the given type and name exists in the backend.
func (r *SFTP) Test(t backend.Type, name string) (bool, error) { _, err := r.c.Lstat(r.filename(t, name)) diff --git a/cache.go b/cache.go index 19b32cb75..81f279ea9 100644 --- a/cache.go +++ b/cache.go @@ -1,14 +1,12 @@ package restic import ( - "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strings" - "sync" "github.com/restic/restic/backend" "github.com/restic/restic/debug" @@ -211,146 +209,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, // high-level functions -// RefreshSnapshots loads the maps for all snapshots and saves them to the -// local cache. Old cache entries are purged. -func (c *Cache) RefreshSnapshots(s *server.Server, p *Progress) error { - defer p.Done() - - // list cache entries - entries, err := c.List(backend.Snapshot) - if err != nil { - return err - } - - // list snapshots first - done := make(chan struct{}) - defer close(done) - - // check that snapshot blobs are cached - for name := range s.List(backend.Snapshot, done) { - id, err := backend.ParseID(name) - if err != nil { - continue - } - - // remove snapshot from list of entries - for i, e := range entries { - if e.ID.Equal(id) { - entries = append(entries[:i], entries[i+1:]...) - break - } - } - - has, err := c.Has(backend.Snapshot, "blobs", id) - if err != nil { - return err - } - - if has { - continue - } - - // else start progress reporting - p.Start() - - // build new cache - _, err = cacheSnapshotBlobs(p, s, c, id) - if err != nil { - debug.Log("Cache.RefreshSnapshots", "unable to cache snapshot blobs for %v: %v", id.Str(), err) - return err - } - } - - // remove other entries - for _, e := range entries { - debug.Log("Cache.RefreshSnapshots", "remove entry %v", e) - err = c.Purge(backend.Snapshot, e.Subtype, e.ID) - if err != nil { - return err - } - } - - return nil -} - -// cacheSnapshotBlobs creates a cache of all the blobs used within the -// snapshot. It collects all blobs from all trees and saves the resulting map -// to the cache and returns the map. 
-func cacheSnapshotBlobs(p *Progress, s *server.Server, c *Cache, id backend.ID) (*Map, error) { - debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str()) - - sn, err := LoadSnapshot(s, id) - if err != nil { - debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err) - return nil, err - } - - m := NewMap() - - // add top-level node - m.Insert(sn.Tree) - - p.Report(Stat{Trees: 1}) - - // start walker - var wg sync.WaitGroup - ch := make(chan WalkTreeJob) - - wg.Add(1) - go func() { - WalkTree(s, sn.Tree, nil, ch) - wg.Done() - }() - - for i := 0; i < maxConcurrencyPreload; i++ { - wg.Add(1) - go func() { - for job := range ch { - if job.Tree == nil { - continue - } - p.Report(Stat{Trees: 1}) - debug.Log("CacheSnapshotBlobs", "got job %v", job) - m.Merge(job.Tree.Map) - } - - wg.Done() - }() - } - - wg.Wait() - - // save blob list for snapshot - return m, c.StoreMap(id, m) -} - -func (c *Cache) StoreMap(snid backend.ID, m *Map) error { - wr, err := c.Store(backend.Snapshot, "blobs", snid) - if err != nil { - return nil - } - defer wr.Close() - - enc := json.NewEncoder(wr) - err = enc.Encode(m) - if err != nil { - return err - } - - return nil -} - -func (c *Cache) LoadMap(s *server.Server, snid backend.ID) (*Map, error) { - rd, err := c.Load(backend.Snapshot, "blobs", snid) - if err != nil { - return nil, err - } - - m := &Map{} - err = json.NewDecoder(rd).Decode(m) - return m, err -} - // GetCacheDir returns the cache directory according to XDG basedir spec, see // http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html func GetCacheDir() (string, error) { @@ -389,3 +247,5 @@ func GetCacheDir() (string, error) { return cachedir, nil } + +// TODO: implement RefreshIndex() diff --git a/cache_test.go b/cache_test.go index 98c8ff564..aed61de9c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -1,11 +1,9 @@ package restic_test import ( - "encoding/json" "testing" "github.com/restic/restic" - "github.com/restic/restic/backend" . 
"github.com/restic/restic/test" ) @@ -15,46 +13,14 @@ func TestCache(t *testing.T) { key := SetupKey(t, server, "geheim") server.SetKey(key) - cache, err := restic.NewCache(server) + _, err := restic.NewCache(server) OK(t, err) arch, err := restic.NewArchiver(server) OK(t, err) // archive some files, this should automatically cache all blobs from the snapshot - _, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) + _, _, err = arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) - // try to load map from cache - rd, err := cache.Load(backend.Snapshot, "blobs", id) - OK(t, err) - - dec := json.NewDecoder(rd) - - m := &restic.Map{} - err = dec.Decode(m) - OK(t, err) - - // remove cached blob list - OK(t, cache.Purge(backend.Snapshot, "blobs", id)) - - // load map from cache again, this should fail - rd, err = cache.Load(backend.Snapshot, "blobs", id) - Assert(t, err != nil, "Expected failure did not occur") - - // recreate cached blob list - err = cache.RefreshSnapshots(server, nil) - OK(t, err) - - // load map from cache again - rd, err = cache.Load(backend.Snapshot, "blobs", id) - OK(t, err) - - dec = json.NewDecoder(rd) - - m2 := &restic.Map{} - err = dec.Decode(m2) - OK(t, err) - - // compare maps - Assert(t, m.Equals(m2), "Maps are not equal") + // TODO: test caching index } diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index ee53e4c60..cdcda3418 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -96,26 +96,6 @@ func (cmd CmdBackup) Usage() string { return "DIR/FILE [snapshot-ID]" } -func newCacheRefreshProgress() *restic.Progress { - p := restic.NewProgress(time.Second) - p.OnStart = func() { - fmt.Printf("refreshing cache\n") - } - - if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return p - } - - p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\x1b[2K[%s] %d trees loaded\r", formatDuration(d), s.Trees) - } - p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\x1b[2Krefreshed cache in %s\n", formatDuration(d)) - } - - return p -} - func newScanProgress() *restic.Progress { if !terminal.IsTerminal(int(os.Stdout.Fd())) { return nil @@ -200,6 +180,11 @@ func (cmd CmdBackup) Execute(args []string) error { return err } + err = s.LoadIndex() + if err != nil { + return err + } + var ( parentSnapshot string parentSnapshotID backend.ID @@ -278,16 +263,7 @@ func (cmd CmdBackup) Execute(args []string) error { return nil } - err = arch.Cache().RefreshSnapshots(s, newCacheRefreshProgress()) - if err != nil { - return err - } - - fmt.Printf("loading blobs\n") - err = arch.Preload() - if err != nil { - return err - } + // TODO: load index _, id, err := arch.Snapshot(newArchiveProgress(stat), target, parentSnapshotID) if err != nil { diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index 850ac4325..c49b7d0e2 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -4,10 +4,13 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) @@ -24,7 +27,7 @@ func init() { } func (cmd CmdCat) Usage() string { - return "[data|tree|snapshot|key|masterkey|lock] ID" + return "[pack|blob|tree|snapshot|key|masterkey|lock] ID" } func (cmd CmdCat) Execute(args []string) error { @@ -62,37 +65,20 @@ func (cmd CmdCat) Execute(args []string) error { } } + // handle all types that don't need an index switch tpe { - case 
"data": - // try storage id - data, err := s.LoadID(backend.Data, id) - if err == nil { - _, err = os.Stdout.Write(data) + case "index": + buf, err := s.Load(backend.Index, id) + if err != nil { return err } - _, err = os.Stdout.Write(data) + _, err = os.Stdout.Write(append(buf, '\n')) return err - case "tree": - // try storage id - tree := &restic.Tree{} - err := s.LoadJSONID(backend.Tree, id, tree) - if err != nil { - return err - } - - buf, err := json.MarshalIndent(&tree, "", " ") - if err != nil { - return err - } - - fmt.Println(string(buf)) - - return nil case "snapshot": sn := &restic.Snapshot{} - err = s.LoadJSONID(backend.Snapshot, id, sn) + err = s.LoadJSONEncrypted(backend.Snapshot, id, sn) if err != nil { return err } @@ -136,6 +122,52 @@ func (cmd CmdCat) Execute(args []string) error { return nil case "lock": return errors.New("not yet implemented") + } + + // load index, handle all the other types + err = s.LoadIndex() + if err != nil { + return err + } + + switch tpe { + case "pack": + rd, err := s.Backend().Get(backend.Data, id.String()) + if err != nil { + return err + } + + _, err = io.Copy(os.Stdout, rd) + return err + + case "blob": + data, err := s.LoadBlob(pack.Data, id) + if err == nil { + _, err = os.Stdout.Write(data) + return err + } + + _, err = os.Stdout.Write(data) + return err + + case "tree": + debug.Log("cat", "cat tree %v", id.Str()) + tree := restic.NewTree() + err = s.LoadJSONPack(pack.Tree, id, tree) + if err != nil { + debug.Log("cat", "unable to load tree %v: %v", id.Str(), err) + return err + } + + buf, err := json.MarshalIndent(&tree, "", " ") + if err != nil { + debug.Log("cat", "error json.MarshalIndent(): %v", err) + return err + } + + _, err = os.Stdout.Write(append(buf, '\n')) + return nil + default: return errors.New("invalid type") } diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index e80bda99f..077752001 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -59,9 +59,9 @@ func parseTime(str string) (time.Time, error) { return time.Time{}, fmt.Errorf("unable to parse time: %q", str) } -func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]findResult, error) { - debug.Log("restic.find", "checking tree %v\n", blob) - tree, err := restic.LoadTree(s, blob) +func (c CmdFind) findInTree(s *server.Server, id backend.ID, path string) ([]findResult, error) { + debug.Log("restic.find", "checking tree %v\n", id) + tree, err := restic.LoadTree(s, id) if err != nil { return nil, err } @@ -93,12 +93,7 @@ func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([] } if node.Type == "dir" { - b, err := tree.Map.FindID(node.Subtree) - if err != nil { - return nil, err - } - - subdirResults, err := c.findInTree(s, b, filepath.Join(path, node.Name)) + subdirResults, err := c.findInTree(s, id, filepath.Join(path, node.Name)) if err != nil { return nil, err } diff --git a/cmd/restic/cmd_fsck.go b/cmd/restic/cmd_fsck.go index 2c360652e..aa7ded31d 100644 --- a/cmd/restic/cmd_fsck.go +++ b/cmd/restic/cmd_fsck.go @@ -7,7 +7,9 @@ import ( "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/crypto" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) @@ -32,31 +34,31 @@ func init() { } } -func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (uint64, error) { +func fsckFile(opts CmdFsck, s *server.Server, IDs []backend.ID) (uint64, error) { debug.Log("restic.fsckFile", "checking file 
%v", IDs) var bytes uint64 for _, id := range IDs { debug.Log("restic.fsck", " checking data blob %v\n", id) - // test if blob is in map - blob, err := m.FindID(id) + // test if blob is in the index + packID, tpe, _, length, err := s.Index().Lookup(id) if err != nil { - return 0, fmt.Errorf("storage ID for data blob %v not found", id) + return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe) } - bytes += blob.Size - debug.Log("restic.fsck", " data blob found: %v\n", blob) + bytes += uint64(length - crypto.Extension) + debug.Log("restic.fsck", " blob found in pack %v\n", packID) if opts.CheckData { // load content - _, err := s.Load(backend.Data, blob) + _, err := s.LoadBlob(pack.Data, id) if err != nil { return 0, err } } else { // test if data blob is there - ok, err := s.Test(backend.Data, blob.Storage.String()) + ok, err := s.Test(backend.Data, packID.String()) if err != nil { return 0, err } @@ -68,17 +70,17 @@ func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) ( // if orphan check is active, record storage id if opts.o_data != nil { - opts.o_data.Insert(blob.Storage) + opts.o_data.Insert(id) } } return bytes, nil } -func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { - debug.Log("restic.fsckTree", "checking tree %v", blob) +func fsckTree(opts CmdFsck, s *server.Server, id backend.ID) error { + debug.Log("restic.fsckTree", "checking tree %v", id.Str()) - tree, err := restic.LoadTree(s, blob) + tree, err := restic.LoadTree(s, id) if err != nil { return err } @@ -86,7 +88,7 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { // if orphan check is active, record storage id if opts.o_trees != nil { // add ID to list - opts.o_trees.Insert(blob.Storage) + opts.o_trees.Insert(id) } var firstErr error @@ -95,23 +97,23 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { for i, node := range tree.Nodes { if node.Name == "" { - return fmt.Errorf("node %v of tree %v has no name", i, blob.ID) + return fmt.Errorf("node %v of tree %v has no name", i, id.Str()) } if node.Type == "" { - return fmt.Errorf("node %q of tree %v has no type", node.Name, blob.ID) + return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str()) } switch node.Type { case "file": if node.Content == nil { - debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, blob.ID, node) - return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, blob.ID, node) + debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node) + return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node) } if node.Content == nil && node.Error == "" { - debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, blob.ID) - return fmt.Errorf("file node %q of tree %v has no content", node.Name, blob.ID) + debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id) + return fmt.Errorf("file node %q of tree %v has no content", node.Name, id) } // record ids @@ -119,32 +121,25 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { seenIDs.Insert(id) } - debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, blob.ID.Str()) - bytes, err := fsckFile(opts, s, tree.Map, node.Content) + debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str()) + bytes, err := fsckFile(opts, s, node.Content) if err != nil { return err } if bytes != node.Size { - 
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes) - return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes) + debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes) + return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes) } case "dir": if node.Subtree == nil { - return fmt.Errorf("dir node %q of tree %v (storage id %v) has no subtree", node.Name, blob.ID, blob.Storage) - } - - // lookup blob - subtreeBlob, err := tree.Map.FindID(node.Subtree) - if err != nil { - firstErr = err - fmt.Fprintf(os.Stderr, "%v\n", err) + return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id) } // record id seenIDs.Insert(node.Subtree) - err = fsckTree(opts, s, subtreeBlob) + err = fsckTree(opts, s, node.Subtree) if err != nil { firstErr = err fmt.Fprintf(os.Stderr, "%v\n", err) @@ -153,11 +148,11 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { } // check map for unused ids - for _, id := range tree.Map.IDs() { - if seenIDs.Find(id) != nil { - return fmt.Errorf("tree %v: map contains unused ID %v", blob.ID, id) - } - } + // for _, id := range tree.Map.IDs() { + // if seenIDs.Find(id) != nil { + // return fmt.Errorf("tree %v: map contains unused ID %v", id, id) + // } + // } return firstErr } @@ -170,10 +165,6 @@ func fsckSnapshot(opts CmdFsck, s *server.Server, id backend.ID) error { return fmt.Errorf("loading snapshot %v failed: %v", id, err) } - if !sn.Tree.Valid() { - return fmt.Errorf("snapshot %s has invalid tree %v", sn.ID(), sn.Tree) - } - err = fsckTree(opts, s, sn.Tree) if err != nil { debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id) @@ -201,6 +192,11 @@ func (cmd CmdFsck) Execute(args []string) error { return err } + err = s.LoadIndex() + if err != nil { + return err + } + if cmd.Snapshot != "" { name, err := s.FindSnapshot(cmd.Snapshot) if err != nil { diff --git a/cmd/restic/cmd_list.go b/cmd/restic/cmd_list.go index c7ee12f54..01201cef0 100644 --- a/cmd/restic/cmd_list.go +++ b/cmd/restic/cmd_list.go @@ -20,7 +20,7 @@ func init() { } func (cmd CmdList) Usage() string { - return "[data|trees|snapshots|keys|locks]" + return "[blobs|packs|index|snapshots|keys|locks]" } func (cmd CmdList) Execute(args []string) error { @@ -35,10 +35,21 @@ func (cmd CmdList) Execute(args []string) error { var t backend.Type switch args[0] { - case "data": + case "blobs": + err = s.LoadIndex() + if err != nil { + return err + } + + for blob := range s.Index().Each(nil) { + fmt.Println(blob.ID) + } + + return nil + case "packs": t = backend.Data - case "trees": - t = backend.Tree + case "index": + t = backend.Index case "snapshots": t = backend.Snapshot case "keys": diff --git a/cmd/restic/cmd_ls.go b/cmd/restic/cmd_ls.go index ddbe460a3..ff3bbc015 100644 --- a/cmd/restic/cmd_ls.go +++ b/cmd/restic/cmd_ls.go @@ -38,8 +38,8 @@ func printNode(prefix string, n *restic.Node) string { } } -func printTree(prefix string, s *server.Server, blob server.Blob) error { - tree, err := restic.LoadTree(s, blob) +func printTree(prefix string, s *server.Server, id backend.ID) error { + tree, err := restic.LoadTree(s, id) if err != nil { return err } @@ -48,12 +48,7 @@ func printTree(prefix string, s *server.Server, blob server.Blob) error { 
fmt.Println(printNode(prefix, entry))
 
 		if entry.Type == "dir" && entry.Subtree != nil {
-			b, err := tree.Map.FindID(entry.Subtree)
-			if err != nil {
-				return err
-			}
-
-			err = printTree(filepath.Join(prefix, entry.Name), s, b)
+			err = printTree(filepath.Join(prefix, entry.Name), s, entry.Subtree)
 			if err != nil {
 				return err
 			}
diff --git a/cmd/restic/cmd_restore.go b/cmd/restic/cmd_restore.go
index 732b489a6..5d6589a82 100644
--- a/cmd/restic/cmd_restore.go
+++ b/cmd/restic/cmd_restore.go
@@ -35,6 +35,11 @@ func (cmd CmdRestore) Execute(args []string) error {
 		return err
 	}
 
+	err = s.LoadIndex()
+	if err != nil {
+		return err
+	}
+
 	name, err := backend.FindSnapshot(s, args[0])
 	if err != nil {
 		errx(1, "invalid id %q: %v", args[0], err)
diff --git a/map.go b/map.go
deleted file mode 100644
index eec11d567..000000000
--- a/map.go
+++ /dev/null
@@ -1,219 +0,0 @@
-package restic
-
-import (
-	"encoding/json"
-	"errors"
-	"sort"
-	"sync"
-
-	"github.com/restic/restic/backend"
-	"github.com/restic/restic/debug"
-	"github.com/restic/restic/server"
-)
-
-type Map struct {
-	list []server.Blob
-	m    sync.Mutex
-}
-
-var ErrBlobNotFound = errors.New("Blob not found")
-
-func NewMap() *Map {
-	return &Map{
-		list: []server.Blob{},
-	}
-}
-
-func (bl *Map) find(blob server.Blob, checkSize bool) (int, server.Blob, error) {
-	pos := sort.Search(len(bl.list), func(i int) bool {
-		return blob.ID.Compare(bl.list[i].ID) >= 0
-	})
-
-	if pos < len(bl.list) {
-		b := bl.list[pos]
-		if blob.ID.Compare(b.ID) == 0 && (!checkSize || blob.Size == b.Size) {
-			return pos, b, nil
-		}
-	}
-
-	return pos, server.Blob{}, ErrBlobNotFound
-}
-
-func (bl *Map) Find(blob server.Blob) (server.Blob, error) {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	_, blob, err := bl.find(blob, true)
-	return blob, err
-}
-
-func (bl *Map) FindID(id backend.ID) (server.Blob, error) {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	_, blob, err := bl.find(server.Blob{ID: id}, false)
-	return blob, err
-}
-
-func (bl *Map) Merge(other *Map) {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-	other.m.Lock()
-	defer other.m.Unlock()
-
-	for _, blob := range other.list {
-		bl.insert(blob)
-	}
-}
-
-func (bl *Map) insert(blob server.Blob) server.Blob {
-	pos, b, err := bl.find(blob, true)
-	if err == nil {
-		// already present
-		return b
-	}
-
-	// insert blob
-	// https://code.google.com/p/go-wiki/wiki/SliceTricks
-	bl.list = append(bl.list, server.Blob{})
-	copy(bl.list[pos+1:], bl.list[pos:])
-	bl.list[pos] = blob
-
-	return blob
-}
-
-func (bl *Map) Insert(blob server.Blob) server.Blob {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	debug.Log("Map.Insert", "  Map<%p> insert %v", bl, blob)
-
-	return bl.insert(blob)
-}
-
-func (bl *Map) MarshalJSON() ([]byte, error) {
-	return json.Marshal(bl.list)
-}
-
-func (bl *Map) UnmarshalJSON(data []byte) error {
-	return json.Unmarshal(data, &bl.list)
-}
-
-func (bl *Map) IDs() []backend.ID {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	ids := make([]backend.ID, 0, len(bl.list))
-	for _, b := range bl.list {
-		ids = append(ids, b.ID)
-	}
-
-	return ids
-}
-
-func (bl *Map) StorageIDs() []backend.ID {
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	ids := make([]backend.ID, 0, len(bl.list))
-	for _, b := range bl.list {
-		ids = append(ids, b.Storage)
-	}
-
-	return ids
-}
-
-func (bl *Map) Equals(other *Map) bool {
-	if bl == nil && other == nil {
-		return true
-	}
-
-	if bl == nil || other == nil {
-		return false
-	}
-
-	bl.m.Lock()
-	defer bl.m.Unlock()
-
-	if len(bl.list) != len(other.list) {
-		debug.Log("Map.Equals", "length does not match: %d != %d",
len(bl.list), len(other.list)) - return false - } - - for i := 0; i < len(bl.list); i++ { - if bl.list[i].Compare(other.list[i]) != 0 { - debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i]) - return false - } - } - - return true -} - -// Each calls f for each blob in the Map. While Each is running, no other -// operation is possible, since a mutex is held for the whole time. -func (bl *Map) Each(f func(blob server.Blob)) { - bl.m.Lock() - defer bl.m.Unlock() - - for _, blob := range bl.list { - f(blob) - } -} - -// Select returns a list of of blobs from the plaintext IDs given in list. -func (bl *Map) Select(list backend.IDs) (server.Blobs, error) { - bl.m.Lock() - defer bl.m.Unlock() - - blobs := make(server.Blobs, 0, len(list)) - for _, id := range list { - _, blob, err := bl.find(server.Blob{ID: id}, false) - if err != nil { - return nil, err - } - - blobs = append(blobs, blob) - } - - return blobs, nil -} - -// Len returns the number of blobs in the map. -func (bl *Map) Len() int { - bl.m.Lock() - defer bl.m.Unlock() - - return len(bl.list) -} - -// Prune deletes all IDs from the map except the ones listed in ids. -func (bl *Map) Prune(ids *backend.IDSet) { - bl.m.Lock() - defer bl.m.Unlock() - - pos := 0 - for pos < len(bl.list) { - blob := bl.list[pos] - if ids.Find(blob.ID) != nil { - // remove element - bl.list = append(bl.list[:pos], bl.list[pos+1:]...) - continue - } - - pos++ - } -} - -// DeleteID removes the plaintext ID id from the map. -func (bl *Map) DeleteID(id backend.ID) { - bl.m.Lock() - defer bl.m.Unlock() - - pos, _, err := bl.find(server.Blob{ID: id}, false) - if err != nil { - return - } - - bl.list = append(bl.list[:pos], bl.list[pos+1:]...) -} diff --git a/map_test.go b/map_test.go deleted file mode 100644 index f83fa6828..000000000 --- a/map_test.go +++ /dev/null @@ -1,147 +0,0 @@ -package restic_test - -import ( - "crypto/rand" - "encoding/json" - "flag" - "io" - mrand "math/rand" - "sync" - "testing" - "time" - - "github.com/restic/restic" - "github.com/restic/restic/backend" - "github.com/restic/restic/server" - . 
"github.com/restic/restic/test" -) - -var maxWorkers = flag.Uint("workers", 20, "number of workers to test Map concurrent access against") - -func randomID() []byte { - buf := make([]byte, backend.IDSize) - _, err := io.ReadFull(rand.Reader, buf) - if err != nil { - panic(err) - } - return buf -} - -func newBlob() server.Blob { - return server.Blob{ - ID: randomID(), - Size: uint64(mrand.Uint32()), - Storage: randomID(), - StorageSize: uint64(mrand.Uint32()), - } -} - -// Test basic functionality -func TestMap(t *testing.T) { - bl := restic.NewMap() - - b := newBlob() - bl.Insert(b) - - for i := 0; i < 1000; i++ { - bl.Insert(newBlob()) - } - - b2, err := bl.Find(server.Blob{ID: b.ID, Size: b.Size}) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - b2, err = bl.FindID(b.ID) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - bl2 := restic.NewMap() - for i := 0; i < 1000; i++ { - bl.Insert(newBlob()) - } - - b2, err = bl2.Find(b) - Assert(t, err != nil, "found ID in restic that was never inserted: %v", b2) - - bl2.Merge(bl) - - b2, err = bl2.Find(b) - - if err != nil { - t.Fatal(err) - } - - if b.Compare(b2) != 0 { - t.Fatalf("items are not equal: want %v, got %v", b, b2) - } -} - -// Test JSON encode/decode -func TestMapJSON(t *testing.T) { - bl := restic.NewMap() - b := server.Blob{ID: randomID()} - bl.Insert(b) - - b2, err := bl.Find(b) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - buf, err := json.Marshal(bl) - OK(t, err) - - bl2 := restic.Map{} - json.Unmarshal(buf, &bl2) - - b2, err = bl2.Find(b) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - buf, err = json.Marshal(bl2) - OK(t, err) -} - -// random insert/find access by several goroutines -func TestMapRandom(t *testing.T) { - var wg sync.WaitGroup - - worker := func(bl *restic.Map) { - defer wg.Done() - - b := newBlob() - bl.Insert(b) - - for i := 0; i < 200; i++ { - bl.Insert(newBlob()) - } - - d := time.Duration(mrand.Intn(10)*100) * time.Millisecond - time.Sleep(d) - - for i := 0; i < 100; i++ { - b2, err := bl.Find(b) - if err != nil { - t.Fatal(err) - } - - if b.Compare(b2) != 0 { - t.Fatalf("items are not equal: want %v, got %v", b, b2) - } - } - - bl2 := restic.NewMap() - for i := 0; i < 200; i++ { - bl2.Insert(newBlob()) - } - - bl2.Merge(bl) - } - - bl := restic.NewMap() - - for i := 0; uint(i) < *maxWorkers; i++ { - wg.Add(1) - go worker(bl) - } - - wg.Wait() -} diff --git a/node.go b/node.go index 45a6a9be0..b29f2fe45 100644 --- a/node.go +++ b/node.go @@ -12,6 +12,7 @@ import ( "github.com/juju/arrar" "github.com/restic/restic/backend" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) @@ -98,14 +99,14 @@ func nodeTypeFromFileInfo(fi os.FileInfo) string { return "" } -func (node *Node) CreateAt(path string, m *Map, s *server.Server) error { +func (node *Node) CreateAt(path string, s *server.Server) error { switch node.Type { case "dir": if err := node.createDirAt(path); err != nil { return err } case "file": - if err := node.createFileAt(path, m, s); err != nil { + if err := node.createFileAt(path, s); err != nil { return err } case "symlink": @@ -171,7 +172,7 @@ func (node Node) createDirAt(path string) error { return nil } -func (node Node) createFileAt(path string, m *Map, s *server.Server) error { +func (node Node) createFileAt(path string, s *server.Server) error { f, err := 
os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600)
 	defer f.Close()
 
@@ -179,13 +180,8 @@ func (node Node) createFileAt(path string, s *server.Server) error {
 		return arrar.Annotate(err, "OpenFile")
 	}
 
-	for _, blobid := range node.Content {
-		blob, err := m.FindID(blobid)
-		if err != nil {
-			return arrar.Annotate(err, "Find Blob")
-		}
-
-		buf, err := s.Load(backend.Data, blob)
+	for _, id := range node.Content {
+		buf, err := s.LoadBlob(pack.Data, id)
 		if err != nil {
 			return arrar.Annotate(err, "Load")
 		}
diff --git a/pack/pack.go b/pack/pack.go
index b1a3e7ef7..8e4fad93f 100644
--- a/pack/pack.go
+++ b/pack/pack.go
@@ -19,6 +19,17 @@ const (
 	Tree = 1
 )
 
+func (t BlobType) String() string {
+	switch t {
+	case Data:
+		return "data"
+	case Tree:
+		return "tree"
+	}
+
+	return fmt.Sprintf("<BlobType %d>", t)
+}
+
 func (t BlobType) MarshalJSON() ([]byte, error) {
 	switch t {
 	case Data:
diff --git a/restorer.go b/restorer.go
index e7a8d18ca..54e797abc 100644
--- a/restorer.go
+++ b/restorer.go
@@ -36,8 +36,8 @@ func NewRestorer(s *server.Server, snid backend.ID) (*Restorer, error) {
 	return r, nil
 }
 
-func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
-	tree, err := LoadTree(res.s, treeBlob)
+func (res *Restorer) to(dst string, dir string, treeID backend.ID) error {
+	tree, err := LoadTree(res.s, treeID)
 	if err != nil {
 		return res.Error(dir, nil, arrar.Annotate(err, "LoadTree"))
 	}
@@ -47,7 +47,7 @@
 		if res.Filter == nil ||
 			res.Filter(filepath.Join(dir, node.Name), dstpath, node) {
-			err := node.CreateAt(dstpath, tree.Map, res.s)
+			err := node.CreateAt(dstpath, res.s)
 
 			// Did it fail because of ENOENT?
 			if arrar.Check(err, func(err error) bool {
@@ -60,7 +60,7 @@
 				// Create parent directories and retry
 				err = os.MkdirAll(filepath.Dir(dstpath), 0700)
 				if err == nil || err == os.ErrExist {
-					err = node.CreateAt(dstpath, tree.Map, res.s)
+					err = node.CreateAt(dstpath, res.s)
 				}
 			}
 
@@ -74,20 +74,11 @@
 		if node.Type == "dir" {
 			if node.Subtree == nil {
-				return fmt.Errorf("Dir without subtree in tree %s", treeBlob)
+				return fmt.Errorf("Dir without subtree in tree %v", treeID.Str())
 			}
 
 			subp := filepath.Join(dir, node.Name)
-
-			subtreeBlob, err := tree.Map.FindID(node.Subtree)
-			if err != nil {
-				err = res.Error(subp, node, arrar.Annotate(err, "lookup subtree"))
-				if err != nil {
-					return err
-				}
-			}
-
-			err = res.to(dst, subp, subtreeBlob)
+			err = res.to(dst, subp, node.Subtree)
 			if err != nil {
 				err = res.Error(subp, node, arrar.Annotate(err, "restore subtree"))
 				if err != nil {
diff --git a/server/index.go b/server/index.go
index 8ffefd984..a6271ff69 100644
--- a/server/index.go
+++ b/server/index.go
@@ -3,6 +3,7 @@ package server
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io"
 	"sync"
 
@@ -47,8 +48,8 @@ func (idx *Index) Store(t pack.BlobType, id, pack backend.ID, offset, length uint
 	idx.m.Lock()
 	defer idx.m.Unlock()
 
-	debug.Log("Index.Store", "pack %v contains id %v, offset %v, length %v",
-		pack.Str(), id.Str(), offset, length)
+	debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
+		pack.Str(), id.Str(), t, offset, length)
 
 	idx.store(t, id, pack, offset, length, false)
 }
@@ -91,6 +92,22 @@ func (idx *Index) Has(id backend.ID) bool {
 	return false
 }
 
+// Merge loads all items from other into idx.
+func (idx *Index) Merge(other *Index) { + debug.Log("Index.Merge", "Merge index with %p", other) + idx.m.Lock() + defer idx.m.Unlock() + + for k, v := range other.pack { + if _, ok := idx.pack[k]; ok { + debug.Log("Index.Merge", "index already has key %v, updating", k[:8]) + } + + idx.pack[k] = v + } + debug.Log("Index.Merge", "done merging index") +} + // Each returns a channel that yields all blobs known to the index. If done is // closed, the background goroutine terminates. This blocks any modification of // the index. @@ -128,6 +145,22 @@ func (idx *Index) Each(done chan struct{}) <-chan pack.Blob { return ch } +// Count returns the number of blobs of type t in the index. +func (idx *Index) Count(t pack.BlobType) (n uint) { + debug.Log("Index.Count", "counting blobs of type %v", t) + idx.m.Lock() + defer idx.m.Unlock() + + for id, blob := range idx.pack { + if blob.tpe == t { + n++ + debug.Log("Index.Count", " blob %v counted: %v", id[:8], blob) + } + } + + return +} + type packJSON struct { ID string `json:"id"` Blobs []blobJSON `json:"blobs"` @@ -144,6 +177,7 @@ type blobJSON struct { // serialization only contains new blobs added via idx.Store(), not old ones // introduced via DecodeIndex(). func (idx *Index) Encode(w io.Writer) error { + debug.Log("Index.Encode", "encoding index") idx.m.Lock() defer idx.m.Unlock() @@ -155,6 +189,14 @@ func (idx *Index) Encode(w io.Writer) error { continue } + debug.Log("Index.Encode", "handle blob %q", id[:8]) + + if blob.packID == nil { + debug.Log("Index.Encode", "blob %q has no packID! (type %v, offset %v, length %v)", + id[:8], blob.tpe, blob.offset, blob.length) + return fmt.Errorf("unable to serialize index: pack for blob %v hasn't been written yet", id) + } + // see if pack is already in map p, ok := packs[blob.packID.String()] if !ok { @@ -175,12 +217,15 @@ func (idx *Index) Encode(w io.Writer) error { }) } + debug.Log("Index.Encode", "done") + enc := json.NewEncoder(w) return enc.Encode(list) } // DecodeIndex loads and unserializes an index from rd. 
func DecodeIndex(rd io.Reader) (*Index, error) { + debug.Log("Index.DecodeIndex", "Start decoding index") list := []*packJSON{} dec := json.NewDecoder(rd) @@ -193,12 +238,14 @@ func DecodeIndex(rd io.Reader) (*Index, error) { for _, pack := range list { packID, err := backend.ParseID(pack.ID) if err != nil { + debug.Log("Index.DecodeIndex", "error parsing pack ID %q: %v", pack.ID, err) return nil, err } for _, blob := range pack.Blobs { blobID, err := backend.ParseID(blob.ID) if err != nil { + debug.Log("Index.DecodeIndex", "error parsing blob ID %q: %v", blob.ID, err) return nil, err } @@ -206,5 +253,6 @@ func DecodeIndex(rd io.Reader) (*Index, error) { } } + debug.Log("Index.DecodeIndex", "done") return idx, err } diff --git a/server/server.go b/server/server.go index d00d47f4f..716c93c18 100644 --- a/server/server.go +++ b/server/server.go @@ -1,24 +1,35 @@ package server import ( + "bytes" "crypto/sha256" "encoding/json" "errors" "fmt" "io" "io/ioutil" + "sync" "github.com/restic/restic/backend" "github.com/restic/restic/chunker" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" ) type Server struct { be backend.Backend key *Key + idx *Index + + pm sync.Mutex + packs []*pack.Packer } func NewServer(be backend.Backend) *Server { - return &Server{be: be} + return &Server{ + be: be, + idx: NewIndex(), + } } func (s *Server) SetKey(k *Key) { @@ -49,14 +60,15 @@ func (s *Server) PrefixLength(t backend.Type) (int, error) { return backend.PrefixLength(s.be, t) } -// Load tries to load and decrypt content identified by t and blob from the -// backend. If the blob specifies an ID, the decrypted plaintext is checked -// against this ID. The same goes for blob.Size and blob.StorageSize: If they -// are set to a value > 0, this value is checked. -func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) { - // load data - rd, err := s.be.Get(t, blob.Storage.String()) +// Load tries to load and decrypt content identified by t and id from the +// backend. +func (s *Server) Load(t backend.Type, id backend.ID) ([]byte, error) { + debug.Log("Server.Load", "load %v with id %v", t, id.Str()) + + // load blob from pack + rd, err := s.be.Get(t, id.String()) if err != nil { + debug.Log("Server.Load", "error loading %v: %v", id.Str(), err) return nil, err } @@ -65,58 +77,78 @@ func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) { return nil, err } - // check hash - if !backend.Hash(buf).Equal(blob.Storage) { - return nil, errors.New("invalid data returned") - } - - // check length - if blob.StorageSize > 0 && len(buf) != int(blob.StorageSize) { - return nil, errors.New("Invalid storage length") - } - - // decrypt - buf, err = s.Decrypt(buf) + err = rd.Close() if err != nil { return nil, err } - // check length - if blob.Size > 0 && len(buf) != int(blob.Size) { - return nil, errors.New("Invalid length") + // check hash + if !backend.Hash(buf).Equal(id) { + return nil, errors.New("invalid data returned") } - // check SHA256 sum - if blob.ID != nil { - id := backend.Hash(buf) - if !blob.ID.Equal(id) { - return nil, fmt.Errorf("load %v: expected plaintext hash %v, got %v", blob.Storage, blob.ID, id) - } - } - - return buf, nil -} - -// Load tries to load and decrypt content identified by t and id from the backend. -func (s *Server) LoadID(t backend.Type, storageID backend.ID) ([]byte, error) { - return s.Load(t, Blob{Storage: storageID}) -} - -// LoadJSON calls Load() to get content from the backend and afterwards calls -// json.Unmarshal on the item. 
-func (s *Server) LoadJSON(t backend.Type, blob Blob, item interface{}) error { - buf, err := s.Load(t, blob) + // decrypt + plain, err := s.Decrypt(buf) if err != nil { - return err + return nil, err } - return json.Unmarshal(buf, item) + return plain, nil } -// LoadJSONID calls Load() to get content from the backend and afterwards calls -// json.Unmarshal on the item. -func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) error { - // read +// LoadBlob tries to load and decrypt content identified by t and id from a +// pack from the backend. +func (s *Server) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) { + debug.Log("Server.LoadPack", "load %v with id %v", t, id.Str()) + // lookup pack + packID, tpe, offset, length, err := s.idx.Lookup(id) + if err != nil { + debug.Log("Server.LoadPack", "id %v not found in index: %v", id.Str(), err) + return nil, err + } + + if tpe != t { + debug.Log("Server.LoadPack", "wrong type returned for %v: wanted %v, got %v", id.Str(), t, tpe) + return nil, fmt.Errorf("blob has wrong type %v (wanted: %v)", tpe, t) + } + + debug.Log("Server.LoadPack", "id %v found in pack %v at offset %v (length %d)", id.Str(), packID.Str(), offset, length) + + // load blob from pack + rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length) + if err != nil { + debug.Log("Server.LoadPack", "error loading pack %v for %v: %v", packID.Str(), id.Str(), err) + return nil, err + } + + buf, err := ioutil.ReadAll(rd) + if err != nil { + return nil, err + } + + err = rd.Close() + if err != nil { + return nil, err + } + + // decrypt + plain, err := s.Decrypt(buf) + if err != nil { + return nil, err + } + + // check hash + if !backend.Hash(plain).Equal(id) { + return nil, errors.New("invalid data returned") + } + + return plain, nil +} + +// LoadJSONEncrypted decrypts the data and afterwards calls json.Unmarshal on +// the item. +func (s *Server) LoadJSONEncrypted(t backend.Type, id backend.ID, item interface{}) error { + // load blob from backend rd, err := s.be.Get(t, id.String()) if err != nil { return err @@ -140,132 +172,254 @@ func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) err return nil } -// Save encrypts data and stores it to the backend as type t. -func (s *Server) Save(t backend.Type, data []byte, id backend.ID) (Blob, error) { +// LoadJSONPack calls LoadBlob() to load a blob from the backend, decrypt the +// data and afterwards call json.Unmarshal on the item. +func (s *Server) LoadJSONPack(t pack.BlobType, id backend.ID, item interface{}) error { + // lookup pack + packID, _, offset, length, err := s.idx.Lookup(id) + if err != nil { + return err + } + + // load blob from pack + rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length) + if err != nil { + return err + } + defer rd.Close() + + // decrypt + decryptRd, err := s.key.DecryptFrom(rd) + defer decryptRd.Close() + if err != nil { + return err + } + + // decode + decoder := json.NewDecoder(decryptRd) + err = decoder.Decode(item) + if err != nil { + return err + } + + return nil +} + +const minPackSize = 4 * chunker.MiB +const maxPackSize = 16 * chunker.MiB +const maxPackers = 200 + +// findPacker returns a packer for a new blob of size bytes. Either a new one is +// created or one is returned that already has some blobs. 
+func (s *Server) findPacker(size uint) (*pack.Packer, error) { + s.pm.Lock() + defer s.pm.Unlock() + + // search for a suitable packer + if len(s.packs) > 0 { + debug.Log("Server.findPacker", "searching packer for %d bytes\n", size) + for i, p := range s.packs { + if p.Size()+size < maxPackSize { + debug.Log("Server.findPacker", "found packer %v", p) + // remove from list + s.packs = append(s.packs[:i], s.packs[i+1:]...) + return p, nil + } + } + } + + // no suitable packer found, return new + blob, err := s.be.Create() + if err != nil { + return nil, err + } + debug.Log("Server.findPacker", "create new pack %p", blob) + return pack.NewPacker(s.key.Master(), blob), nil +} + +// insertPacker appends p to s.packs. +func (s *Server) insertPacker(p *pack.Packer) { + s.pm.Lock() + defer s.pm.Unlock() + + s.packs = append(s.packs, p) + debug.Log("Server.insertPacker", "%d packers\n", len(s.packs)) +} + +// savePacker stores p in the backend. +func (s *Server) savePacker(p *pack.Packer) error { + debug.Log("Server.savePacker", "save packer with %d blobs\n", p.Count()) + _, err := p.Finalize() + if err != nil { + return err + } + + // move file to the final location + sid := p.ID() + err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String()) + if err != nil { + debug.Log("Server.savePacker", "blob Finalize() error: %v", err) + return err + } + + debug.Log("Server.savePacker", "saved as %v", sid.Str()) + + // update blobs in the index + for _, b := range p.Blobs() { + debug.Log("Server.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str()) + s.idx.Store(b.Type, b.ID, sid, b.Offset, uint(b.Length)) + } + + return nil +} + +// countPacker returns the number of open (unfinished) packers. +func (s *Server) countPacker() int { + s.pm.Lock() + defer s.pm.Unlock() + + return len(s.packs) +} + +// Save encrypts data and stores it to the backend as type t. If data is small +// enough, it will be packed together with other small blobs. +func (s *Server) Save(t pack.BlobType, data []byte, id backend.ID) (backend.ID, error) { if id == nil { // compute plaintext hash id = backend.Hash(data) } - // create a new blob - blob := Blob{ - ID: id, - Size: uint64(len(data)), - } + debug.Log("Server.Save", "save id %v (%v, %d bytes)", id.Str(), t, len(data)) + // get buf from the pool ciphertext := getBuf() defer freeBuf(ciphertext) // encrypt blob ciphertext, err := s.Encrypt(ciphertext, data) if err != nil { - return Blob{}, err + return nil, err } - // compute ciphertext hash - sid := backend.Hash(ciphertext) - - // save blob - backendBlob, err := s.be.Create() + // find suitable packer and add blob + packer, err := s.findPacker(uint(len(ciphertext))) if err != nil { - return Blob{}, err + return nil, err } - _, err = backendBlob.Write(ciphertext) - if err != nil { - return Blob{}, err + // save ciphertext + packer.Add(t, id, bytes.NewReader(ciphertext)) + + // add this id to the index, although we don't know yet in which pack it + // will be saved, the entry will be updated when the pack is written. 
+	s.idx.Store(t, id, nil, 0, 0)
+	debug.Log("Server.Save", "saving stub for %v (%v) in index", id.Str(), t)
+
+	// if the pack is not full enough and there are less than maxPackers
+	// packers, put back to the list
+	if packer.Size() < minPackSize && s.countPacker() < maxPackers {
+		debug.Log("Server.Save", "pack is not full enough (%d bytes)", packer.Size())
+		s.insertPacker(packer)
+		return id, nil
+	}
 
-	err = backendBlob.Finalize(t, sid.String())
-	if err != nil {
-		return Blob{}, err
-	}
-
-	blob.Storage = sid
-	blob.StorageSize = uint64(len(ciphertext))
-
-	return blob, nil
+	// else write the pack to the backend
+	return id, s.savePacker(packer)
 }
 
-// SaveFrom encrypts data read from rd and stores it to the backend as type t.
-func (s *Server) SaveFrom(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
+// SaveFrom encrypts data read from rd and stores it in a pack in the backend as type t.
+func (s *Server) SaveFrom(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
+	debug.Log("Server.SaveFrom", "save id %v (%v, %d bytes)", id.Str(), t, length)
 	if id == nil {
-		return Blob{}, errors.New("id is nil")
+		return errors.New("id is nil")
 	}
 
-	backendBlob, err := s.be.Create()
+	buf, err := ioutil.ReadAll(rd)
 	if err != nil {
-		return Blob{}, err
+		return err
 	}
 
-	hw := backend.NewHashingWriter(backendBlob, sha256.New())
-	encWr := s.key.EncryptTo(hw)
-
-	_, err = io.Copy(encWr, rd)
+	_, err = s.Save(t, buf, id)
 	if err != nil {
-		return Blob{}, err
+		return err
 	}
 
-	// finish encryption
-	err = encWr.Close()
-	if err != nil {
-		return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err)
-	}
-
-	// finish backend blob
-	sid := backend.ID(hw.Sum(nil))
-	err = backendBlob.Finalize(t, sid.String())
-	if err != nil {
-		return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err)
-	}
-
-	return Blob{
-		ID:          id,
-		Size:        uint64(length),
-		Storage:     sid,
-		StorageSize: uint64(backendBlob.Size()),
-	}, nil
+	return nil
 }
 
-// SaveJSON serialises item as JSON and encrypts and saves it in the backend as
-// type t.
-func (s *Server) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
-	backendBlob, err := s.be.Create()
+// SaveJSON serialises item as JSON and encrypts and saves it in a pack in the
+// backend as type t.
+func (s *Server) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, error) {
+	debug.Log("Server.SaveJSON", "save %v blob", t)
+	buf := getBuf()[:0]
+	defer freeBuf(buf)
+
+	wr := bytes.NewBuffer(buf)
+
+	enc := json.NewEncoder(wr)
+	err := enc.Encode(item)
 	if err != nil {
-		return Blob{}, fmt.Errorf("Create: %v", err)
+		return nil, fmt.Errorf("json.Encode: %v", err)
 	}
 
-	storagehw := backend.NewHashingWriter(backendBlob, sha256.New())
-	encWr := s.key.EncryptTo(storagehw)
-	plainhw := backend.NewHashingWriter(encWr, sha256.New())
+	buf = wr.Bytes()
+	return s.Save(t, buf, nil)
+}
 
-	enc := json.NewEncoder(plainhw)
+// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
+// backend as type t, without a pack. It returns the storage hash.
+func (s *Server) SaveJSONUnpacked(t backend.Type, item interface{}) (backend.ID, error) { + // create blob + blob, err := s.be.Create() + if err != nil { + return nil, err + } + debug.Log("Server.SaveJSONUnpacked", "create new pack %p", blob) + + // hash + hw := backend.NewHashingWriter(blob, sha256.New()) + + // encrypt blob + ewr := s.key.EncryptTo(hw) + + enc := json.NewEncoder(ewr) err = enc.Encode(item) if err != nil { - return Blob{}, fmt.Errorf("json.NewEncoder: %v", err) + return nil, fmt.Errorf("json.Encode: %v", err) } - // finish encryption - err = encWr.Close() + err = ewr.Close() if err != nil { - return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err) + return nil, err } - // finish backend blob - sid := backend.ID(storagehw.Sum(nil)) - err = backendBlob.Finalize(t, sid.String()) + // finalize blob in the backend + sid := backend.ID(hw.Sum(nil)) + + err = blob.Finalize(t, sid.String()) if err != nil { - return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err) + return nil, err } - id := backend.ID(plainhw.Sum(nil)) + return sid, nil +} - return Blob{ - ID: id, - Size: uint64(plainhw.Size()), - Storage: sid, - StorageSize: uint64(backendBlob.Size()), - }, nil +// Flush saves all remaining packs. +func (s *Server) Flush() error { + s.pm.Lock() + defer s.pm.Unlock() + + debug.Log("Server.Flush", "manually flushing %d packs", len(s.packs)) + + for _, p := range s.packs { + err := s.savePacker(p) + if err != nil { + return err + } + } + s.packs = s.packs[:0] + + return nil } // Returns the backend used for this server. @@ -273,6 +427,106 @@ func (s *Server) Backend() backend.Backend { return s.be } +// Returns the index of this server. +func (s *Server) Index() *Index { + return s.idx +} + +// SetIndex instructs the server to use the given index. +func (s *Server) SetIndex(i *Index) { + s.idx = i +} + +// SaveIndex saves all new packs in the index in the backend, returned is the +// storage ID. +func (s *Server) SaveIndex() (backend.ID, error) { + debug.Log("Server.SaveIndex", "Saving index") + + // create blob + blob, err := s.be.Create() + if err != nil { + return nil, err + } + + debug.Log("Server.SaveIndex", "create new pack %p", blob) + + // hash + hw := backend.NewHashingWriter(blob, sha256.New()) + + // encrypt blob + ewr := s.key.EncryptTo(hw) + + err = s.idx.Encode(ewr) + if err != nil { + return nil, err + } + + err = ewr.Close() + if err != nil { + return nil, err + } + + // finalize blob in the backend + sid := backend.ID(hw.Sum(nil)) + + err = blob.Finalize(backend.Index, sid.String()) + if err != nil { + return nil, err + } + + debug.Log("Server.SaveIndex", "Saved index as %v", sid.Str()) + + return sid, nil +} + +// LoadIndex loads all index files from the backend and merges them with the +// current index. +func (s *Server) LoadIndex() error { + debug.Log("Server.LoadIndex", "Loading index") + done := make(chan struct{}) + defer close(done) + + for id := range s.be.List(backend.Index, done) { + err := s.loadIndex(id) + if err != nil { + return err + } + } + return nil +} + +// loadIndex loads the index id and merges it with the currently used index. 
+func (s *Server) loadIndex(id string) error {
+	debug.Log("Server.loadIndex", "Loading index %v", id[:8])
+	before := len(s.idx.pack)
+
+	rd, err := s.be.Get(backend.Index, id)
+	defer rd.Close()
+	if err != nil {
+		return err
+	}
+
+	// decrypt
+	decryptRd, err := s.key.DecryptFrom(rd)
+	defer decryptRd.Close()
+	if err != nil {
+		return err
+	}
+
+	idx, err := DecodeIndex(decryptRd)
+	if err != nil {
+		debug.Log("Server.loadIndex", "error while decoding index %v: %v", id, err)
+		return err
+	}
+
+	s.idx.Merge(idx)
+
+	after := len(s.idx.pack)
+	debug.Log("Server.loadIndex", "Loaded index %v, added %v blobs", id[:8], after-before)
+
+	return nil
+}
+
 func (s *Server) SearchKey(password string) error {
 	key, err := SearchKey(s, password)
 	if err != nil {
@@ -289,7 +543,7 @@ func (s *Server) Decrypt(ciphertext []byte) ([]byte, error) {
 		return nil, errors.New("key for server not set")
 	}
 
-	return s.key.Decrypt([]byte{}, ciphertext)
+	return s.key.Decrypt(nil, ciphertext)
 }
 
 func (s *Server) Encrypt(ciphertext, plaintext []byte) ([]byte, error) {
@@ -305,8 +559,8 @@ func (s *Server) Key() *Key {
 }
 
 // Count returns the number of blobs of a given type in the backend.
-func (s *Server) Count(t backend.Type) (n int) {
-	for _ = range s.List(t, nil) {
+func (s *Server) Count(t backend.Type) (n uint) {
+	for _ = range s.be.List(t, nil) {
 		n++
 	}
 
diff --git a/server/server_test.go b/server/server_test.go
index 414b4f0cc..51556ee94 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/restic/restic"
 	"github.com/restic/restic/backend"
+	"github.com/restic/restic/pack"
 	. "github.com/restic/restic/test"
 )
 
@@ -38,12 +39,12 @@ func TestSaveJSON(t *testing.T) {
 		data = append(data, '\n')
 		h := sha256.Sum256(data)
 
-		blob, err := server.SaveJSON(backend.Tree, obj)
+		id, err := server.SaveJSON(pack.Tree, obj)
 		OK(t, err)
 
-		Assert(t, bytes.Equal(h[:], blob.ID),
+		Assert(t, bytes.Equal(h[:], id),
 			"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
-			h, blob.ID)
+			h, id)
 	}
 }
 
@@ -63,17 +64,51 @@ func BenchmarkSaveJSON(t *testing.B) {
 	t.ResetTimer()
 
 	for i := 0; i < t.N; i++ {
-		blob, err := server.SaveJSON(backend.Tree, obj)
+		id, err := server.SaveJSON(pack.Tree, obj)
 		OK(t, err)
 
-		Assert(t, bytes.Equal(h[:], blob.ID),
+		Assert(t, bytes.Equal(h[:], id),
 			"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
-			h, blob.ID)
+			h, id)
 	}
 }
 
 var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}
 
+func TestSave(t *testing.T) {
+	server := SetupBackend(t)
+	defer TeardownBackend(t, server)
+	key := SetupKey(t, server, "geheim")
+	server.SetKey(key)
+
+	for _, size := range testSizes {
+		data := make([]byte, size)
+		_, err := io.ReadFull(rand.Reader, data)
+		OK(t, err)
+
+		id := backend.Hash(data)
+
+		// save
+		sid, err := server.Save(pack.Data, data, nil)
+		OK(t, err)
+
+		Equals(t, id, sid)
+
+		OK(t, server.Flush())
+
+		// read back
+		buf, err := server.LoadBlob(pack.Data, id)
+
+		Assert(t, len(buf) == len(data),
+			"number of bytes read back does not match: expected %d, got %d",
+			len(data), len(buf))
+
+		Assert(t, bytes.Equal(buf, data),
+			"data does not match: expected %02x, got %02x",
+			data, buf)
+	}
+}
+
 func TestSaveFrom(t *testing.T) {
 	server := SetupBackend(t)
 	defer TeardownBackend(t, server)
@@ -85,14 +120,16 @@ func TestSaveFrom(t *testing.T) {
 		_, err := io.ReadFull(rand.Reader, data)
 		OK(t, err)
 
-		id := sha256.Sum256(data)
+		id := backend.Hash(data)
 
 		// save
-		blob, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
+		err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
 		OK(t, err)
 
+		OK(t, server.Flush())
+
 		// read back
-		buf, err := server.Load(backend.Data, blob)
+		buf, err := server.LoadBlob(pack.Data, id[:])
 
 		Assert(t, len(buf) == len(data),
 			"number of bytes read back does not match: expected %d, got %d",
@@ -123,12 +160,12 @@ func BenchmarkSaveFrom(t *testing.B) {
 
 	for i := 0; i < t.N; i++ {
 		// save
-		_, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
+		err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
 		OK(t, err)
 	}
 }
 
-func TestLoadJSONID(t *testing.T) {
+func TestLoadJSONPack(t *testing.T) {
 	if *benchTestDir == "" {
 		t.Skip("benchdir not set, skipping TestServerStats")
 	}
@@ -140,23 +177,14 @@
 
 	// archive a few files
 	sn := SnapshotDir(t, server, *benchTestDir, nil)
-	t.Logf("archived snapshot %v", sn.ID())
-
-	// benchmark loading first tree
-	done := make(chan struct{})
-	first, found := <-server.List(backend.Tree, done)
-	Assert(t, found, "no Trees in repository found")
-	close(done)
-
-	id, err := backend.ParseID(first)
-	OK(t, err)
+	OK(t, server.Flush())
 
 	tree := restic.NewTree()
-	err = server.LoadJSONID(backend.Tree, id, &tree)
+	err := server.LoadJSONPack(pack.Tree, sn.Tree, &tree)
 	OK(t, err)
 }
 
-func BenchmarkLoadJSONID(t *testing.B) {
+func TestLoadJSONEncrypted(t *testing.T) {
 	if *benchTestDir == "" {
 		t.Skip("benchdir not set, skipping TestServerStats")
 	}
@@ -166,18 +194,20 @@
 	key := SetupKey(t, server, "geheim")
 	server.SetKey(key)
 
-	// archive a few files
-	sn := SnapshotDir(t, server, *benchTestDir, nil)
-	t.Logf("archived snapshot %v", sn.ID())
+	// archive a snapshot
+	sn := restic.Snapshot{}
+	sn.Hostname = "foobar"
+	sn.Username = "test!"
- t.ResetTimer() + id, err := server.SaveJSONUnpacked(backend.Snapshot, &sn) + OK(t, err) - tree := restic.NewTree() - for i := 0; i < t.N; i++ { - for name := range server.List(backend.Tree, nil) { - id, err := backend.ParseID(name) - OK(t, err) - OK(t, server.LoadJSONID(backend.Tree, id, &tree)) - } - } + var sn2 restic.Snapshot + + // restore + err = server.LoadJSONEncrypted(backend.Snapshot, id, &sn2) + OK(t, err) + + Equals(t, sn.Hostname, sn2.Hostname) + Equals(t, sn.Username, sn2.Username) } diff --git a/snapshot.go b/snapshot.go index eaaf94ec4..e8ac96bba 100644 --- a/snapshot.go +++ b/snapshot.go @@ -11,14 +11,14 @@ import ( ) type Snapshot struct { - Time time.Time `json:"time"` - Parent backend.ID `json:"parent,omitempty"` - Tree server.Blob `json:"tree"` - Paths []string `json:"paths"` - Hostname string `json:"hostname,omitempty"` - Username string `json:"username,omitempty"` - UID uint32 `json:"uid,omitempty"` - GID uint32 `json:"gid,omitempty"` + Time time.Time `json:"time"` + Parent backend.ID `json:"parent,omitempty"` + Tree backend.ID `json:"tree"` + Paths []string `json:"paths"` + Hostname string `json:"hostname,omitempty"` + Username string `json:"username,omitempty"` + UID uint32 `json:"uid,omitempty"` + GID uint32 `json:"gid,omitempty"` id backend.ID // plaintext ID, used during restore } @@ -50,7 +50,7 @@ func NewSnapshot(paths []string) (*Snapshot, error) { func LoadSnapshot(s *server.Server, id backend.ID) (*Snapshot, error) { sn := &Snapshot{id: id} - err := s.LoadJSONID(backend.Snapshot, id, sn) + err := s.LoadJSONEncrypted(backend.Snapshot, id, sn) if err != nil { return nil, err } diff --git a/test/backend.go b/test/backend.go index f2cc8dae6..7dc517ae2 100644 --- a/test/backend.go +++ b/test/backend.go @@ -52,7 +52,6 @@ func SetupKey(t testing.TB, s *server.Server, password string) *server.Key { func SnapshotDir(t testing.TB, server *server.Server, path string, parent backend.ID) *restic.Snapshot { arch, err := restic.NewArchiver(server) OK(t, err) - OK(t, arch.Preload()) sn, _, err := arch.Snapshot(nil, []string{path}, parent) OK(t, err) return sn diff --git a/tree.go b/tree.go index b835e2118..d621ced6f 100644 --- a/tree.go +++ b/tree.go @@ -7,12 +7,12 @@ import ( "github.com/restic/restic/backend" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) type Tree struct { Nodes []*Node `json:"nodes"` - Map *Map `json:"map"` } var ( @@ -23,17 +23,16 @@ var ( func NewTree() *Tree { return &Tree{ Nodes: []*Node{}, - Map: NewMap(), } } func (t Tree) String() string { - return fmt.Sprintf("Tree<%d nodes, %d blobs>", len(t.Nodes), len(t.Map.list)) + return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes)) } -func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) { +func LoadTree(s *server.Server, id backend.ID) (*Tree, error) { tree := &Tree{} - err := s.LoadJSON(backend.Tree, blob, tree) + err := s.LoadJSONPack(pack.Tree, id, tree) if err != nil { return nil, err } @@ -41,18 +40,13 @@ func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) { return tree, nil } -// Equals returns true if t and other have exactly the same nodes and map. -func (t Tree) Equals(other Tree) bool { +// Equals returns true if t and other have exactly the same nodes. 
+func (t Tree) Equals(other *Tree) bool { if len(t.Nodes) != len(other.Nodes) { debug.Log("Tree.Equals", "tree.Equals(): trees have different number of nodes") return false } - if !t.Map.Equals(other.Map) { - debug.Log("Tree.Equals", "tree.Equals(): maps aren't equal") - return false - } - for i := 0; i < len(t.Nodes); i++ { if !t.Nodes[i].Equals(*other.Nodes[i]) { debug.Log("Tree.Equals", "tree.Equals(): node %d is different:", i) diff --git a/tree_test.go b/tree_test.go index 89a28b698..db9653d19 100644 --- a/tree_test.go +++ b/tree_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/restic/restic" + "github.com/restic/restic/pack" . "github.com/restic/restic/test" ) @@ -91,3 +92,26 @@ func TestNodeComparison(t *testing.T) { n2.Size -= 1 Assert(t, !node.Equals(n2), "nodes are equal") } + +func TestLoadTree(t *testing.T) { + server := SetupBackend(t) + defer TeardownBackend(t, server) + key := SetupKey(t, server, "geheim") + server.SetKey(key) + + // save tree + tree := restic.NewTree() + id, err := server.SaveJSON(pack.Tree, tree) + OK(t, err) + + // save packs + OK(t, server.Flush()) + + // load tree again + tree2, err := restic.LoadTree(server, id) + OK(t, err) + + Assert(t, tree.Equals(tree2), + "trees are not equal: want %v, got %v", + tree, tree2) +} diff --git a/walk.go b/walk.go index 2bd3373de..569e5820c 100644 --- a/walk.go +++ b/walk.go @@ -3,6 +3,7 @@ package restic import ( "path/filepath" + "github.com/restic/restic/backend" "github.com/restic/restic/debug" "github.com/restic/restic/server" ) @@ -15,10 +16,10 @@ type WalkTreeJob struct { Tree *Tree } -func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("walkTree", "start on %q (%v)", path, treeBlob) +func walkTree(s *server.Server, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("walkTree", "start on %q (%v)", path, treeID.Str()) // load tree - t, err := LoadTree(s, treeBlob) + t, err := LoadTree(s, treeID) if err != nil { jobCh <- WalkTreeJob{Path: path, Error: err} return @@ -27,32 +28,22 @@ func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan str for _, node := range t.Nodes { p := filepath.Join(path, node.Name) if node.Type == "dir" { - blob, err := t.Map.FindID(node.Subtree) - if err != nil { - jobCh <- WalkTreeJob{Path: p, Error: err} - continue - } - walkTree(s, p, blob, done, jobCh) + walkTree(s, p, node.Subtree, done, jobCh) } else { - // load old blobs - node.blobs, err = t.Map.Select(node.Content) - if err != nil { - debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, treeBlob, err) - } jobCh <- WalkTreeJob{Path: p, Node: node, Error: err} } } jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t} - debug.Log("walkTree", "done for %q (%v)", path, treeBlob) + debug.Log("walkTree", "done for %q (%v)", path, treeID.Str()) } // WalkTree walks the tree specified by ID recursively and sends a job for each // file and directory it finds. When the channel done is closed, processing // stops. 
-func WalkTree(server *server.Server, blob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("WalkTree", "start on %v", blob) - walkTree(server, "", blob, done, jobCh) +func WalkTree(server *server.Server, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("WalkTree", "start on %v", id.Str()) + walkTree(server, "", id, done, jobCh) close(jobCh) debug.Log("WalkTree", "done") } diff --git a/walk_test.go b/walk_test.go index bce1da765..170742d48 100644 --- a/walk_test.go +++ b/walk_test.go @@ -27,6 +27,9 @@ func TestWalkTree(t *testing.T) { sn, _, err := arch.Snapshot(nil, dirs, nil) OK(t, err) + // flush server, write all packs + OK(t, server.Flush()) + // start benchmark // t.ResetTimer() @@ -48,6 +51,9 @@ func TestWalkTree(t *testing.T) { fsJob, fsChOpen := <-fsJobs Assert(t, !fsChOpen || fsJob != nil, "received nil job from filesystem: %v %v", fsJob, fsChOpen) + if fsJob != nil { + OK(t, fsJob.Error()) + } var path string fsEntries := 1 @@ -63,6 +69,8 @@ func TestWalkTree(t *testing.T) { treeJob, treeChOpen := <-treeJobs treeEntries := 1 + OK(t, treeJob.Error) + if treeJob.Tree != nil { treeEntries = len(treeJob.Tree.Nodes) }
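
Taken together, the hunks above replace the per-snapshot Map with a single in-memory index plus pack files: Save buffers encrypted blobs in packers, Flush writes any unfinished packs, and LoadBlob resolves blobs through the index. The following is a minimal sketch of that flow, not part of the patch itself; it assumes a *server.Server with a key already set, and saveAndReload is a name invented for the sketch, everything else is API from this diff.

package example

import (
	"github.com/restic/restic/backend"
	"github.com/restic/restic/pack"
	"github.com/restic/restic/server"
)

// saveAndReload stores one data blob and reads it back through the index.
func saveAndReload(s *server.Server, data []byte) ([]byte, error) {
	id := backend.Hash(data) // the plaintext hash doubles as the blob ID

	// For small blobs, Save only records a stub index entry; the pack
	// (and the final index entry with pack ID and offset) is written
	// later, when the packer fills up or the server is flushed.
	if _, err := s.Save(pack.Data, data, id); err != nil {
		return nil, err
	}

	// Without Flush, the blob may still sit in an unfinished packer and
	// LoadBlob would not find it on the backend. This is why the tests
	// above now call server.Flush() before reading anything back.
	if err := s.Flush(); err != nil {
		return nil, err
	}

	return s.LoadBlob(pack.Data, id)
}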
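
The index itself is persisted separately from the packs. A sketch of the round trip added in server.go, under the same assumptions as above; persistIndex is likewise a hypothetical helper.

package example2

import (
	"github.com/restic/restic/backend"
	"github.com/restic/restic/server"
)

// persistIndex flushes open packers, writes the index, and reloads it.
func persistIndex(s *server.Server) (backend.ID, error) {
	// Index.Encode refuses to serialize blobs whose pack hasn't been
	// written yet, so all open packers must be flushed first.
	if err := s.Flush(); err != nil {
		return nil, err
	}

	id, err := s.SaveIndex()
	if err != nil {
		return nil, err
	}

	// A fresh Server starts with an empty index; LoadIndex reads every
	// index file from the backend and merges each one via Index.Merge.
	return id, s.LoadIndex()
}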
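
Since trees and subtrees are now addressed by a plain backend.ID instead of a server.Blob, walking a snapshot no longer needs Map lookups. A short sketch of the simplified WalkTree call; listSnapshot is a hypothetical helper, the functions and types it uses come from walk.go and snapshot.go as changed above.

package example3

import (
	"fmt"

	"github.com/restic/restic"
	"github.com/restic/restic/server"
)

// listSnapshot prints the path of every entry below the snapshot's tree.
func listSnapshot(s *server.Server, sn *restic.Snapshot) {
	done := make(chan struct{})
	defer close(done)

	jobs := make(chan restic.WalkTreeJob)
	// sn.Tree is a backend.ID now and is passed to WalkTree directly.
	go restic.WalkTree(s, sn.Tree, done, jobs)

	// WalkTree closes the job channel when it is done, so ranging over
	// it terminates on its own.
	for job := range jobs {
		if job.Error != nil {
			fmt.Println("error:", job.Error)
			continue
		}
		fmt.Println(job.Path)
	}
}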