diff --git a/archiver.go b/archiver.go
index c181711d7..a26dc3e7f 100644
--- a/archiver.go
+++ b/archiver.go
@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"sort"
 	"sync"
+	"time"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic/backend"
@@ -540,6 +541,30 @@ func (j archiveJob) Copy() pipe.Job {
 	return j.new
 }
 
+const saveIndexTime = 30 * time.Second
+
+// saveIndexes regularly queries the master index for full indexes and saves them.
+func (arch *Archiver) saveIndexes(wg *sync.WaitGroup, done <-chan struct{}) {
+	defer wg.Done()
+
+	ticker := time.NewTicker(saveIndexTime)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-done:
+			return
+		case <-ticker.C:
+			debug.Log("Archiver.saveIndexes", "saving full indexes")
+			err := arch.repo.SaveFullIndex()
+			if err != nil {
+				debug.Log("Archiver.saveIndexes", "save indexes returned an error: %v", err)
+				fmt.Fprintf(os.Stderr, "error saving preliminary index: %v\n", err)
+			}
+		}
+	}
+}
+
 // Snapshot creates a snapshot of the given paths. If parentID is set, this is
 // used to compare the files to the ones archived at the time this snapshot was
 // taken.
@@ -623,10 +648,20 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
 		go arch.dirWorker(&wg, p, done, dirCh)
 	}
 
+	// run index saver
+	var wgIndexSaver sync.WaitGroup
+	stopIndexSaver := make(chan struct{})
+	wgIndexSaver.Add(1)
+	go arch.saveIndexes(&wgIndexSaver, stopIndexSaver)
+
 	// wait for all workers to terminate
 	debug.Log("Archiver.Snapshot", "wait for workers")
 	wg.Wait()
 
+	// stop index saver
+	close(stopIndexSaver)
+	wgIndexSaver.Wait()
+
 	debug.Log("Archiver.Snapshot", "workers terminated")
 
 	// receive the top-level tree
@@ -681,7 +716,6 @@ func Scan(dirs []string, filter pipe.SelectFunc, p *Progress) (Stat, error) {
 	for _, dir := range dirs {
 		debug.Log("Scan", "Start for %v", dir)
 		err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error {
-			debug.Log("Scan.Walk", "%v, fi: %v, err: %v", str, fi, err)
 			// TODO: integrate error reporting
 			if err != nil {
 				fmt.Fprintf(os.Stderr, "error for %v: %v\n", str, err)
diff --git a/repository/index.go b/repository/index.go
index 6107d8c84..006944fb6 100644
--- a/repository/index.go
+++ b/repository/index.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	"github.com/restic/restic/backend"
 	"github.com/restic/restic/crypto"
@@ -20,6 +21,7 @@ type Index struct {
 
 	final      bool // set to true for all indexes read from the backend ("finalized")
 	supersedes backend.IDs
+	created    time.Time
 }
 
 type indexEntry struct {
@@ -32,7 +34,8 @@ type indexEntry struct {
 
 // NewIndex returns a new index.
 func NewIndex() *Index {
 	return &Index{
-		pack: make(map[backend.ID]indexEntry),
+		pack:    make(map[backend.ID]indexEntry),
+		created: time.Now(),
 	}
 }
 
@@ -54,6 +57,42 @@ func (idx *Index) Final() bool {
 	return idx.final
 }
 
+const (
+	indexMinBlobs = 20
+	indexMaxBlobs = 2000
+	indexMinAge   = 2 * time.Minute
+	indexMaxAge   = 15 * time.Minute
+)
+
+// Full returns true iff the index is "full enough" to be saved as a preliminary index.
+func (idx *Index) Full() bool {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	debug.Log("Index.Full", "checking whether index %p is full", idx)
+
+	packs := len(idx.pack)
+	age := time.Now().Sub(idx.created)
+
+	if age > indexMaxAge {
+		debug.Log("Index.Full", "index %p is old enough (%v)", idx, age)
+		return true
+	}
+
+	if packs < indexMinBlobs || age < indexMinAge {
+		debug.Log("Index.Full", "index %p only has %d packs or is too young (%v)", idx, packs, age)
+		return false
+	}
+
+	if packs > indexMaxBlobs {
+		debug.Log("Index.Full", "index %p has %d packs", idx, packs)
+		return true
+	}
+
+	debug.Log("Index.Full", "index %p is not full", idx)
+	return false
+}
+
 // Store remembers the id and pack in the index. An existing entry will be
 // silently overwritten.
 func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
diff --git a/repository/master_index.go b/repository/master_index.go
index c032152cc..f7cdae261 100644
--- a/repository/master_index.go
+++ b/repository/master_index.go
@@ -187,7 +187,33 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index {
 		}
 	}
 
-	debug.Log("MasterIndex.NotFinalIndexes", "saving %d indexes", len(list))
+	debug.Log("MasterIndex.NotFinalIndexes", "return %d indexes", len(list))
 	return list
 }
 
+// FullIndexes returns all indexes that are full.
+func (mi *MasterIndex) FullIndexes() []*Index {
+	mi.idxMutex.Lock()
+	defer mi.idxMutex.Unlock()
+
+	var list []*Index
+
+	debug.Log("MasterIndex.FullIndexes", "checking %d indexes", len(mi.idx))
+	for _, idx := range mi.idx {
+		if idx.Final() {
+			debug.Log("MasterIndex.FullIndexes", "index %p is final", idx)
+			continue
+		}
+
+		if idx.Full() {
+			debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
+			list = append(list, idx)
+		} else {
+			debug.Log("MasterIndex.FullIndexes", "index %p not full", idx)
+		}
+	}
+
+	debug.Log("MasterIndex.FullIndexes", "return %d indexes", len(list))
 	return list
 }
diff --git a/repository/repository.go b/repository/repository.go
index 2fe65a963..ac2b8b03b 100644
--- a/repository/repository.go
+++ b/repository/repository.go
@@ -235,7 +235,7 @@ func (r *Repository) findPacker(size uint) (*pack.Packer, error) {
 	if err != nil {
 		return nil, err
 	}
-	debug.Log("Repo.findPacker", "create new pack %p", blob)
+	debug.Log("Repo.findPacker", "create new pack %p for %d bytes", blob, size)
 	return pack.NewPacker(r.key, blob), nil
 }
 
@@ -513,9 +513,9 @@ func (bw *BlobWriter) ID() backend.ID {
 	return bw.id
 }
 
-// SaveIndex saves all new indexes in the backend.
-func (r *Repository) SaveIndex() error {
-	for i, idx := range r.idx.NotFinalIndexes() {
+// saveIndex saves all indexes in the backend.
+func (r *Repository) saveIndex(indexes ...*Index) error {
+	for i, idx := range indexes {
 		debug.Log("Repo.SaveIndex", "Saving index %d", i)
 
 		blob, err := r.CreateEncryptedBlob(backend.Index)
@@ -541,6 +541,16 @@ func (r *Repository) SaveIndex() error {
 	return nil
 }
 
+// SaveIndex saves all new indexes in the backend.
+func (r *Repository) SaveIndex() error {
+	return r.saveIndex(r.idx.NotFinalIndexes()...)
+}
+
+// SaveFullIndex saves all full indexes in the backend.
+func (r *Repository) SaveFullIndex() error {
+	return r.saveIndex(r.idx.FullIndexes()...)
+}
+
 const loadIndexParallelism = 20
 
 // LoadIndex loads all index files from the backend in parallel and stores them