restic/internal/archiver/blob_saver.go

139 lines
2.9 KiB
Go
Raw Normal View History

2018-03-30 20:43:18 +00:00
package archiver
import (
"context"
"github.com/restic/restic/internal/debug"
2018-03-30 20:43:18 +00:00
"github.com/restic/restic/internal/restic"
2022-05-27 17:08:50 +00:00
"golang.org/x/sync/errgroup"
2018-03-30 20:43:18 +00:00
)
// Saver allows saving a blob.
type Saver interface {
2022-05-01 12:26:57 +00:00
SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error)
2018-03-30 20:43:18 +00:00
}
// BlobSaver concurrently saves incoming blobs to the repo.
type BlobSaver struct {
repo Saver
ch chan<- saveBlobJob
2018-03-30 20:43:18 +00:00
}
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
// when ctx is cancelled.
2022-05-27 17:08:50 +00:00
func NewBlobSaver(ctx context.Context, wg *errgroup.Group, repo Saver, workers uint) *BlobSaver {
2018-04-30 13:13:03 +00:00
ch := make(chan saveBlobJob)
2018-03-30 20:43:18 +00:00
s := &BlobSaver{
repo: repo,
ch: ch,
2018-03-30 20:43:18 +00:00
}
for i := uint(0); i < workers; i++ {
2022-05-27 17:08:50 +00:00
wg.Go(func() error {
return s.worker(ctx, ch)
})
2018-03-30 20:43:18 +00:00
}
return s
}
2022-05-27 17:08:50 +00:00
func (s *BlobSaver) TriggerShutdown() {
close(s.ch)
}
2018-03-30 20:43:18 +00:00
// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in.
2018-04-29 13:34:41 +00:00
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan SaveBlobResponse, 1)
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled")
close(ch)
return FutureBlob{ch: ch}
}
2018-03-30 20:43:18 +00:00
return FutureBlob{ch: ch}
2018-03-30 20:43:18 +00:00
}
// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
type FutureBlob struct {
ch <-chan SaveBlobResponse
2018-03-30 20:43:18 +00:00
}
func (s *FutureBlob) Poll() *SaveBlobResponse {
select {
case res, ok := <-s.ch:
if ok {
return &res
}
default:
2018-03-30 20:43:18 +00:00
}
return nil
2018-03-30 20:43:18 +00:00
}
// Take waits for the response and returns it. If ctx is cancelled first, or
// the channel is closed without a response, the zero SaveBlobResponse is
// returned instead.
func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
	var result SaveBlobResponse

	select {
	case <-ctx.Done():
		// Give up waiting; result stays at its zero value.
	case received, open := <-s.ch:
		if open {
			result = received
		}
	}

	return result
}
2018-03-30 20:43:18 +00:00
type saveBlobJob struct {
restic.BlobType
2018-04-29 13:34:41 +00:00
buf *Buffer
ch chan<- SaveBlobResponse
2018-03-30 20:43:18 +00:00
}
type SaveBlobResponse struct {
id restic.ID
length int
sizeInRepo int
known bool
2018-03-30 20:43:18 +00:00
}
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (SaveBlobResponse, error) {
id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
2018-03-30 20:43:18 +00:00
if err != nil {
return SaveBlobResponse{}, err
}
return SaveBlobResponse{
id: id,
length: len(buf),
sizeInRepo: sizeInRepo,
known: known,
}, nil
2018-03-30 20:43:18 +00:00
}
func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
2018-03-30 20:43:18 +00:00
for {
var job saveBlobJob
2022-05-27 17:08:50 +00:00
var ok bool
2018-03-30 20:43:18 +00:00
select {
case <-ctx.Done():
return nil
2022-05-27 17:08:50 +00:00
case job, ok = <-jobs:
if !ok {
return nil
}
2018-03-30 20:43:18 +00:00
}
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
if err != nil {
debug.Log("saveBlob returned error, exiting: %v", err)
close(job.ch)
return err
}
job.ch <- res
2018-03-30 20:43:18 +00:00
close(job.ch)
job.buf.Release()
}
}