From 3c92c7e68907532ab8205a6f61518d68b70ff468 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sat, 14 Mar 2015 18:15:47 +0100 Subject: [PATCH] Recreate blob cache if missing (closes #104) --- archiver.go | 40 +++++++++----------------- cache.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++ cache_test.go | 45 +++++++++++++++++++++++++++++ map.go | 10 +++++++ 4 files changed, 147 insertions(+), 27 deletions(-) create mode 100644 cache_test.go diff --git a/archiver.go b/archiver.go index 26be67427..408db69c2 100644 --- a/archiver.go +++ b/archiver.go @@ -76,21 +76,16 @@ func (arch *Archiver) Preload() error { // TODO: track seen tree ids, load trees that aren't in the set for _, id := range snapshots { - // try to load snapshot blobs from cache - rd, err := arch.c.Load(backend.Snapshot, "blobs", id) + m, err := arch.c.LoadMap(arch.s, id) if err != nil { debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err) - return err - } - debug.Log("Archiver.Preload", "load cached blobs for snapshot %v", id.Str()) - dec := json.NewDecoder(rd) - - m := &Map{} - err = dec.Decode(m) - if err != nil { - debug.Log("Archiver.Preload", "error loading cached blobs for snapshot %v: %v", id.Str(), err) - continue + // build new cache + m, err = CacheSnapshotBlobs(arch.s, arch.c, id) + if err != nil { + debug.Log("Archiver.Preload", "unable to cache snapshot blobs for %v: %v", id.Str(), err) + return err + } } arch.m.Merge(m) @@ -473,9 +468,9 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs) } + // also store blob in tree map for _, blob := range node.blobs { tree.Map.Insert(blob) - arch.m.Insert(blob) } } @@ -761,11 +756,12 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn // receive the top-level tree root := (<-resCh).(*Node) - debug.Log("Archiver.Snapshot", "root node received: %v", root.blobs[0]) - sn.Tree = root.blobs[0] + blob := root.blobs[0] + debug.Log("Archiver.Snapshot", "root node received: %v", blob) + sn.Tree = blob // save snapshot - blob, err := arch.s.SaveJSON(backend.Snapshot, sn) + blob, err = arch.s.SaveJSON(backend.Snapshot, sn) if err != nil { return nil, nil, err } @@ -776,23 +772,13 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str()) // cache blobs - wr, err := arch.c.Store(backend.Snapshot, "blobs", blob.Storage) + err = arch.c.StoreMap(sn.id, arch.m) if err != nil { debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err) fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err) return sn, blob.Storage, nil } - enc := json.NewEncoder(wr) - err = enc.Encode(arch.m) - if err != nil { - debug.Log("Archiver.Snapshot", "error encoding map for snapshot %v: %v", blob.Storage.Str(), err) - } else { - debug.Log("Archiver.Snapshot", "cached %d blobs for snapshot %v", arch.m.Len(), blob.Storage.Str()) - } - - wr.Close() - return sn, blob.Storage, nil } diff --git a/cache.go b/cache.go index 32762bc84..36c6e69b0 100644 --- a/cache.go +++ b/cache.go @@ -1,11 +1,13 @@ package restic import ( + "encoding/json" "fmt" "io" "os" "path/filepath" "strings" + "sync" "github.com/restic/restic/backend" "github.com/restic/restic/debug" @@ -203,3 +205,80 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, return "", fmt.Errorf("cache not supported for type %v", t) } + +// high-level functions + +// CacheSnapshotBlobs creates a cache of all the blobs used within the +// snapshot. It collects all blobs from all trees and saves the resulting map +// to the cache and returns the map. +func CacheSnapshotBlobs(s Server, c *Cache, id backend.ID) (*Map, error) { + debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str()) + + sn, err := LoadSnapshot(s, id) + if err != nil { + debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err) + return nil, err + } + + m := NewMap() + + // add top-level node + m.Insert(sn.Tree) + + // start walker + var wg sync.WaitGroup + ch := make(chan WalkTreeJob) + + wg.Add(1) + go func() { + WalkTree(s, sn.Tree.Storage, nil, ch) + wg.Done() + }() + + for i := 0; i < maxConcurrencyPreload; i++ { + wg.Add(1) + go func() { + for job := range ch { + if job.Tree == nil { + continue + } + debug.Log("CacheSnapshotBlobs", "got job %v", job) + m.Merge(job.Tree.Map) + } + + wg.Done() + }() + } + + wg.Wait() + + // save blob list for snapshot + return m, c.StoreMap(id, m) +} + +func (c *Cache) StoreMap(snid backend.ID, m *Map) error { + wr, err := c.Store(backend.Snapshot, "blobs", snid) + if err != nil { + return nil + } + defer wr.Close() + + enc := json.NewEncoder(wr) + err = enc.Encode(m) + if err != nil { + return err + } + + return nil +} + +func (c *Cache) LoadMap(s Server, snid backend.ID) (*Map, error) { + rd, err := c.Load(backend.Snapshot, "blobs", snid) + if err != nil { + return nil, err + } + + m := &Map{} + err = json.NewDecoder(rd).Decode(m) + return m, err +} diff --git a/cache_test.go b/cache_test.go new file mode 100644 index 000000000..e642aa699 --- /dev/null +++ b/cache_test.go @@ -0,0 +1,45 @@ +package restic_test + +import ( + "encoding/json" + "testing" + + "github.com/restic/restic" + "github.com/restic/restic/backend" +) + +func TestCache(t *testing.T) { + be := setupBackend(t) + defer teardownBackend(t, be) + key := setupKey(t, be, "geheim") + server := restic.NewServerWithKey(be, key) + + cache, err := restic.NewCache(server) + ok(t, err) + + arch, err := restic.NewArchiver(server) + ok(t, err) + + // archive some files, this should automatically cache all blobs from the snapshot + _, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) + + // try to load map from cache + rd, err := cache.Load(backend.Snapshot, "blobs", id) + ok(t, err) + + dec := json.NewDecoder(rd) + + m := &restic.Map{} + err = dec.Decode(m) + ok(t, err) + + // remove cached blob list + ok(t, cache.Purge(backend.Snapshot, "blobs", id)) + + // recreate cached blob list + m2, err := restic.CacheSnapshotBlobs(server, cache, id) + ok(t, err) + + // compare maps + assert(t, m.Equals(m2), "Maps are not equal") +} diff --git a/map.go b/map.go index 4b9cd7d87..d80ca8258 100644 --- a/map.go +++ b/map.go @@ -124,15 +124,25 @@ func (bl *Map) StorageIDs() []backend.ID { } func (bl *Map) Equals(other *Map) bool { + if bl == nil && other == nil { + return true + } + + if bl == nil || other == nil { + return false + } + bl.m.Lock() defer bl.m.Unlock() if len(bl.list) != len(other.list) { + debug.Log("Map.Equals", "length does not match: %d != %d", len(bl.list), len(other.list)) return false } for i := 0; i < len(bl.list); i++ { if bl.list[i].Compare(other.list[i]) != 0 { + debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i]) return false } }