2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-27 15:26:37 +00:00
restic/internal/archiver/tree_saver.go

180 lines
4.0 KiB
Go
Raw Normal View History

2018-04-30 13:13:03 +00:00
package archiver
import (
"context"
"errors"
2018-04-30 13:13:03 +00:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
2022-05-27 17:08:50 +00:00
"golang.org/x/sync/errgroup"
2018-04-30 13:13:03 +00:00
)
// TreeSaver concurrently saves incoming trees to the repo.
type TreeSaver struct {
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
2018-04-30 13:13:03 +00:00
errFn ErrorFunc
ch chan<- saveTreeJob
2018-04-30 13:13:03 +00:00
}
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
2018-04-30 13:13:03 +00:00
ch := make(chan saveTreeJob)
s := &TreeSaver{
ch: ch,
saveBlob: saveBlob,
2018-04-30 13:13:03 +00:00
errFn: errFn,
}
for i := uint(0); i < treeWorkers; i++ {
2022-05-27 17:08:50 +00:00
wg.Go(func() error {
return s.worker(ctx, ch)
})
2018-04-30 13:13:03 +00:00
}
return s
}
2022-05-27 17:08:50 +00:00
// TriggerShutdown closes the job channel. A worker returns nil as soon as it
// observes the closed channel. It must be called at most once, and Save must
// not be called afterwards: closing a closed channel or sending on one
// panics.
func (s *TreeSaver) TriggerShutdown() {
	close(s.ch)
}
2018-04-30 13:13:03 +00:00
// Save stores the dir d and returns the data once it has been completed.
func (s *TreeSaver) Save(ctx context.Context, snPath string, target string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureNode {
fn, ch := newFutureNode()
job := saveTreeJob{
snPath: snPath,
target: target,
node: node,
nodes: nodes,
ch: ch,
complete: complete,
2018-04-30 13:13:03 +00:00
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("not saving tree, context is cancelled")
close(ch)
}
2018-04-30 13:13:03 +00:00
return fn
2018-04-30 13:13:03 +00:00
}
type saveTreeJob struct {
snPath string
target string
node *restic.Node
nodes []FutureNode
ch chan<- futureNodeResult
complete CompleteFunc
2018-04-30 13:13:03 +00:00
}
// save stores the nodes as a tree in the repo.
func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, ItemStats, error) {
2018-04-30 13:13:03 +00:00
var stats ItemStats
node := job.node
nodes := job.nodes
// allow GC of nodes array once the loop is finished
job.nodes = nil
2018-04-30 13:13:03 +00:00
builder := restic.NewTreeJSONBuilder()
var lastNode *restic.Node
for i, fn := range nodes {
// fn is a copy, so clear the original value explicitly
nodes[i] = FutureNode{}
fnr := fn.take(ctx)
2018-04-30 13:13:03 +00:00
// return the error if it wasn't ignored
if fnr.err != nil {
debug.Log("err for %v: %v", fnr.snPath, fnr.err)
fnr.err = s.errFn(fnr.target, fnr.err)
if fnr.err == nil {
2018-04-30 13:13:03 +00:00
// ignore error
continue
}
return nil, stats, fnr.err
2018-04-30 13:13:03 +00:00
}
// when the error is ignored, the node could not be saved, so ignore it
if fnr.node == nil {
debug.Log("%v excluded: %v", fnr.snPath, fnr.target)
2018-04-30 13:13:03 +00:00
continue
}
err := builder.AddNode(fnr.node)
if err != nil && errors.Is(err, restic.ErrTreeNotOrdered) && lastNode != nil && fnr.node.Equals(*lastNode) {
2023-04-23 09:38:06 +00:00
debug.Log("insert %v failed: %v", fnr.node.Name, err)
// ignore error if an _identical_ node already exists, but nevertheless issue a warning
_ = s.errFn(fnr.target, err)
err = nil
}
2018-04-30 13:13:03 +00:00
if err != nil {
2023-04-23 09:38:06 +00:00
debug.Log("insert %v failed: %v", fnr.node.Name, err)
2018-04-30 13:13:03 +00:00
return nil, stats, err
}
lastNode = fnr.node
2018-04-30 13:13:03 +00:00
}
buf, err := builder.Finalize()
2018-04-30 13:13:03 +00:00
if err != nil {
return nil, stats, err
}
b := &Buffer{Data: buf}
ch := make(chan SaveBlobResponse, 1)
s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
ch <- res
})
select {
case sbr := <-ch:
if !sbr.known {
stats.TreeBlobs++
stats.TreeSize += uint64(sbr.length)
stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
}
node.Subtree = &sbr.id
return node, stats, nil
case <-ctx.Done():
return nil, stats, ctx.Err()
}
2018-04-30 13:13:03 +00:00
}
func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
2018-04-30 13:13:03 +00:00
for {
var job saveTreeJob
2022-05-27 17:08:50 +00:00
var ok bool
2018-04-30 13:13:03 +00:00
select {
case <-ctx.Done():
return nil
2022-05-27 17:08:50 +00:00
case job, ok = <-jobs:
if !ok {
return nil
}
2018-04-30 13:13:03 +00:00
}
node, stats, err := s.save(ctx, &job)
if err != nil {
debug.Log("error saving tree blob: %v", err)
close(job.ch)
return err
}
if job.complete != nil {
job.complete(node, stats)
}
job.ch <- futureNodeResult{
snPath: job.snPath,
target: job.target,
node: node,
stats: stats,
2018-04-30 13:13:03 +00:00
}
close(job.ch)
}
}