2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-02 11:46:36 +00:00
restic/src/restic/repository/packer_manager.go

158 lines
3.3 KiB
Go
Raw Normal View History

package repository
import (
"bytes"
"io"
"io/ioutil"
"os"
2016-08-31 18:29:54 +00:00
"restic"
"sync"
2016-09-01 20:17:37 +00:00
"restic/errors"
"restic/crypto"
"restic/debug"
"restic/fs"
"restic/pack"
)
// Saver is implemented by anything that can save data in a backend.
type Saver interface {
	// Save is handed the handle h together with a reader rd and reports
	// failure through the returned error.
	Save(h restic.Handle, rd io.Reader) error
}
// packerManager keeps a list of open packs and creates new on demand.
type packerManager struct {
be Saver
key *crypto.Key
pm sync.Mutex
packs []*pack.Packer
2016-03-06 12:14:06 +00:00
pool sync.Pool
}
// Limits used by the packer manager.
const (
	// minPackSize is 4 MiB.
	minPackSize = 4 << 20

	// maxPackSize is 16 MiB. findPacker only hands out an existing packer
	// when its current size plus the new blob stays below this value.
	maxPackSize = 16 << 20

	// maxPackers is not referenced in this part of the file; presumably it
	// bounds the number of open packers (TODO confirm against callers).
	maxPackers = 200
)
2016-03-06 13:20:48 +00:00
// newPackerManager returns an new packer manager which writes temporary files
// to a temporary directory
2016-03-06 13:20:48 +00:00
func newPackerManager(be Saver, key *crypto.Key) *packerManager {
2016-03-06 12:14:06 +00:00
return &packerManager{
be: be,
key: key,
2016-03-06 12:14:06 +00:00
pool: sync.Pool{
New: func() interface{} {
return make([]byte, (minPackSize+maxPackSize)/2)
},
},
}
}
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
2016-03-06 12:14:06 +00:00
func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) {
r.pm.Lock()
defer r.pm.Unlock()
// search for a suitable packer
if len(r.packs) > 0 {
2016-09-27 20:35:08 +00:00
debug.Log("searching packer for %d bytes\n", size)
for i, p := range r.packs {
if p.Size()+size < maxPackSize {
2016-09-27 20:35:08 +00:00
debug.Log("found packer %v", p)
// remove from list
r.packs = append(r.packs[:i], r.packs[i+1:]...)
return p, nil
}
}
}
// no suitable packer found, return new
2016-09-27 20:35:08 +00:00
debug.Log("create new pack for %d bytes", size)
2016-03-06 12:14:06 +00:00
tmpfile, err := ioutil.TempFile("", "restic-temp-pack-")
if err != nil {
2016-08-29 20:16:58 +00:00
return nil, errors.Wrap(err, "ioutil.TempFile")
}
return pack.NewPacker(r.key, tmpfile), nil
}
// insertPacker appends p to s.packs.
func (r *packerManager) insertPacker(p *pack.Packer) {
r.pm.Lock()
defer r.pm.Unlock()
r.packs = append(r.packs, p)
2016-09-27 20:35:08 +00:00
debug.Log("%d packers\n", len(r.packs))
}
// savePacker stores p in the backend.
func (r *Repository) savePacker(p *pack.Packer) error {
2016-09-27 20:35:08 +00:00
debug.Log("save packer with %d blobs\n", p.Count())
n, err := p.Finalize()
if err != nil {
return err
}
tmpfile := p.Writer().(*os.File)
f, err := fs.Open(tmpfile.Name())
if err != nil {
2016-08-29 20:16:58 +00:00
return errors.Wrap(err, "Open")
}
data := make([]byte, n)
m, err := io.ReadFull(f, data)
2016-08-29 20:16:58 +00:00
if err != nil {
return errors.Wrap(err, "ReadFul")
}
if uint(m) != n {
return errors.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m)
}
if err = f.Close(); err != nil {
2016-08-29 20:16:58 +00:00
return errors.Wrap(err, "Close")
}
2016-08-31 18:29:54 +00:00
id := restic.Hash(data)
2016-09-01 19:19:30 +00:00
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
2016-01-24 18:30:14 +00:00
err = r.be.Save(h, bytes.NewReader(data))
if err != nil {
2016-09-27 20:35:08 +00:00
debug.Log("Save(%v) error: %v", h, err)
return err
}
2016-09-27 20:35:08 +00:00
debug.Log("saved as %v", h)
err = fs.Remove(tmpfile.Name())
2016-03-06 12:14:06 +00:00
if err != nil {
2016-08-29 20:16:58 +00:00
return errors.Wrap(err, "Remove")
2016-03-06 12:14:06 +00:00
}
// update blobs in the index
for _, b := range p.Blobs() {
2016-09-27 20:35:08 +00:00
debug.Log(" updating blob %v to pack %v", b.ID.Str(), id.Str())
r.idx.Store(restic.PackedBlob{
2016-08-31 20:39:36 +00:00
Blob: restic.Blob{
Type: b.Type,
ID: b.ID,
Offset: b.Offset,
Length: uint(b.Length),
},
2016-01-24 18:30:14 +00:00
PackID: id,
})
}
return nil
}
// countPacker reports how many open (unfinished) packers are currently held
// in r.packs. The count is read while holding r.pm.
func (r *packerManager) countPacker() int {
	r.pm.Lock()
	defer r.pm.Unlock()

	open := len(r.packs)
	return open
}