
Use streaming functions for saving data in repo

Alexander Neumann 2017-01-22 17:53:00 +01:00
parent 9b48da5b4e
commit a36c01372d
4 changed files with 76 additions and 66 deletions
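
What changed, in short: packs used to be finalized to a temp file, read back fully into memory to compute their SHA-256 ID, and then uploaded from that buffer. Now the temp file is wrapped in a hashing writer while the pack is written, so the ID falls out of the writer's running hash and the finished file can be streamed to the backend as an io.Reader. A minimal self-contained sketch of the hash-while-writing pattern, using the standard library's io.MultiWriter in place of restic's hashing.Writer:

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"os"
)

func main() {
	tmpfile, err := os.CreateTemp("", "pack-")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmpfile.Name())
	defer tmpfile.Close()

	// Tee every write through the hash so no read-back pass is needed.
	h := sha256.New()
	w := io.MultiWriter(tmpfile, h)

	if _, err := io.WriteString(w, "blob data"); err != nil {
		panic(err)
	}

	// The pack ID is the hash of everything written so far.
	fmt.Printf("pack ID: %x\n", h.Sum(nil))
}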

File 1 of 4

@@ -114,3 +114,13 @@ func (id *ID) UnmarshalJSON(b []byte) error {
 	return nil
 }
 
+// IDFromHash returns the ID for the hash.
+func IDFromHash(hash []byte) (id ID) {
+	if len(hash) != idSize {
+		panic("invalid hash type, not enough/too many bytes")
+	}
+
+	copy(id[:], hash)
+	return id
+}
+
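
The helper lets callers turn a digest they already have into an ID without hashing the data a second time. A hypothetical usage sketch (the bare "restic" import path follows this tree's GOPATH layout; the input data is made up):

package main

import (
	"crypto/sha256"
	"fmt"

	"restic"
)

func main() {
	sum := sha256.Sum256([]byte("example blob")) // [32]byte digest

	// IDFromHash copies the digest into an ID; it panics if the slice
	// is not exactly idSize (32) bytes long.
	id := restic.IDFromHash(sum[:])
	fmt.Println(id.String())
}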

File 2 of 4

@@ -1,7 +1,7 @@
 package repository
 
 import (
-	"bytes"
+	"crypto/sha256"
 	"io"
 	"io/ioutil"
 	"os"
@@ -9,6 +9,7 @@ import (
 	"sync"
 
 	"restic/errors"
+	"restic/hashing"
 
 	"restic/crypto"
 	"restic/debug"
@@ -21,12 +22,19 @@ type Saver interface {
 	Save(restic.Handle, io.Reader) error
 }
 
+// Packer holds a pack.Packer together with a hash writer.
+type Packer struct {
+	*pack.Packer
+	hw      *hashing.Writer
+	tmpfile *os.File
+}
+
 // packerManager keeps a list of open packs and creates new on demand.
 type packerManager struct {
 	be  Saver
 	key *crypto.Key
 	pm  sync.Mutex
-	packs []*pack.Packer
+	packers []*Packer
 
 	pool sync.Pool
 }
@@ -51,18 +59,18 @@ func newPackerManager(be Saver, key *crypto.Key) *packerManager {
 
 // findPacker returns a packer for a new blob of size bytes. Either a new one is
 // created or one is returned that already has some blobs.
-func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) {
+func (r *packerManager) findPacker(size uint) (packer *Packer, err error) {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
 	// search for a suitable packer
-	if len(r.packs) > 0 {
+	if len(r.packers) > 0 {
 		debug.Log("searching packer for %d bytes\n", size)
-		for i, p := range r.packs {
-			if p.Size()+size < maxPackSize {
+		for i, p := range r.packers {
+			if p.Packer.Size()+size < maxPackSize {
 				debug.Log("found packer %v", p)
 				// remove from list
-				r.packs = append(r.packs[:i], r.packs[i+1:]...)
+				r.packers = append(r.packers[:i], r.packers[i+1:]...)
 				return p, nil
 			}
 		}
@@ -75,50 +83,43 @@ func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) {
 		return nil, errors.Wrap(err, "ioutil.TempFile")
 	}
 
-	return pack.NewPacker(r.key, tmpfile), nil
+	hw := hashing.NewWriter(tmpfile, sha256.New())
+	p := pack.NewPacker(r.key, hw)
+	packer = &Packer{
+		Packer:  p,
+		hw:      hw,
+		tmpfile: tmpfile,
+	}
+
+	return packer, nil
 }
 
 // insertPacker appends p to s.packs.
-func (r *packerManager) insertPacker(p *pack.Packer) {
+func (r *packerManager) insertPacker(p *Packer) {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
-	r.packs = append(r.packs, p)
-	debug.Log("%d packers\n", len(r.packs))
+	r.packers = append(r.packers, p)
+	debug.Log("%d packers\n", len(r.packers))
 }
 
 // savePacker stores p in the backend.
-func (r *Repository) savePacker(p *pack.Packer) error {
-	debug.Log("save packer with %d blobs\n", p.Count())
-	n, err := p.Finalize()
+func (r *Repository) savePacker(p *Packer) error {
+	debug.Log("save packer with %d blobs\n", p.Packer.Count())
+	_, err := p.Packer.Finalize()
 	if err != nil {
 		return err
 	}
 
-	tmpfile := p.Writer().(*os.File)
-	f, err := fs.Open(tmpfile.Name())
+	f, err := fs.Open(p.tmpfile.Name())
 	if err != nil {
 		return errors.Wrap(err, "Open")
 	}
 
-	data := make([]byte, n)
-	m, err := io.ReadFull(f, data)
-	if err != nil {
-		return errors.Wrap(err, "ReadFul")
-	}
-
-	if uint(m) != n {
-		return errors.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m)
-	}
-
-	if err = f.Close(); err != nil {
-		return errors.Wrap(err, "Close")
-	}
-
-	id := restic.Hash(data)
+	id := restic.IDFromHash(p.hw.Sum(nil))
 	h := restic.Handle{Type: restic.DataFile, Name: id.String()}
 
-	err = r.be.Save(h, bytes.NewReader(data))
+	err = r.be.Save(h, f)
 	if err != nil {
 		debug.Log("Save(%v) error: %v", h, err)
 		return err
@@ -126,13 +127,18 @@ func (r *Repository) savePacker(p *pack.Packer) error {
 
 	debug.Log("saved as %v", h)
 
-	err = fs.Remove(tmpfile.Name())
+	err = f.Close()
+	if err != nil {
+		return errors.Wrap(err, "close tempfile")
+	}
+
+	err = fs.Remove(p.tmpfile.Name())
 	if err != nil {
 		return errors.Wrap(err, "Remove")
 	}
 
 	// update blobs in the index
-	for _, b := range p.Blobs() {
+	for _, b := range p.Packer.Blobs() {
 		debug.Log("  updating blob %v to pack %v", b.ID.Str(), id.Str())
 		r.idx.Store(restic.PackedBlob{
 			Blob: restic.Blob{
@@ -153,5 +159,5 @@ func (r *packerManager) countPacker() int {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
-	return len(r.packs)
+	return len(r.packers)
 }
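
The savePacker rewrite above drops the make/ReadFull buffer entirely: the backend's Save now consumes the open temp file directly, and the ID comes from the hashing writer that observed every byte as the pack was built. A condensed restatement of the new flow under the types from the diff (sketch only; fs.Open/fs.Remove are swapped for their os counterparts and error wrapping is trimmed):

// savePackerSketch mirrors the streaming save path shown in the diff.
func (r *Repository) savePackerSketch(p *Packer) error {
	if _, err := p.Packer.Finalize(); err != nil {
		return err
	}

	f, err := os.Open(p.tmpfile.Name())
	if err != nil {
		return err
	}

	// The ID was computed while the pack was written; no read-back pass.
	id := restic.IDFromHash(p.hw.Sum(nil))
	h := restic.Handle{Type: restic.DataFile, Name: id.String()}

	// The backend reads the pack as a stream instead of a []byte buffer.
	if err := r.be.Save(h, f); err != nil {
		f.Close()
		return err
	}

	if err := f.Close(); err != nil {
		return err
	}
	return os.Remove(p.tmpfile.Name())
}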

File 3 of 4

@@ -1,7 +1,6 @@
 package repository
 
 import (
-	"bytes"
 	"io"
 	"math/rand"
 	"os"
@@ -48,26 +47,22 @@ func randomID(rd io.Reader) restic.ID {
 
 const maxBlobSize = 1 << 20
 
-func saveFile(t testing.TB, be Saver, filename string, n int) {
+func saveFile(t testing.TB, be Saver, filename string, id restic.ID) {
 	f, err := os.Open(filename)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	data := make([]byte, n)
-	m, err := io.ReadFull(f, data)
-	if m != n {
-		t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, m, n)
-	}
-
-	if err = f.Close(); err != nil {
-		t.Fatal(err)
-	}
-
-	h := restic.Handle{Type: restic.DataFile, Name: restic.Hash(data).String()}
+	defer func() {
+		if err := f.Close(); err != nil {
+			t.Fatal(err)
+		}
+	}()
 
-	err = be.Save(h, bytes.NewReader(data))
+	h := restic.Handle{Type: restic.DataFile, Name: id.String()}
+	t.Logf("save file %v", h)
+	err = be.Save(h, f)
 	if err != nil {
 		t.Fatal(err)
 	}
 }
@@ -107,13 +102,13 @@ func fillPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager, buf []byte) (bytes int) {
 			continue
 		}
 
-		bytesWritten, err := packer.Finalize()
+		_, err = packer.Finalize()
 		if err != nil {
 			t.Fatal(err)
 		}
 
-		tmpfile := packer.Writer().(*os.File)
-		saveFile(t, be, tmpfile.Name(), int(bytesWritten))
+		packID := restic.IDFromHash(packer.hw.Sum(nil))
+		saveFile(t, be, packer.tmpfile.Name(), packID)
 	}
 
 	return bytes
@@ -121,15 +116,15 @@
 
 func flushRemainingPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager) (bytes int) {
 	if pm.countPacker() > 0 {
-		for _, packer := range pm.packs {
+		for _, packer := range pm.packers {
 			n, err := packer.Finalize()
 			if err != nil {
 				t.Fatal(err)
 			}
 			bytes += int(n)
 
-			tmpfile := packer.Writer().(*os.File)
-			saveFile(t, be, tmpfile.Name(), bytes)
+			packID := restic.IDFromHash(packer.hw.Sum(nil))
+			saveFile(t, be, packer.tmpfile.Name(), packID)
 		}
 	}
@@ -156,16 +151,15 @@ func BenchmarkPackerManager(t *testing.B) {
 	be := &mock.Backend{
 		SaveFn: func(restic.Handle, io.Reader) error { return nil },
 	}
+	pm := newPackerManager(be, crypto.NewRandomKey())
 
 	blobBuf := make([]byte, maxBlobSize)
 
 	t.ResetTimer()
-
+	bytes := 0
 	for i := 0; i < t.N; i++ {
-		bytes := 0
-		pm := newPackerManager(be, crypto.NewRandomKey())
 		bytes += fillPacks(t, rnd, be, pm, blobBuf)
-		bytes += flushRemainingPacks(t, rnd, be, pm)
-		t.Logf("saved %d bytes", bytes)
 	}
+	bytes += flushRemainingPacks(t, rnd, be, pm)
+	t.Logf("saved %d bytes", bytes)
 }
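
The benchmark change hoists packer-manager construction out of the timed region and moves the final flush out of the loop, so the b.N iterations measure fillPacks alone and the flush runs once at the end. A hypothetical reduction of that shape (setup and fillOnce stand in for newPackerManager and fillPacks):

package repository_test

import "testing"

func setup() int             { return 0 }
func fillOnce(state int) int { return state + 1 }

func BenchmarkLoopShape(b *testing.B) {
	state := setup() // one-time setup, excluded from timing
	b.ResetTimer()

	total := 0
	for i := 0; i < b.N; i++ {
		total += fillOnce(state) // only per-iteration work is measured
	}
	b.Logf("processed %d items", total)
}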

File 4 of 4

@@ -235,15 +235,15 @@ func (r *Repository) Flush() error {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
-	debug.Log("manually flushing %d packs", len(r.packs))
+	debug.Log("manually flushing %d packs", len(r.packerManager.packers))
 
-	for _, p := range r.packs {
+	for _, p := range r.packerManager.packers {
 		err := r.savePacker(p)
 		if err != nil {
 			return err
 		}
 	}
 
-	r.packs = r.packs[:0]
+	r.packerManager.packers = r.packerManager.packers[:0]
 	return nil
 }