package archiver import ( "context" "sync" "github.com/restic/restic/internal/restic" ) // Saver allows saving a blob. type Saver interface { SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error) Index() restic.Index } // BlobSaver concurrently saves incoming blobs to the repo. type BlobSaver struct { repo Saver m sync.Mutex knownBlobs restic.BlobSet ch chan<- saveBlobJob wg sync.WaitGroup } // NewBlobSaver returns a new blob. A worker pool is started, it is stopped // when ctx is cancelled. func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { ch := make(chan saveBlobJob, 2*int(workers)) s := &BlobSaver{ repo: repo, knownBlobs: restic.NewBlobSet(), ch: ch, } for i := uint(0); i < workers; i++ { s.wg.Add(1) go s.worker(ctx, &s.wg, ch) } return s } // Save stores a blob in the repo. It checks the index and the known blobs // before saving anything. The second return parameter is true if the blob was // previously unknown. func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob { ch := make(chan saveBlobResponse, 1) s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch} return FutureBlob{ch: ch, length: len(buf.Data)} } // FutureBlob is returned by SaveBlob and will return the data once it has been processed. type FutureBlob struct { ch <-chan saveBlobResponse length int res saveBlobResponse } func (s *FutureBlob) wait() { res, ok := <-s.ch if ok { s.res = res } } // ID returns the ID of the blob after it has been saved. func (s *FutureBlob) ID() restic.ID { s.wait() return s.res.id } // Known returns whether or not the blob was already known. func (s *FutureBlob) Known() bool { s.wait() return s.res.known } // Err returns the error which may have occurred during save. func (s *FutureBlob) Err() error { s.wait() return s.res.err } // Length returns the length of the blob. func (s *FutureBlob) Length() int { return s.length } type saveBlobJob struct { restic.BlobType buf Buffer ch chan<- saveBlobResponse } type saveBlobResponse struct { id restic.ID known bool err error } func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) saveBlobResponse { id := restic.Hash(buf) h := restic.BlobHandle{ID: id, Type: t} // check if another goroutine has already saved this blob known := false s.m.Lock() if s.knownBlobs.Has(h) { known = true } else { s.knownBlobs.Insert(h) known = false } s.m.Unlock() // blob is already known, nothing to do if known { return saveBlobResponse{ id: id, known: true, } } // check if the repo knows this blob if s.repo.Index().Has(id, t) { return saveBlobResponse{ id: id, known: true, } } // otherwise we're responsible for saving it _, err := s.repo.SaveBlob(ctx, t, buf, id) return saveBlobResponse{ id: id, known: false, err: err, } } func (s *BlobSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveBlobJob) { defer wg.Done() for { var job saveBlobJob select { case <-ctx.Done(): return case job = <-jobs: } job.ch <- s.saveBlob(ctx, job.BlobType, job.buf.Data) close(job.ch) job.buf.Release() } }