From 8c7a6daa476c004f44251c64f48cb9e357f33202 Mon Sep 17 00:00:00 2001 From: greatroar <61184462+greatroar@users.noreply.github.com> Date: Mon, 30 Sep 2024 17:24:05 +0200 Subject: [PATCH] dump: Simplify writeNode and use fewer goroutines This changes Dumper.writeNode to spawn loader goroutines as needed instead of as a pool. The code is shorter, fewer goroutines are spawned for small files, and crash dumps (also for unrelated errors) should be smaller. --- internal/dump/common.go | 90 +++++++++++++++-------------------------- 1 file changed, 32 insertions(+), 58 deletions(-) diff --git a/internal/dump/common.go b/internal/dump/common.go index 4bc404fe0..b4741302e 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -6,7 +6,6 @@ import ( "path" "github.com/restic/restic/internal/bloblru" - "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/walker" "golang.org/x/sync/errgroup" @@ -104,75 +103,50 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error { } func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error { - type loadTask struct { - id restic.ID - out chan<- []byte - } - type writeTask struct { - data <-chan []byte - } - - loaderCh := make(chan loadTask) - // per worker: allows for one blob that gets download + one blob thats queue for writing - writerCh := make(chan writeTask, d.repo.Connections()*2) - wg, ctx := errgroup.WithContext(ctx) + limit := d.repo.Connections() - 1 // See below for the -1. + blobs := make(chan (<-chan []byte), limit) wg.Go(func() error { - defer close(loaderCh) - defer close(writerCh) - for _, id := range node.Content { - // non-blocking blob handover to allow the loader to load the next blob - // while the old one is still written - ch := make(chan []byte, 1) + for ch := range blobs { select { - case loaderCh <- loadTask{id: id, out: ch}: case <-ctx.Done(): return ctx.Err() - } - - select { - case writerCh <- writeTask{data: ch}: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - }) - - for i := uint(0); i < d.repo.Connections(); i++ { - wg.Go(func() error { - for task := range loaderCh { - blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) { - return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil) - }) - if err != nil { + case blob := <-ch: + if _, err := w.Write(blob); err != nil { return err } - - select { - case task.out <- blob: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - }) - } - - wg.Go(func() error { - for result := range writerCh { - select { - case data := <-result.data: - if _, err := w.Write(data); err != nil { - return errors.Wrap(err, "Write") - } - case <-ctx.Done(): - return ctx.Err() } } return nil }) + // Start short-lived goroutines to load blobs. + // There will be at most 1+cap(blobs) calling LoadBlob at any moment. +loop: + for _, id := range node.Content { + // This needs to be buffered, so that loaders can quit + // without waiting for the writer. + ch := make(chan []byte, 1) + + wg.Go(func() error { + blob, err := d.cache.GetOrCompute(id, func() ([]byte, error) { + return d.repo.LoadBlob(ctx, restic.DataBlob, id, nil) + }) + + if err == nil { + ch <- blob + } + return err + }) + + select { + case blobs <- ch: + case <-ctx.Done(): + break loop + } + } + + close(blobs) return wg.Wait() }