From f676c0c41bfe26b45df7a056324396b7e5dca509 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 18 Jun 2017 14:45:02 +0200 Subject: [PATCH] index: Add Each() to MasterIndex --- src/restic/checker/checker.go | 2 +- src/restic/fuse/snapshot.go | 2 +- src/restic/repository.go | 5 ++++ src/restic/repository/index.go | 10 +++---- src/restic/repository/master_index.go | 37 ++++++++++++++++++++++-- src/restic/repository/repository_test.go | 2 +- 6 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/restic/checker/checker.go b/src/restic/checker/checker.go index 99d9f18c1..b423da7b6 100644 --- a/src/restic/checker/checker.go +++ b/src/restic/checker/checker.go @@ -142,7 +142,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("process blobs") cnt := 0 - for blob := range res.Index.Each(done) { + for blob := range res.Index.Each(ctx) { c.packs.Insert(blob.PackID) c.blobs.Insert(blob.ID) c.blobRefs.M[blob.ID] = 0 diff --git a/src/restic/fuse/snapshot.go b/src/restic/fuse/snapshot.go index 7701af318..af560bb55 100644 --- a/src/restic/fuse/snapshot.go +++ b/src/restic/fuse/snapshot.go @@ -28,7 +28,7 @@ type BlobSizeCache struct { func NewBlobSizeCache(midx *repository.MasterIndex) *BlobSizeCache { m := make(map[restic.ID]uint, 1000) for _, idx := range midx.All() { - for pb := range idx.Each(nil) { + for pb := range idx.Each(context.TODO()) { m[pb.ID] = pb.Length } } diff --git a/src/restic/repository.go b/src/restic/repository.go index 81f217a37..302a55703 100644 --- a/src/restic/repository.go +++ b/src/restic/repository.go @@ -58,4 +58,9 @@ type Index interface { Has(ID, BlobType) bool Lookup(ID, BlobType) ([]PackedBlob, error) Count(BlobType) uint + + // Each returns a channel that yields all blobs known to the index. When + // the context is cancelled, the background goroutine terminates. This + // blocks any modification of the index. + Each(ctx context.Context) <-chan PackedBlob } diff --git a/src/restic/repository/index.go b/src/restic/repository/index.go index 6e5dac25b..dd96eec34 100644 --- a/src/restic/repository/index.go +++ b/src/restic/repository/index.go @@ -206,10 +206,10 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error { return nil } -// Each returns a channel that yields all blobs known to the index. If done is -// closed, the background goroutine terminates. This blocks any modification of -// the index. -func (idx *Index) Each(done chan struct{}) <-chan restic.PackedBlob { +// Each returns a channel that yields all blobs known to the index. When the +// context is cancelled, the background goroutine terminates. This blocks any +// modification of the index. +func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob { idx.m.Lock() ch := make(chan restic.PackedBlob) @@ -223,7 +223,7 @@ func (idx *Index) Each(done chan struct{}) <-chan restic.PackedBlob { for h, packs := range idx.pack { for _, blob := range packs { select { - case <-done: + case <-ctx.Done(): return case ch <- restic.PackedBlob{ Blob: restic.Blob{ diff --git a/src/restic/repository/master_index.go b/src/restic/repository/master_index.go index ebd2cbef2..54d61f6db 100644 --- a/src/restic/repository/master_index.go +++ b/src/restic/repository/master_index.go @@ -1,6 +1,7 @@ package repository import ( + "context" "restic" "sync" @@ -188,6 +189,35 @@ func (mi *MasterIndex) All() []*Index { return mi.idx } +// Each returns a channel that yields all blobs known to the index. When the +// context is cancelled, the background goroutine terminates. This blocks any +// modification of the index. +func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob { + mi.idxMutex.RLock() + + ch := make(chan restic.PackedBlob) + + go func() { + defer mi.idxMutex.RUnlock() + defer func() { + close(ch) + }() + + for _, idx := range mi.idx { + idxCh := idx.Each(ctx) + for pb := range idxCh { + select { + case <-ctx.Done(): + return + case ch <- pb: + } + } + } + }() + + return ch +} + // RebuildIndex combines all known indexes to a new index, leaving out any // packs whose ID is contained in packBlacklist. The new index contains the IDs // of all known indexes in the "supersedes" field. @@ -198,13 +228,14 @@ func (mi *MasterIndex) RebuildIndex(packBlacklist restic.IDSet) (*Index, error) debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist) newIndex := NewIndex() - done := make(chan struct{}) - defer close(done) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() for i, idx := range mi.idx { debug.Log("adding index %d", i) - for pb := range idx.Each(done) { + for pb := range idx.Each(ctx) { if packBlacklist.Has(pb.PackID) { continue } diff --git a/src/restic/repository/repository_test.go b/src/restic/repository/repository_test.go index cc9e0ab64..629716978 100644 --- a/src/restic/repository/repository_test.go +++ b/src/restic/repository/repository_test.go @@ -373,7 +373,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { idx, err := repository.LoadIndex(context.TODO(), repo, id) OK(t, err) - for pb := range idx.Each(nil) { + for pb := range idx.Each(context.TODO()) { if _, ok := packEntries[pb.PackID]; !ok { packEntries[pb.PackID] = make(map[restic.ID]struct{}) }