2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-10 09:56:20 +00:00
restic/internal/archiver/file_saver.go

235 lines
4.9 KiB
Go
Raw Normal View History

2018-03-30 20:43:18 +00:00
package archiver
import (
"context"
"io"
"os"
"github.com/restic/chunker"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
2022-05-27 17:08:50 +00:00
"golang.org/x/sync/errgroup"
2018-03-30 20:43:18 +00:00
)
// SaveBlobFn saves a blob to a repo. It is called with the context, the type
// of the blob and the buffer holding the blob's data, and returns a FutureBlob
// from which the outcome of the save is obtained later.
//
// NOTE(review): saveFile never releases a buffer after passing it to this
// function, so the implementation is presumably responsible for releasing
// it — confirm against the implementation.
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
2018-03-30 20:43:18 +00:00
// FileSaver concurrently saves incoming files to the repo.
type FileSaver struct {
saveFilePool *BufferPool
saveBlob SaveBlobFn
2018-03-30 20:43:18 +00:00
pol chunker.Pol
ch chan<- saveFileJob
2018-03-30 20:43:18 +00:00
CompleteBlob func(filename string, bytes uint64)
NodeFromFileInfo func(snPath, filename string, fi os.FileInfo) (*restic.Node, error)
2018-03-30 20:43:18 +00:00
}
2018-04-29 13:34:41 +00:00
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
2018-03-30 20:43:18 +00:00
// started, it is stopped when ctx is cancelled.
2022-05-27 17:08:50 +00:00
func NewFileSaver(ctx context.Context, wg *errgroup.Group, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
2018-04-29 13:34:41 +00:00
ch := make(chan saveFileJob)
2018-04-29 13:34:41 +00:00
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
poolSize := fileWorkers + blobWorkers
2018-03-30 20:43:18 +00:00
s := &FileSaver{
saveBlob: save,
saveFilePool: NewBufferPool(int(poolSize), chunker.MaxSize),
2018-03-30 20:43:18 +00:00
pol: pol,
ch: ch,
CompleteBlob: func(string, uint64) {},
}
for i := uint(0); i < fileWorkers; i++ {
2022-05-27 17:08:50 +00:00
wg.Go(func() error {
s.worker(ctx, ch)
return nil
})
2018-03-30 20:43:18 +00:00
}
return s
}
2022-05-27 17:08:50 +00:00
// TriggerShutdown closes the job channel. Each worker returns once it sees
// that the channel is closed. Save must not be called afterwards, because
// sending on a closed channel panics; for the same reason TriggerShutdown
// must be called at most once.
func (s *FileSaver) TriggerShutdown() {
	close(s.ch)
}
2018-03-30 20:43:18 +00:00
// CompleteFunc is called when the file has been saved. The worker calls it
// with the node and stats of the result before the result is delivered to the
// FutureNode. It is also called when saving failed, in which case the node is
// nil.
type CompleteFunc func(*restic.Node, ItemStats)
// Save stores the file f and returns the data once it has been completed. The
// file is closed by Save.
func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode {
fn, ch := newFutureNode()
job := saveFileJob{
2018-03-30 20:43:18 +00:00
snPath: snPath,
target: target,
2018-03-30 20:43:18 +00:00
file: file,
fi: fi,
start: start,
complete: complete,
ch: ch,
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
_ = file.Close()
close(ch)
}
return fn
2018-03-30 20:43:18 +00:00
}
type saveFileJob struct {
snPath string
target string
2018-03-30 20:43:18 +00:00
file fs.File
fi os.FileInfo
ch chan<- futureNodeResult
2018-03-30 20:43:18 +00:00
complete CompleteFunc
start func()
}
// saveFile stores the file f in the repo, then closes it.
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
2018-03-30 20:43:18 +00:00
start()
stats := ItemStats{}
fnr := futureNodeResult{
snPath: snPath,
target: target,
}
2018-03-30 20:43:18 +00:00
debug.Log("%v", snPath)
node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
2018-03-30 20:43:18 +00:00
if err != nil {
_ = f.Close()
fnr.err = err
return fnr
2018-03-30 20:43:18 +00:00
}
if node.Type != "file" {
_ = f.Close()
fnr.err = errors.Errorf("node type %q is wrong", node.Type)
return fnr
2018-03-30 20:43:18 +00:00
}
// reuse the chunker
chnker.Reset(f, s.pol)
var results []FutureBlob
complete := func(sbr SaveBlobResponse) {
if !sbr.known {
stats.DataBlobs++
stats.DataSize += uint64(sbr.length)
stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
}
node.Content = append(node.Content, sbr.id)
}
2018-03-30 20:43:18 +00:00
node.Content = []restic.ID{}
var size uint64
for {
buf := s.saveFilePool.Get()
chunk, err := chnker.Next(buf.Data)
if err == io.EOF {
2018-03-30 20:43:18 +00:00
buf.Release()
break
}
2018-04-29 13:34:41 +00:00
2018-03-30 20:43:18 +00:00
buf.Data = chunk.Data
size += uint64(chunk.Length)
if err != nil {
_ = f.Close()
fnr.err = err
return fnr
2018-03-30 20:43:18 +00:00
}
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
fnr.err = ctx.Err()
return fnr
2018-03-30 20:43:18 +00:00
}
res := s.saveBlob(ctx, restic.DataBlob, buf)
2018-03-30 20:43:18 +00:00
results = append(results, res)
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
fnr.err = ctx.Err()
return fnr
2018-03-30 20:43:18 +00:00
}
s.CompleteBlob(f.Name(), uint64(len(chunk.Data)))
// collect already completed blobs
for len(results) > 0 {
sbr := results[0].Poll()
if sbr == nil {
break
}
results[0] = FutureBlob{}
results = results[1:]
complete(*sbr)
}
2018-03-30 20:43:18 +00:00
}
err = f.Close()
if err != nil {
fnr.err = err
return fnr
2018-03-30 20:43:18 +00:00
}
for i, res := range results {
results[i] = FutureBlob{}
sbr := res.Take(ctx)
complete(sbr)
2018-03-30 20:43:18 +00:00
}
node.Size = size
fnr.node = node
fnr.stats = stats
return fnr
2018-03-30 20:43:18 +00:00
}
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
2018-03-30 20:43:18 +00:00
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
chnker := chunker.New(nil, s.pol)
for {
var job saveFileJob
2022-05-27 17:08:50 +00:00
var ok bool
2018-03-30 20:43:18 +00:00
select {
case <-ctx.Done():
return
2022-05-27 17:08:50 +00:00
case job, ok = <-jobs:
if !ok {
return
}
2018-03-30 20:43:18 +00:00
}
res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
2018-03-30 20:43:18 +00:00
if job.complete != nil {
job.complete(res.node, res.stats)
}
job.ch <- res
close(job.ch)
}
}