From 8209bb309bf061614b9db8ec1d48486b78094b6f Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Wed, 18 Nov 2015 20:20:25 +0100 Subject: [PATCH] split out decryptReader and packerManager --- repository/decrypt_read_closer.go | 36 +++++++++ repository/packer_manager.go | 102 +++++++++++++++++++++++++ repository/repository.go | 123 ++---------------------------- 3 files changed, 144 insertions(+), 117 deletions(-) create mode 100644 repository/decrypt_read_closer.go create mode 100644 repository/packer_manager.go diff --git a/repository/decrypt_read_closer.go b/repository/decrypt_read_closer.go new file mode 100644 index 000000000..81a6f9513 --- /dev/null +++ b/repository/decrypt_read_closer.go @@ -0,0 +1,36 @@ +package repository + +import ( + "io" + + "github.com/restic/restic/crypto" +) + +// decryptReadCloser couples an underlying reader with a DecryptReader and +// implements io.ReadCloser. On Close(), both readers are closed. +type decryptReadCloser struct { + r io.ReadCloser + dr io.ReadCloser +} + +func newDecryptReadCloser(key *crypto.Key, rd io.ReadCloser) (io.ReadCloser, error) { + dr, err := crypto.DecryptFrom(key, rd) + if err != nil { + return nil, err + } + + return &decryptReadCloser{r: rd, dr: dr}, nil +} + +func (dr *decryptReadCloser) Read(buf []byte) (int, error) { + return dr.dr.Read(buf) +} + +func (dr *decryptReadCloser) Close() error { + err := dr.dr.Close() + if err != nil { + return err + } + + return dr.r.Close() +} diff --git a/repository/packer_manager.go b/repository/packer_manager.go new file mode 100644 index 000000000..99b74cea4 --- /dev/null +++ b/repository/packer_manager.go @@ -0,0 +1,102 @@ +package repository + +import ( + "sync" + + "github.com/restic/chunker" + "github.com/restic/restic/backend" + "github.com/restic/restic/crypto" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" +) + +// packerManager keeps a list of open packs and creates new on demand. +type packerManager struct { + be backend.Backend + key *crypto.Key + pm sync.Mutex + packs []*pack.Packer +} + +const minPackSize = 4 * chunker.MiB +const maxPackSize = 16 * chunker.MiB +const maxPackers = 200 + +// findPacker returns a packer for a new blob of size bytes. Either a new one is +// created or one is returned that already has some blobs. +func (r *packerManager) findPacker(size uint) (*pack.Packer, error) { + r.pm.Lock() + defer r.pm.Unlock() + + // search for a suitable packer + if len(r.packs) > 0 { + debug.Log("Repo.findPacker", "searching packer for %d bytes\n", size) + for i, p := range r.packs { + if p.Size()+size < maxPackSize { + debug.Log("Repo.findPacker", "found packer %v", p) + // remove from list + r.packs = append(r.packs[:i], r.packs[i+1:]...) + return p, nil + } + } + } + + // no suitable packer found, return new + blob, err := r.be.Create() + if err != nil { + return nil, err + } + debug.Log("Repo.findPacker", "create new pack %p for %d bytes", blob, size) + return pack.NewPacker(r.key, blob), nil +} + +// insertPacker appends p to s.packs. +func (r *packerManager) insertPacker(p *pack.Packer) { + r.pm.Lock() + defer r.pm.Unlock() + + r.packs = append(r.packs, p) + debug.Log("Repo.insertPacker", "%d packers\n", len(r.packs)) +} + +// savePacker stores p in the backend. +func (r *Repository) savePacker(p *pack.Packer) error { + debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count()) + _, err := p.Finalize() + if err != nil { + return err + } + + // move file to the final location + sid := p.ID() + err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String()) + if err != nil { + debug.Log("Repo.savePacker", "blob Finalize() error: %v", err) + return err + } + + debug.Log("Repo.savePacker", "saved as %v", sid.Str()) + + // update blobs in the index + for _, b := range p.Blobs() { + debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str()) + r.idx.Current().Store(PackedBlob{ + Type: b.Type, + ID: b.ID, + PackID: sid, + Offset: b.Offset, + Length: uint(b.Length), + }) + r.idx.RemoveFromInFlight(b.ID) + } + + return nil +} + +// countPacker returns the number of open (unfinished) packers. +func (r *packerManager) countPacker() int { + r.pm.Lock() + defer r.pm.Unlock() + + return len(r.packs) +} diff --git a/repository/repository.go b/repository/repository.go index 03ba4ed59..0b1955486 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -9,9 +9,7 @@ import ( "io" "io/ioutil" "os" - "sync" - "github.com/restic/chunker" "github.com/restic/restic/backend" "github.com/restic/restic/crypto" "github.com/restic/restic/debug" @@ -26,8 +24,7 @@ type Repository struct { keyName string idx *MasterIndex - pm sync.Mutex - packs []*pack.Packer + *packerManager } // New returns a new repository with backend be. @@ -35,6 +32,9 @@ func New(be backend.Backend) *Repository { return &Repository{ be: be, idx: NewMasterIndex(), + packerManager: &packerManager{ + be: be, + }, } } @@ -218,90 +218,6 @@ func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) { return r.idx.LookupSize(id) } -const minPackSize = 4 * chunker.MiB -const maxPackSize = 16 * chunker.MiB -const maxPackers = 200 - -// findPacker returns a packer for a new blob of size bytes. Either a new one is -// created or one is returned that already has some blobs. -func (r *Repository) findPacker(size uint) (*pack.Packer, error) { - r.pm.Lock() - defer r.pm.Unlock() - - // search for a suitable packer - if len(r.packs) > 0 { - debug.Log("Repo.findPacker", "searching packer for %d bytes\n", size) - for i, p := range r.packs { - if p.Size()+size < maxPackSize { - debug.Log("Repo.findPacker", "found packer %v", p) - // remove from list - r.packs = append(r.packs[:i], r.packs[i+1:]...) - return p, nil - } - } - } - - // no suitable packer found, return new - blob, err := r.be.Create() - if err != nil { - return nil, err - } - debug.Log("Repo.findPacker", "create new pack %p for %d bytes", blob, size) - return pack.NewPacker(r.key, blob), nil -} - -// insertPacker appends p to s.packs. -func (r *Repository) insertPacker(p *pack.Packer) { - r.pm.Lock() - defer r.pm.Unlock() - - r.packs = append(r.packs, p) - debug.Log("Repo.insertPacker", "%d packers\n", len(r.packs)) -} - -// savePacker stores p in the backend. -func (r *Repository) savePacker(p *pack.Packer) error { - debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count()) - _, err := p.Finalize() - if err != nil { - return err - } - - // move file to the final location - sid := p.ID() - err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String()) - if err != nil { - debug.Log("Repo.savePacker", "blob Finalize() error: %v", err) - return err - } - - debug.Log("Repo.savePacker", "saved as %v", sid.Str()) - - // update blobs in the index - var packedBlobs []PackedBlob - for _, b := range p.Blobs() { - packedBlobs = append(packedBlobs, PackedBlob{ - Type: b.Type, - ID: b.ID, - PackID: sid, - Offset: b.Offset, - Length: uint(b.Length), - }) - r.idx.RemoveFromInFlight(b.ID) - } - r.idx.Current().StoreBlobs(packedBlobs) - - return nil -} - -// countPacker returns the number of open (unfinished) packers. -func (r *Repository) countPacker() int { - r.pm.Lock() - defer r.pm.Unlock() - - return len(r.packs) -} - // SaveAndEncrypt encrypts data and stores it to the backend as type t. If data is small // enough, it will be packed together with other small blobs. func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) { @@ -628,35 +544,6 @@ func LoadIndex(repo *Repository, id string) (*Index, error) { return nil, err } -// decryptReadCloser couples an underlying reader with a DecryptReader and -// implements io.ReadCloser. On Close(), both readers are closed. -type decryptReadCloser struct { - r io.ReadCloser - dr io.ReadCloser -} - -func newDecryptReadCloser(key *crypto.Key, rd io.ReadCloser) (io.ReadCloser, error) { - dr, err := crypto.DecryptFrom(key, rd) - if err != nil { - return nil, err - } - - return &decryptReadCloser{r: rd, dr: dr}, nil -} - -func (dr *decryptReadCloser) Read(buf []byte) (int, error) { - return dr.dr.Read(buf) -} - -func (dr *decryptReadCloser) Close() error { - err := dr.dr.Close() - if err != nil { - return err - } - - return dr.r.Close() -} - // GetDecryptReader opens the file id stored in the backend and returns a // reader that yields the decrypted content. The reader must be closed. func (r *Repository) GetDecryptReader(t backend.Type, id string) (io.ReadCloser, error) { @@ -677,6 +564,7 @@ func (r *Repository) SearchKey(password string) error { } r.key = key.master + r.packerManager.key = key.master r.keyName = key.Name() r.Config, err = LoadConfig(r) return err @@ -699,6 +587,7 @@ func (r *Repository) Init(password string) error { } r.key = key.master + r.packerManager.key = key.master r.keyName = key.Name() r.Config, err = CreateConfig(r) return err