2015-11-18 19:20:25 +00:00
|
|
|
package repository
|
|
|
|
|
|
|
|
import (
|
2021-08-22 13:10:00 +00:00
|
|
|
"bufio"
|
2017-06-04 09:16:55 +00:00
|
|
|
"context"
|
2024-07-26 17:10:25 +00:00
|
|
|
"crypto/sha256"
|
2020-12-19 11:39:48 +00:00
|
|
|
"io"
|
2016-03-06 11:26:25 +00:00
|
|
|
"os"
|
2015-11-18 19:20:25 +00:00
|
|
|
"sync"
|
|
|
|
|
2023-10-01 09:40:12 +00:00
|
|
|
"github.com/restic/restic/internal/backend"
|
2017-07-23 12:21:03 +00:00
|
|
|
"github.com/restic/restic/internal/errors"
|
2024-05-24 21:11:53 +00:00
|
|
|
"github.com/restic/restic/internal/repository/hashing"
|
2017-07-24 15:42:25 +00:00
|
|
|
"github.com/restic/restic/internal/restic"
|
2017-07-23 12:21:03 +00:00
|
|
|
|
|
|
|
"github.com/restic/restic/internal/crypto"
|
|
|
|
"github.com/restic/restic/internal/debug"
|
|
|
|
"github.com/restic/restic/internal/fs"
|
2024-05-24 21:09:58 +00:00
|
|
|
"github.com/restic/restic/internal/repository/pack"
|
2015-11-18 19:20:25 +00:00
|
|
|
)
|
|
|
|
|
2024-05-19 14:10:48 +00:00
|
|
|
// packer holds a pack.packer together with a hash writer.
|
|
|
|
type packer struct {
|
2017-01-22 16:53:00 +00:00
|
|
|
*pack.Packer
|
|
|
|
tmpfile *os.File
|
2021-08-22 13:10:00 +00:00
|
|
|
bufWr *bufio.Writer
|
2017-01-22 16:53:00 +00:00
|
|
|
}
|
|
|
|
|
2015-11-18 19:20:25 +00:00
|
|
|
// packerManager keeps a list of open packs and creates new on demand.
|
|
|
|
type packerManager struct {
|
2021-08-22 13:10:00 +00:00
|
|
|
tpe restic.BlobType
|
|
|
|
key *crypto.Key
|
2024-05-19 14:10:48 +00:00
|
|
|
queueFn func(ctx context.Context, t restic.BlobType, p *packer) error
|
2021-08-07 20:52:05 +00:00
|
|
|
|
2022-07-02 21:30:26 +00:00
|
|
|
pm sync.Mutex
|
2024-05-19 14:10:48 +00:00
|
|
|
packer *packer
|
2022-07-02 21:30:26 +00:00
|
|
|
packSize uint
|
2015-11-18 19:20:25 +00:00
|
|
|
}
|
|
|
|
|
2023-12-06 12:11:55 +00:00
|
|
|
// newPackerManager returns a new packer manager which writes temporary files
|
2016-03-06 11:26:25 +00:00
|
|
|
// to a temporary directory
|
2024-05-19 14:10:48 +00:00
|
|
|
func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
|
2016-03-06 12:14:06 +00:00
|
|
|
return &packerManager{
|
2022-07-02 21:30:26 +00:00
|
|
|
tpe: tpe,
|
|
|
|
key: key,
|
|
|
|
queueFn: queueFn,
|
|
|
|
packSize: packSize,
|
2021-08-07 20:52:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *packerManager) Flush(ctx context.Context) error {
|
|
|
|
r.pm.Lock()
|
|
|
|
defer r.pm.Unlock()
|
|
|
|
|
2021-08-22 13:10:00 +00:00
|
|
|
if r.packer != nil {
|
|
|
|
debug.Log("manually flushing pending pack")
|
|
|
|
err := r.queueFn(ctx, r.tpe, r.packer)
|
2021-08-07 20:52:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-08-22 13:10:00 +00:00
|
|
|
r.packer = nil
|
2021-08-07 20:52:05 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) {
|
2021-08-22 13:10:00 +00:00
|
|
|
r.pm.Lock()
|
|
|
|
defer r.pm.Unlock()
|
|
|
|
|
|
|
|
var err error
|
|
|
|
packer := r.packer
|
2023-09-28 18:58:45 +00:00
|
|
|
// use separate packer if compressed length is larger than the packsize
|
|
|
|
// this speeds up the garbage collection of oversized blobs and reduces the cache size
|
|
|
|
// as the oversize blobs are only downloaded if necessary
|
|
|
|
if len(ciphertext) >= int(r.packSize) || r.packer == nil {
|
2021-08-22 13:10:00 +00:00
|
|
|
packer, err = r.newPacker()
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2023-09-28 18:58:45 +00:00
|
|
|
// don't store packer for oversized blob
|
|
|
|
if r.packer == nil {
|
|
|
|
r.packer = packer
|
|
|
|
}
|
2021-08-07 20:52:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// save ciphertext
|
2021-08-22 13:10:00 +00:00
|
|
|
// Add only appends bytes in memory to avoid being a scaling bottleneck
|
2021-08-07 20:52:05 +00:00
|
|
|
size, err := packer.Add(t, id, ciphertext, uncompressedLength)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
2022-04-30 22:05:20 +00:00
|
|
|
// if the pack and header is not full enough, put back to the list
|
|
|
|
if packer.Size() < r.packSize && !packer.HeaderFull() {
|
2021-08-07 20:52:05 +00:00
|
|
|
debug.Log("pack is not full enough (%d bytes)", packer.Size())
|
|
|
|
return size, nil
|
2016-03-06 11:26:25 +00:00
|
|
|
}
|
2023-09-28 18:58:45 +00:00
|
|
|
if packer == r.packer {
|
|
|
|
// forget full packer
|
|
|
|
r.packer = nil
|
|
|
|
}
|
2021-08-07 20:52:05 +00:00
|
|
|
|
2021-08-22 13:10:00 +00:00
|
|
|
// call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
|
2021-08-07 20:52:05 +00:00
|
|
|
// else write the pack to the backend
|
|
|
|
err = r.queueFn(ctx, t, packer)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return size + packer.HeaderOverhead(), nil
|
2016-03-06 11:26:25 +00:00
|
|
|
}
|
|
|
|
|
2015-11-18 19:20:25 +00:00
|
|
|
// findPacker returns a packer for a new blob of size bytes. Either a new one is
|
|
|
|
// created or one is returned that already has some blobs.
|
2024-05-19 14:10:48 +00:00
|
|
|
func (r *packerManager) newPacker() (pck *packer, err error) {
|
2017-07-16 18:16:02 +00:00
|
|
|
debug.Log("create new pack")
|
2017-05-10 17:48:22 +00:00
|
|
|
tmpfile, err := fs.TempFile("", "restic-temp-pack-")
|
2016-03-06 11:26:25 +00:00
|
|
|
if err != nil {
|
2022-10-16 09:32:38 +00:00
|
|
|
return nil, errors.WithStack(err)
|
2016-03-06 11:26:25 +00:00
|
|
|
}
|
|
|
|
|
2021-08-22 13:10:00 +00:00
|
|
|
bufWr := bufio.NewWriter(tmpfile)
|
|
|
|
p := pack.NewPacker(r.key, bufWr)
|
2024-05-19 14:10:48 +00:00
|
|
|
pck = &packer{
|
2017-01-22 16:53:00 +00:00
|
|
|
Packer: p,
|
|
|
|
tmpfile: tmpfile,
|
2021-08-22 13:10:00 +00:00
|
|
|
bufWr: bufWr,
|
2017-01-22 16:53:00 +00:00
|
|
|
}
|
|
|
|
|
2024-05-19 14:10:48 +00:00
|
|
|
return pck, nil
|
2015-11-18 19:20:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// savePacker stores p in the backend.
|
2024-05-19 14:10:48 +00:00
|
|
|
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packer) error {
|
2017-07-16 18:24:37 +00:00
|
|
|
debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
|
2021-08-07 20:52:05 +00:00
|
|
|
err := p.Packer.Finalize()
|
2015-11-18 19:20:25 +00:00
|
|
|
if err != nil {
|
2021-08-07 20:52:05 +00:00
|
|
|
return err
|
2015-11-18 19:20:25 +00:00
|
|
|
}
|
2021-08-22 13:10:00 +00:00
|
|
|
err = p.bufWr.Flush()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// calculate sha256 hash in a second pass
|
|
|
|
var rd io.Reader
|
2023-10-01 09:40:12 +00:00
|
|
|
rd, err = backend.NewFileReader(p.tmpfile, nil)
|
2021-08-22 13:10:00 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
beHasher := r.be.Hasher()
|
|
|
|
var beHr *hashing.Reader
|
|
|
|
if beHasher != nil {
|
|
|
|
beHr = hashing.NewReader(rd, beHasher)
|
|
|
|
rd = beHr
|
|
|
|
}
|
2015-11-18 19:20:25 +00:00
|
|
|
|
2021-08-22 13:10:00 +00:00
|
|
|
hr := hashing.NewReader(rd, sha256.New())
|
2022-12-02 18:36:43 +00:00
|
|
|
_, err = io.Copy(io.Discard, hr)
|
2021-08-22 13:10:00 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
id := restic.IDFromHash(hr.Sum(nil))
|
2023-10-01 09:40:12 +00:00
|
|
|
h := backend.Handle{Type: backend.PackFile, Name: id.String(), IsMetadata: t.IsMetadata()}
|
2020-12-19 11:39:48 +00:00
|
|
|
var beHash []byte
|
2021-08-22 13:10:00 +00:00
|
|
|
if beHr != nil {
|
|
|
|
beHash = beHr.Sum(nil)
|
2020-12-19 11:39:48 +00:00
|
|
|
}
|
2023-10-01 09:40:12 +00:00
|
|
|
rrd, err := backend.NewFileReader(p.tmpfile, beHash)
|
2018-03-03 13:20:54 +00:00
|
|
|
if err != nil {
|
2021-08-07 20:52:05 +00:00
|
|
|
return err
|
2018-03-03 13:20:54 +00:00
|
|
|
}
|
|
|
|
|
2021-08-22 13:10:00 +00:00
|
|
|
err = r.be.Save(ctx, h, rrd)
|
2015-11-18 19:20:25 +00:00
|
|
|
if err != nil {
|
2016-09-27 20:35:08 +00:00
|
|
|
debug.Log("Save(%v) error: %v", h, err)
|
2021-08-07 20:52:05 +00:00
|
|
|
return err
|
2015-11-18 19:20:25 +00:00
|
|
|
}
|
|
|
|
|
2016-09-27 20:35:08 +00:00
|
|
|
debug.Log("saved as %v", h)
|
2015-11-18 19:20:25 +00:00
|
|
|
|
2017-01-23 17:45:15 +00:00
|
|
|
err = p.tmpfile.Close()
|
2017-01-22 16:53:00 +00:00
|
|
|
if err != nil {
|
2021-08-07 20:52:05 +00:00
|
|
|
return errors.Wrap(err, "close tempfile")
|
2017-01-22 16:53:00 +00:00
|
|
|
}
|
|
|
|
|
2015-11-18 19:20:25 +00:00
|
|
|
// update blobs in the index
|
2020-06-06 20:20:44 +00:00
|
|
|
debug.Log(" updating blobs %v to pack %v", p.Packer.Blobs(), id)
|
|
|
|
r.idx.StorePack(id, p.Packer.Blobs())
|
2015-11-18 19:20:25 +00:00
|
|
|
|
2020-06-06 20:20:44 +00:00
|
|
|
// Save index if full
|
2021-08-07 20:52:05 +00:00
|
|
|
return r.idx.SaveFullIndex(ctx, r)
|
2015-11-18 19:20:25 +00:00
|
|
|
}
|