2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-10 23:31:09 +00:00

Implement Repack()

This commit is contained in:
Alexander Neumann 2016-05-11 22:28:56 +02:00
parent 6ba38e9a38
commit 00139648a0
2 changed files with 163 additions and 62 deletions

View File

@ -1,8 +1,12 @@
package repository
import (
"fmt"
"os"
"restic/backend"
"restic/debug"
"restic/pack"
"restic/worker"
)
// Repack takes a list of packs together with a list of blobs contained in
@ -10,5 +14,133 @@ import (
// into a new pack. Afterwards, the packs are removed.
func Repack(repo *Repository, packs, keepBlobs backend.IDSet) error {
debug.Log("Repack", "repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
var buf []byte
for packID := range packs {
list, err := repo.ListPack(packID)
if err != nil {
return err
}
debug.Log("Repack", "processing pack %v, blobs: %v", packID.Str(), list)
for _, blob := range list {
buf, err = repo.LoadBlob(blob.Type, blob.ID, buf)
if err != nil {
return err
}
debug.Log("Repack", " loaded blob %v", blob.ID.Str())
_, err = repo.SaveAndEncrypt(blob.Type, buf, &blob.ID)
if err != nil {
return err
}
debug.Log("Repack", " saved blob %v", blob.ID.Str())
}
}
if err := repo.Flush(); err != nil {
return err
}
for packID := range packs {
err := repo.Backend().Remove(backend.Data, packID.String())
if err != nil {
debug.Log("Repack", "error removing pack %v: %v", packID.Str(), err)
return err
}
debug.Log("Repack", "removed pack %v", packID.Str())
}
return nil
}
const rebuildIndexWorkers = 10
type loadBlobsResult struct {
packID backend.ID
entries []pack.Blob
}
// loadBlobsFromAllPacks sends the contents of all packs to ch.
func loadBlobsFromAllPacks(repo *Repository, ch chan<- worker.Job, done <-chan struct{}) {
f := func(job worker.Job, done <-chan struct{}) (interface{}, error) {
packID := job.Data.(backend.ID)
entries, err := repo.ListPack(packID)
return loadBlobsResult{
packID: packID,
entries: entries,
}, err
}
jobCh := make(chan worker.Job)
wp := worker.New(rebuildIndexWorkers, f, jobCh, ch)
go func() {
for id := range repo.List(backend.Data, done) {
jobCh <- worker.Job{Data: id}
}
close(jobCh)
}()
wp.Wait()
}
// RebuildIndex lists all packs in the repo, writes a new index and removes all
// old indexes. This operation should only be done with an exclusive lock in
// place.
func RebuildIndex(repo *Repository) error {
debug.Log("RebuildIndex", "start rebuilding index")
done := make(chan struct{})
defer close(done)
ch := make(chan worker.Job)
go loadBlobsFromAllPacks(repo, ch, done)
idx := NewIndex()
for job := range ch {
id := job.Data.(backend.ID)
if job.Error != nil {
fmt.Fprintf(os.Stderr, "error for pack %v: %v\n", id, job.Error)
continue
}
res := job.Result.(loadBlobsResult)
for _, entry := range res.entries {
pb := PackedBlob{
ID: entry.ID,
Type: entry.Type,
Length: entry.Length,
Offset: entry.Offset,
PackID: res.packID,
}
idx.Store(pb)
}
}
oldIndexes := backend.NewIDSet()
for id := range repo.List(backend.Index, done) {
idx.AddToSupersedes(id)
oldIndexes.Insert(id)
}
id, err := SaveIndex(repo, idx)
if err != nil {
debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err)
return err
}
debug.Log("RebuildIndex.RebuildIndex", "new index saved as %v", id.Str())
for indexID := range oldIndexes {
err := repo.Backend().Remove(backend.Index, indexID.String())
if err != nil {
fmt.Fprintf(os.Stderr, "unable to remove index %v: %v\n", indexID.Str(), err)
}
}
return nil
}

View File

@ -55,55 +55,6 @@ func createRandomBlobs(t *testing.T, repo *Repository, blobs int, pData float32)
}
}
// redundancy returns the amount of duplicate data in the repo. It only looks
// at all pack files.
func redundancy(t *testing.T, repo *Repository) float32 {
done := make(chan struct{})
defer close(done)
type redEntry struct {
count int
size int
}
red := make(map[backend.ID]redEntry)
for id := range repo.List(backend.Data, done) {
entries, err := repo.ListPack(id)
if err != nil {
t.Fatalf("error listing pack %v: %v", id.Str(), err)
}
for _, e := range entries {
updatedEntry := redEntry{
count: 1,
size: int(e.Length),
}
if oldEntry, ok := red[e.ID]; ok {
updatedEntry.count += oldEntry.count
if updatedEntry.size != oldEntry.size {
t.Fatalf("sizes do not match: %v != %v", updatedEntry.size, oldEntry.size)
}
}
red[e.ID] = updatedEntry
}
}
totalBytes := 0
redundantBytes := 0
for _, v := range red {
totalBytes += v.count * v.size
if v.count > 1 {
redundantBytes += (v.count - 1) * v.size
}
}
return float32(redundantBytes) / float32(totalBytes)
}
// selectBlobs returns a list of random blobs from the repository with probability p.
func selectBlobs(t *testing.T, repo *Repository, p float32) backend.IDSet {
done := make(chan struct{})
@ -155,6 +106,32 @@ func findPacksForBlobs(t *testing.T, repo *Repository, blobs backend.IDSet) back
return packs
}
func repack(t *testing.T, repo *Repository, packs, blobs backend.IDSet) {
err := Repack(repo, packs, blobs)
if err != nil {
t.Fatal(err)
}
}
func saveIndex(t *testing.T, repo *Repository) {
if err := repo.SaveIndex(); err != nil {
t.Fatalf("repo.SaveIndex() %v", err)
}
}
func rebuildIndex(t *testing.T, repo *Repository) {
if err := RebuildIndex(repo); err != nil {
t.Fatalf("error rebuilding index: %v", err)
}
}
func reloadIndex(t *testing.T, repo *Repository) {
repo.SetIndex(NewMasterIndex())
if err := repo.LoadIndex(); err != nil {
t.Fatalf("error loading new index: %v", err)
}
}
func TestRepack(t *testing.T) {
repo, cleanup := TestRepository(t)
defer cleanup()
@ -164,10 +141,7 @@ func TestRepack(t *testing.T) {
packsBefore := listPacks(t, repo)
// Running repack on empty ID sets should not do anything at all.
err := Repack(repo, nil, nil)
if err != nil {
t.Fatal(err)
}
repack(t, repo, nil, nil)
packsAfter := listPacks(t, repo)
@ -176,19 +150,14 @@ func TestRepack(t *testing.T) {
packsBefore, packsAfter)
}
if err := repo.SaveIndex(); err != nil {
t.Fatalf("repo.SaveIndex() %v", err)
}
saveIndex(t, repo)
blobs := selectBlobs(t, repo, 0.2)
t.Logf("selected %d blobs: %v", len(blobs), blobs)
packs := findPacksForBlobs(t, repo, blobs)
err = Repack(repo, packs, blobs)
if err != nil {
t.Fatalf("Repack() error %v", err)
}
repack(t, repo, packs, blobs)
rebuildIndex(t, repo)
reloadIndex(t, repo)
packsAfter = listPacks(t, repo)
for id := range packs {