diff --git a/archiver.go b/archiver.go index c472a0eeb..c181711d7 100644 --- a/archiver.go +++ b/archiver.go @@ -87,7 +87,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // check if tree has been saved before id := backend.Hash(data) - if arch.repo.Index().Has(id) { + if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) { return id, nil } @@ -651,13 +651,13 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID } // save index - indexID, err := arch.repo.SaveIndex() + err = arch.repo.SaveIndex() if err != nil { debug.Log("Archiver.Snapshot", "error saving index: %v", err) return nil, backend.ID{}, err } - debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str()) + debug.Log("Archiver.Snapshot", "saved indexes") return sn, id, nil } diff --git a/archiver_test.go b/archiver_test.go index 49b7137ec..007ed4115 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -209,15 +209,17 @@ func BenchmarkLoadTree(t *testing.B) { list := make([]backend.ID, 0, 10) done := make(chan struct{}) - for blob := range repo.Index().Each(done) { - if blob.Type != pack.Tree { - continue - } + for _, idx := range repo.Index().All() { + for blob := range idx.Each(done) { + if blob.Type != pack.Tree { + continue + } - list = append(list, blob.ID) - if len(list) == cap(list) { - close(done) - break + list = append(list, blob.ID) + if len(list) == cap(list) { + close(done) + break + } } } diff --git a/checker/checker.go b/checker/checker.go index ca5e6c913..d6fe49eb4 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -27,7 +27,7 @@ type Checker struct { indexes map[backend.ID]*repository.Index orphanedPacks backend.IDs - masterIndex *repository.Index + masterIndex *repository.MasterIndex repo *repository.Repository } @@ -37,7 +37,7 @@ func New(repo *repository.Repository) *Checker { c := &Checker{ packs: make(map[backend.ID]struct{}), blobs: make(map[backend.ID]struct{}), - masterIndex: 
repository.NewIndex(), + masterIndex: repository.NewMasterIndex(), indexes: make(map[backend.ID]*repository.Index), repo: repo, } @@ -105,7 +105,7 @@ func (c *Checker) LoadIndex() error { } c.indexes[id] = res.Index - c.masterIndex.Merge(res.Index) + c.masterIndex.Insert(res.Index) debug.Log("LoadIndex", "process blobs") cnt := 0 diff --git a/cmd/restic/cmd_dump.go b/cmd/restic/cmd_dump.go index bc968797d..5ea3e318c 100644 --- a/cmd/restic/cmd_dump.go +++ b/cmd/restic/cmd_dump.go @@ -73,12 +73,14 @@ func printTrees(repo *repository.Repository, wr io.Writer) error { trees := []backend.ID{} - for blob := range repo.Index().Each(done) { - if blob.Type != pack.Tree { - continue - } + for _, idx := range repo.Index().All() { + for blob := range idx.Each(nil) { + if blob.Type != pack.Tree { + continue + } - trees = append(trees, blob.ID) + trees = append(trees, blob.ID) + } } for _, id := range trees { diff --git a/cmd/restic/cmd_list.go b/cmd/restic/cmd_list.go index c8ea6c65c..11f4bd8d6 100644 --- a/cmd/restic/cmd_list.go +++ b/cmd/restic/cmd_list.go @@ -49,8 +49,10 @@ func (cmd CmdList) Execute(args []string) error { return err } - for blob := range repo.Index().Each(nil) { - cmd.global.Printf("%s\n", blob.ID) + for _, idx := range repo.Index().All() { + for blob := range idx.Each(nil) { + cmd.global.Printf("%s\n", blob.ID) + } } return nil diff --git a/repository/index.go b/repository/index.go index bc5324e62..11a84e29b 100644 --- a/repository/index.go +++ b/repository/index.go @@ -18,6 +18,7 @@ type Index struct { m sync.Mutex pack map[backend.ID]indexEntry + final bool // set to true for all indexes read from the backend ("finalized") supersedes backend.IDs } @@ -26,7 +27,6 @@ type indexEntry struct { packID *backend.ID offset uint length uint - old bool } // NewIndex returns a new index. 
@@ -36,26 +36,38 @@ func NewIndex() *Index { } } -func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint, old bool) { +func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) { idx.pack[id] = indexEntry{ tpe: t, packID: pack, offset: offset, length: length, - old: old, } } +// Final returns true iff the index is already written to the repository, it is +// finalized. +func (idx *Index) Final() bool { + idx.m.Lock() + defer idx.m.Unlock() + + return idx.final +} + // Store remembers the id and pack in the index. An existing entry will be // silently overwritten. func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) { idx.m.Lock() defer idx.m.Unlock() + if idx.final { + panic("store new item in finalized index") + } + debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v", pack.Str(), id.Str(), t, offset, length) - idx.store(t, id, pack, offset, length, false) + idx.store(t, id, pack, offset, length) } // StoreInProgress adds a preliminary index entry for a blob that is about to be @@ -66,13 +78,17 @@ func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error { idx.m.Lock() defer idx.m.Unlock() + if idx.final { + panic("store new item in finalized index") + } + if _, hasID := idx.pack[id]; hasID { errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t) debug.Log("Index.StoreInProgress", errorMsg) return errors.New(errorMsg) } - idx.store(t, id, nil, 0, 0, false) + idx.store(t, id, nil, 0, 0) debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)", id.Str(), t) return nil @@ -83,6 +99,10 @@ func (idx *Index) Remove(packID backend.ID) { idx.m.Lock() defer idx.m.Unlock() + if idx.final { + panic("remove item from finalized index") + } + debug.Log("Index.Remove", "id %v removed", packID.Str()) if _, ok := idx.pack[packID]; ok { @@ -270,32 +290,41 @@ type jsonIndex struct { type 
jsonOldIndex []*packJSON -// encode writes the JSON serialization of the index filtered by selectFn to enc. -func (idx *Index) encode(w io.Writer, supersedes backend.IDs, selectFn func(indexEntry) bool) error { - list, err := idx.generatePackList(selectFn) - if err != nil { - return err - } - - debug.Log("Index.Encode", "done, %d entries selected", len(list)) - - enc := json.NewEncoder(w) - idxJSON := jsonIndex{ - Supersedes: supersedes, - Packs: list, - } - return enc.Encode(idxJSON) -} - -// Encode writes the JSON serialization of the index to the writer w. This -// serialization only contains new blobs added via idx.Store(), not old ones -// introduced via DecodeIndex(). +// Encode writes the JSON serialization of the index to the writer w. func (idx *Index) Encode(w io.Writer) error { debug.Log("Index.Encode", "encoding index") idx.m.Lock() defer idx.m.Unlock() - return idx.encode(w, idx.supersedes, func(e indexEntry) bool { return !e.old }) + return idx.encode(w) +} + +// encode writes the JSON serialization of the index to the writer w. +func (idx *Index) encode(w io.Writer) error { + debug.Log("Index.encode", "encoding index") + + list, err := idx.generatePackList(nil) + if err != nil { + return err + } + + enc := json.NewEncoder(w) + idxJSON := jsonIndex{ + Supersedes: idx.supersedes, + Packs: list, + } + return enc.Encode(idxJSON) +} + +// Finalize sets the index to final and writes the JSON serialization to w. +func (idx *Index) Finalize(w io.Writer) error { + debug.Log("Index.Encode", "encoding index") + idx.m.Lock() + defer idx.m.Unlock() + + idx.final = true + + return idx.encode(w) } // Dump writes the pretty-printed JSON representation of the index to w. 
@@ -358,10 +387,11 @@ func DecodeIndex(rd io.Reader) (idx *Index, err error) { idx = NewIndex() for _, pack := range idxJSON.Packs { for _, blob := range pack.Blobs { - idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true) + idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length) } } idx.supersedes = idxJSON.Supersedes + idx.final = true debug.Log("Index.DecodeIndex", "done") return idx, err @@ -382,7 +412,7 @@ func DecodeOldIndex(rd io.Reader) (idx *Index, err error) { idx = NewIndex() for _, pack := range list { for _, blob := range pack.Blobs { - idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true) + idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length) } } @@ -436,8 +466,7 @@ func ConvertIndex(repo *Repository, id backend.ID) (backend.ID, error) { idx.supersedes = backend.IDs{id} - // select all blobs for export - err = idx.encode(blob, idx.supersedes, func(e indexEntry) bool { return true }) + err = idx.Encode(blob) if err != nil { debug.Log("ConvertIndex", "oldIdx.Encode() returned error: %v", err) return id, err diff --git a/repository/index_test.go b/repository/index_test.go index c4587457f..3398c90d3 100644 --- a/repository/index_test.go +++ b/repository/index_test.go @@ -86,7 +86,7 @@ func TestIndexSerialize(t *testing.T) { Equals(t, testBlob.length, length) } - // add more blobs to idx2 + // add more blobs to idx newtests := []testEntry{} for i := 0; i < 10; i++ { packID := randomID() @@ -95,7 +95,7 @@ func TestIndexSerialize(t *testing.T) { for j := 0; j < 10; j++ { id := randomID() length := uint(i*100 + j) - idx2.Store(pack.Data, id, &packID, pos, length) + idx.Store(pack.Data, id, &packID, pos, length) newtests = append(newtests, testEntry{ id: id, @@ -109,22 +109,20 @@ func TestIndexSerialize(t *testing.T) { } } - // serialize idx2, unserialize to idx3 + // serialize idx, unserialize to idx3 wr3 := bytes.NewBuffer(nil) - err = idx2.Encode(wr3) + err = idx.Finalize(wr3) OK(t, err) + Assert(t, 
idx.Final(), + "index not final after encoding") + idx3, err := repository.DecodeIndex(wr3) OK(t, err) Assert(t, idx3 != nil, "nil returned for decoded index") - - // all old blobs must not be present in the index - for _, testBlob := range tests { - _, _, _, _, err := idx3.Lookup(testBlob.id) - Assert(t, err != nil, - "found old id %v in serialized index", testBlob.id.Str()) - } + Assert(t, idx3.Final(), + "decoded index is not final") // all new blobs must be in the index for _, testBlob := range newtests { @@ -333,7 +331,8 @@ func TestStoreOverwritesPreliminaryEntry(t *testing.T) { blobID := randomID() dataType := pack.Data - idx.StoreInProgress(dataType, blobID) + err := idx.StoreInProgress(dataType, blobID) + OK(t, err) packID := randomID() offset := uint(0) diff --git a/repository/master_index.go b/repository/master_index.go new file mode 100644 index 000000000..c032152cc --- /dev/null +++ b/repository/master_index.go @@ -0,0 +1,200 @@ +package repository + +import ( + "fmt" + "sync" + + "github.com/restic/restic/backend" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" +) + +// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. +type MasterIndex struct { + idx []*Index + idxMutex sync.RWMutex + + inFlight struct { + backend.IDSet + sync.RWMutex + } +} + +// NewMasterIndex creates a new master index. +func NewMasterIndex() *MasterIndex { + return &MasterIndex{ + inFlight: struct { + backend.IDSet + sync.RWMutex + }{ + IDSet: backend.NewIDSet(), + }, + } +} + +// Lookup queries all known Indexes for the ID and returns the first match. 
+func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + debug.Log("MasterIndex.Lookup", "looking up id %v", id.Str()) + + for _, idx := range mi.idx { + packID, tpe, offset, length, err = idx.Lookup(id) + if err == nil { + debug.Log("MasterIndex.Lookup", + "found id %v in pack %v at offset %d with length %d", + id.Str(), packID.Str(), offset, length) + return + } + } + + debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str()) + return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id) +} + +// LookupSize queries all known Indexes for the ID and returns the first match. +func (mi *MasterIndex) LookupSize(id backend.ID) (uint, error) { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + for _, idx := range mi.idx { + length, err := idx.LookupSize(id) + if err == nil { + return length, nil + } + } + + return 0, fmt.Errorf("id %v not found in any index", id) +} + +// Has queries all known Indexes for the ID and returns the first match. +func (mi *MasterIndex) Has(id backend.ID) bool { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + for _, idx := range mi.idx { + if idx.Has(id) { + return true + } + } + + return false +} + +// Count returns the number of blobs of type t in the index. +func (mi *MasterIndex) Count(t pack.BlobType) (n uint) { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + var sum uint + for _, idx := range mi.idx { + sum += idx.Count(t) + } + + return sum +} + +// Insert adds a new index to the MasterIndex. +func (mi *MasterIndex) Insert(idx *Index) { + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + mi.idx = append(mi.idx, idx) +} + +// Remove deletes an index from the MasterIndex. +func (mi *MasterIndex) Remove(index *Index) { + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + for i, idx := range mi.idx { + if idx == index { + mi.idx = append(mi.idx[:i], mi.idx[i+1:]...) 
+ return + } + } + +// Current returns an index that is not yet finalized, so that new entries can +// still be added. If all indexes are finalized, a new index is created and +// returned. +func (mi *MasterIndex) Current() *Index { + mi.idxMutex.RLock() + + for _, idx := range mi.idx { + if !idx.Final() { + mi.idxMutex.RUnlock() + return idx + } + } + + mi.idxMutex.RUnlock() + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + newIdx := NewIndex() + mi.idx = append(mi.idx, newIdx) + + return newIdx +} + +// AddInFlight adds the given IDs to the list of in-flight IDs. +func (mi *MasterIndex) AddInFlight(IDs ...backend.ID) { + mi.inFlight.Lock() + defer mi.inFlight.Unlock() + + ids := backend.IDs(IDs) + + debug.Log("MasterIndex.AddInFlight", "adding %v", ids) + + for _, id := range ids { + mi.inFlight.Insert(id) + } +} + +// IsInFlight returns true iff the id is contained in the list of in-flight IDs. +func (mi *MasterIndex) IsInFlight(id backend.ID) bool { + mi.inFlight.RLock() + defer mi.inFlight.RUnlock() + + inFlight := mi.inFlight.Has(id) + debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight) + + return inFlight +} + +// RemoveFromInFlight deletes the given ID from the list of in-flight IDs. +func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) { + mi.inFlight.Lock() + defer mi.inFlight.Unlock() + + debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str()) + + mi.inFlight.Delete(id) +} + +// NotFinalIndexes returns all indexes that have not yet been saved. +func (mi *MasterIndex) NotFinalIndexes() []*Index { + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + var list []*Index + + for _, idx := range mi.idx { + if !idx.Final() { + list = append(list, idx) + } + } + + debug.Log("MasterIndex.NotFinalIndexes", "saving %d indexes", len(list)) + return list +} + +// All returns all indexes. 
+func (mi *MasterIndex) All() []*Index { + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + return mi.idx +} diff --git a/repository/repository.go b/repository/repository.go index a49188289..2fe65a963 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -24,7 +24,7 @@ type Repository struct { Config Config key *crypto.Key keyName string - idx *Index + idx *MasterIndex pm sync.Mutex packs []*pack.Packer @@ -34,7 +34,7 @@ type Repository struct { func New(be backend.Backend) *Repository { return &Repository{ be: be, - idx: NewIndex(), + idx: NewMasterIndex(), } } @@ -204,7 +204,7 @@ func (r *Repository) LoadJSONPack(t pack.BlobType, id backend.ID, item interface // LookupBlobSize returns the size of blob id. func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { - return r.Index().LookupSize(id) + return r.idx.LookupSize(id) } const minPackSize = 4 * chunker.MiB @@ -269,7 +269,8 @@ func (r *Repository) savePacker(p *pack.Packer) error { // update blobs in the index for _, b := range p.Blobs() { debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str()) - r.idx.Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length)) + r.idx.Current().Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length)) + r.idx.RemoveFromInFlight(b.ID) } return nil @@ -304,16 +305,15 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } - // add this id to the index, although we don't know yet in which pack it - // will be saved; the entry will be updated when the pack is written. - // Note: the current id needs to be added to the index before searching - // for a suitable packer: There's a little chance that more than one - // goroutine handles the same blob concurrently. Due to idx.StoreInProgress - // locking the index and raising an error if a matching index entry - // already exists, updating the index first ensures that only one of - // those goroutines will continue. 
See issue restic#292. - debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t) - err = r.idx.StoreInProgress(t, *id) + // check if this id is already been saved by another goroutine + if r.idx.IsInFlight(*id) { + debug.Log("Repo.Save", "blob %v is already being saved", id.Str()) + return *id, nil + } + + // add this id to the list of in-flight chunk ids. + debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str()) + r.idx.AddInFlight(*id) if err != nil { debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) return *id, nil @@ -322,12 +322,15 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { - r.idx.Remove(*id) + r.idx.RemoveFromInFlight(*id) return backend.ID{}, err } // save ciphertext - packer.Add(t, *id, bytes.NewReader(ciphertext)) + _, err = packer.Add(t, *id, bytes.NewReader(ciphertext)) + if err != nil { + return backend.ID{}, err + } // if the pack is not full enough and there are less than maxPackers // packers, put back to the list @@ -446,13 +449,13 @@ func (r *Repository) Backend() backend.Backend { return r.be } -// Index returns the currently loaded Index. -func (r *Repository) Index() *Index { +// Index returns the currently used MasterIndex. +func (r *Repository) Index() *MasterIndex { return r.idx } // SetIndex instructs the repository to use the given index. -func (r *Repository) SetIndex(i *Index) { +func (r *Repository) SetIndex(i *MasterIndex) { r.idx = i } @@ -510,37 +513,38 @@ func (bw *BlobWriter) ID() backend.ID { return bw.id } -// SaveIndex saves all new packs in the index in the backend, returned is the -// storage ID. -func (r *Repository) SaveIndex() (backend.ID, error) { - debug.Log("Repo.SaveIndex", "Saving index") +// SaveIndex saves all new indexes in the backend. 
+func (r *Repository) SaveIndex() error { + for i, idx := range r.idx.NotFinalIndexes() { + debug.Log("Repo.SaveIndex", "Saving index %d", i) - blob, err := r.CreateEncryptedBlob(backend.Index) - if err != nil { - return backend.ID{}, err + blob, err := r.CreateEncryptedBlob(backend.Index) + if err != nil { + return err + } + + err = idx.Encode(blob) + if err != nil { + return err + } + + err = blob.Close() + if err != nil { + return err + } + + sid := blob.ID() + + debug.Log("Repo.SaveIndex", "Saved index %d as %v", i, sid.Str()) } - err = r.idx.Encode(blob) - if err != nil { - return backend.ID{}, err - } - - err = blob.Close() - if err != nil { - return backend.ID{}, err - } - - sid := blob.ID() - - debug.Log("Repo.SaveIndex", "Saved index as %v", sid.Str()) - - return sid, nil + return nil } const loadIndexParallelism = 20 -// LoadIndex loads all index files from the backend in parallel and merges them -// with the current index. The first error that occurred is returned. +// LoadIndex loads all index files from the backend in parallel and stores them +// in the master index. The first error that occurred is returned. func (r *Repository) LoadIndex() error { debug.Log("Repo.LoadIndex", "Loading index") @@ -567,7 +571,7 @@ func (r *Repository) LoadIndex() error { }() for idx := range indexes { - r.idx.Merge(idx) + r.idx.Insert(idx) } if err := <-errCh; err != nil { diff --git a/repository/repository_test.go b/repository/repository_test.go index 7589408a7..93daece63 100644 --- a/repository/repository_test.go +++ b/repository/repository_test.go @@ -88,6 +88,7 @@ func TestSave(t *testing.T) { Equals(t, id, sid) OK(t, repo.Flush()) + // OK(t, repo.SaveIndex()) // read back buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size)) @@ -214,7 +215,7 @@ func BenchmarkLoadIndex(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - repo.SetIndex(repository.NewIndex()) + repo.SetIndex(repository.NewMasterIndex()) OK(b, repo.LoadIndex()) } })