2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-26 00:28:26 +00:00

Refactor rebuild-index code

This code reads all pack headers from all packs and rebuilds the index
from scratch. Afterwards, all old indexes are removed. This is needed
because in #434 the command `optimize` produced a broken index that
did not contain a blob any more. Running `rebuild-index` should fix
this.
This commit is contained in:
Alexander Neumann 2016-02-23 23:48:55 +01:00
parent bc911f4609
commit 4cefd456bb
2 changed files with 68 additions and 131 deletions

View File

@ -101,7 +101,7 @@ func printTrees(repo *repository.Repository, wr io.Writer) error {
return nil return nil
} }
const numWorkers = 10 const dumpPackWorkers = 10
// Pack is the struct used in printPacks. // Pack is the struct used in printPacks.
type Pack struct { type Pack struct {
@ -138,7 +138,7 @@ func printPacks(repo *repository.Repository, wr io.Writer) error {
jobCh := make(chan worker.Job) jobCh := make(chan worker.Job)
resCh := make(chan worker.Job) resCh := make(chan worker.Job)
wp := worker.New(numWorkers, f, jobCh, resCh) wp := worker.New(dumpPackWorkers, f, jobCh, resCh)
go func() { go func() {
for name := range repo.Backend().List(backend.Data, done) { for name := range repo.Backend().List(backend.Data, done) {

View File

@ -1,13 +1,13 @@
package main package main
import ( import (
"bytes"
"fmt" "fmt"
"os"
"restic/backend" "restic/backend"
"restic/debug" "restic/debug"
"restic/pack" "restic/pack"
"restic/repository" "restic/repository"
"restic/worker"
) )
type CmdRebuildIndex struct { type CmdRebuildIndex struct {
@ -26,164 +26,101 @@ func init() {
} }
} }
func (cmd CmdRebuildIndex) storeIndex(index *repository.Index) (*repository.Index, error) { const rebuildIndexWorkers = 10
debug.Log("RebuildIndex.RebuildIndex", "saving index")
cmd.global.Printf(" saving new index\n")
id, err := repository.SaveIndex(cmd.repo, index)
if err != nil {
debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err)
return nil, err
}
debug.Log("RebuildIndex.RebuildIndex", "index saved as %v", id.Str())
index = repository.NewIndex()
return index, nil
}
func (cmd CmdRebuildIndex) RebuildIndex() error {
debug.Log("RebuildIndex.RebuildIndex", "start")
func loadBlobsFromPacks(repo *repository.Repository) (packs map[backend.ID][]pack.Blob) {
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
indexIDs := backend.NewIDSet() f := func(job worker.Job, done <-chan struct{}) (interface{}, error) {
for id := range cmd.repo.List(backend.Index, done) { id := job.Data.(backend.ID)
indexIDs.Insert(id)
}
cmd.global.Printf("rebuilding index from %d indexes\n", len(indexIDs)) h := backend.Handle{Type: backend.Data, Name: id.String()}
rd := backend.NewReadSeeker(repo.Backend(), h)
debug.Log("RebuildIndex.RebuildIndex", "found %v indexes", len(indexIDs)) unpacker, err := pack.NewUnpacker(repo.Key(), rd)
combinedIndex := repository.NewIndex()
packsDone := backend.NewIDSet()
type Blob struct {
id backend.ID
tpe pack.BlobType
}
blobsDone := make(map[Blob]struct{})
i := 0
for indexID := range indexIDs {
cmd.global.Printf(" loading index %v\n", i)
debug.Log("RebuildIndex.RebuildIndex", "load index %v", indexID.Str())
idx, err := repository.LoadIndex(cmd.repo, indexID)
if err != nil { if err != nil {
return err return nil, err
} }
debug.Log("RebuildIndex.RebuildIndex", "adding blobs from index %v", indexID.Str()) return unpacker.Entries, nil
for packedBlob := range idx.Each(done) {
packsDone.Insert(packedBlob.PackID)
b := Blob{
id: packedBlob.ID,
tpe: packedBlob.Type,
} }
if _, ok := blobsDone[b]; ok {
jobCh := make(chan worker.Job)
resCh := make(chan worker.Job)
wp := worker.New(rebuildIndexWorkers, f, jobCh, resCh)
go func() {
for id := range repo.List(backend.Data, done) {
jobCh <- worker.Job{Data: id}
}
close(jobCh)
}()
packs = make(map[backend.ID][]pack.Blob)
for job := range resCh {
id := job.Data.(backend.ID)
if job.Error != nil {
fmt.Fprintf(os.Stderr, "error for pack %v: %v\n", id, job.Error)
continue continue
} }
blobsDone[b] = struct{}{} entries := job.Result.([]pack.Blob)
combinedIndex.Store(packedBlob) packs[id] = entries
} }
combinedIndex.AddToSupersedes(indexID) wp.Wait()
if repository.IndexFull(combinedIndex) { return packs
combinedIndex, err = cmd.storeIndex(combinedIndex)
if err != nil {
return err
}
} }
i++ func listIndexIDs(repo *repository.Repository) (list backend.IDs) {
done := make(chan struct{})
for id := range repo.List(backend.Index, done) {
list = append(list, id)
} }
var err error return list
if combinedIndex.Length() > 0 {
combinedIndex, err = cmd.storeIndex(combinedIndex)
if err != nil {
return err
}
} }
cmd.global.Printf("removing %d old indexes\n", len(indexIDs)) func (cmd CmdRebuildIndex) RebuildIndex() error {
for id := range indexIDs { debug.Log("RebuildIndex.RebuildIndex", "start rebuilding index")
debug.Log("RebuildIndex.RebuildIndex", "remove index %v", id.Str())
err := cmd.repo.Backend().Remove(backend.Index, id.String()) packs := loadBlobsFromPacks(cmd.repo)
if err != nil { cmd.global.Verbosef("loaded blobs from %d packs\n", len(packs))
debug.Log("RebuildIndex.RebuildIndex", "error removing index %v: %v", id.Str(), err)
return err
}
}
cmd.global.Printf("checking for additional packs\n") idx := repository.NewIndex()
newPacks := 0 for packID, entries := range packs {
var buf []byte for _, entry := range entries {
for packID := range cmd.repo.List(backend.Data, done) { pb := repository.PackedBlob{
if packsDone.Has(packID) { ID: entry.ID,
continue Type: entry.Type,
} Length: entry.Length,
Offset: entry.Offset,
debug.Log("RebuildIndex.RebuildIndex", "pack %v not indexed", packID.Str())
newPacks++
var err error
h := backend.Handle{Type: backend.Data, Name: packID.String()}
buf, err = backend.LoadAll(cmd.repo.Backend(), h, buf)
if err != nil {
debug.Log("RebuildIndex.RebuildIndex", "error while loading pack %v", packID.Str())
return fmt.Errorf("error while loading pack %v: %v", packID.Str(), err)
}
hash := backend.Hash(buf)
if !hash.Equal(packID) {
debug.Log("RebuildIndex.RebuildIndex", "Pack ID does not match, want %v, got %v", packID.Str(), hash.Str())
return fmt.Errorf("Pack ID does not match, want %v, got %v", packID.Str(), hash.Str())
}
up, err := pack.NewUnpacker(cmd.repo.Key(), bytes.NewReader(buf))
if err != nil {
debug.Log("RebuildIndex.RebuildIndex", "error while unpacking pack %v", packID.Str())
return err
}
for _, blob := range up.Entries {
debug.Log("RebuildIndex.RebuildIndex", "pack %v: blob %v", packID.Str(), blob)
combinedIndex.Store(repository.PackedBlob{
Type: blob.Type,
ID: blob.ID,
PackID: packID, PackID: packID,
Offset: blob.Offset, }
Length: blob.Length, idx.Store(pb)
}) }
} }
if repository.IndexFull(combinedIndex) { oldIndexes := listIndexIDs(cmd.repo)
combinedIndex, err = cmd.storeIndex(combinedIndex) idx.AddToSupersedes(oldIndexes...)
cmd.global.Printf(" saving new index\n")
id, err := repository.SaveIndex(cmd.repo, idx)
if err != nil { if err != nil {
debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err)
return err return err
} }
} debug.Log("RebuildIndex.RebuildIndex", "new index saved as %v", id.Str())
}
if combinedIndex.Length() > 0 { for _, indexID := range oldIndexes {
combinedIndex, err = cmd.storeIndex(combinedIndex) err := cmd.repo.Backend().Remove(backend.Index, indexID.String())
if err != nil { if err != nil {
return err cmd.global.Warnf("unable to remove index %v: %v\n", indexID.Str(), err)
} }
} }
cmd.global.Printf("added %d packs to the index\n", newPacks)
debug.Log("RebuildIndex.RebuildIndex", "done")
return nil return nil
} }