diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index b5e2cd3b3..5b3fd611a 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -172,7 +172,7 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
 	// adds a newline after each object)
 	buf = append(buf, '\n')
 
-	b := Buffer{Data: buf}
+	b := &Buffer{Data: buf}
 	res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
 	if res.Err() != nil {
 		return restic.ID{}, s, res.Err()
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index 5e45d7175..1863d440e 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -45,7 +45,7 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. The second return parameter is true if the blob was
 // previously unknown.
-func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob {
+func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
 	ch := make(chan saveBlobResponse, 1)
 	s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
 
@@ -91,7 +91,7 @@ func (s *FutureBlob) Length() int {
 
 type saveBlobJob struct {
 	restic.BlobType
-	buf Buffer
+	buf *Buffer
 	ch  chan<- saveBlobResponse
 }
 
diff --git a/internal/archiver/buffer.go b/internal/archiver/buffer.go
index c97d990cf..ef7131322 100644
--- a/internal/archiver/buffer.go
+++ b/internal/archiver/buffer.go
@@ -9,19 +9,19 @@ import (
 // be called so the underlying slice is put back into the pool.
 type Buffer struct {
 	Data []byte
-	Put  func([]byte)
+	Put  func(*Buffer)
 }
 
 // Release puts the buffer back into the pool it came from.
-func (b Buffer) Release() {
+func (b *Buffer) Release() {
 	if b.Put != nil {
-		b.Put(b.Data)
+		b.Put(b)
 	}
 }
 
 // BufferPool implements a limited set of reusable buffers.
 type BufferPool struct {
-	ch          chan []byte
+	ch          chan *Buffer
 	chM         sync.Mutex
 	defaultSize int
 	clearOnce   sync.Once
@@ -33,7 +33,7 @@ type BufferPool struct {
 // back.
 func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
 	b := &BufferPool{
-		ch:          make(chan []byte, max),
+		ch:          make(chan *Buffer, max),
 		defaultSize: defaultSize,
 	}
 	go func() {
@@ -44,22 +44,29 @@ func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
 }
 
 // Get returns a new buffer, either from the pool or newly allocated.
-func (pool *BufferPool) Get() Buffer {
-	b := Buffer{Put: pool.put}
-
+func (pool *BufferPool) Get() *Buffer {
 	pool.chM.Lock()
 	defer pool.chM.Unlock()
 	select {
 	case buf := <-pool.ch:
-		b.Data = buf
+		return buf
 	default:
-		b.Data = make([]byte, pool.defaultSize)
+	}
+
+	b := &Buffer{
+		Put:  pool.Put,
+		Data: make([]byte, pool.defaultSize),
 	}
 
 	return b
 }
 
-func (pool *BufferPool) put(b []byte) {
+// Put returns a buffer to the pool for reuse.
+func (pool *BufferPool) Put(b *Buffer) {
+	if cap(b.Data) > pool.defaultSize {
+		return
+	}
+
 	pool.chM.Lock()
 	defer pool.chM.Unlock()
 	select {
@@ -68,14 +75,6 @@ func (pool *BufferPool) put(b []byte) {
 	}
 }
 
-// Put returns a buffer to the pool for reuse.
-func (pool *BufferPool) Put(b Buffer) {
-	if cap(b.Data) > pool.defaultSize {
-		return
-	}
-	pool.put(b.Data)
-}
-
 // clear empties the buffer so that all items can be garbage collected.
 func (pool *BufferPool) clear() {
 	pool.clearOnce.Do(func() {
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index bab2d66fd..276528564 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -61,20 +61,19 @@ type FileSaver struct {
 	NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
 }
 
-// NewFileSaver returns a new file saver. A worker pool with workers is
+// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
 func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
-	ch := make(chan saveFileJob, fileWorkers)
+	ch := make(chan saveFileJob)
 
-	poolSize := fileWorkers
-	if blobWorkers > fileWorkers {
-		poolSize = blobWorkers
-	}
+	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
+
+	poolSize := fileWorkers + blobWorkers
 
 	s := &FileSaver{
 		fs:           fs,
 		blobSaver:    blobSaver,
-		saveFilePool: NewBufferPool(ctx, int(poolSize)*3/2, chunker.MaxSize/2),
+		saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
 		pol:          pol,
 		ch:           ch,
 
@@ -156,6 +155,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 			buf.Release()
 			break
 		}
+		buf.Data = chunk.Data
 
 		size += uint64(chunk.Length)
 
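Two observations on the sizing changes in file_saver.go; the rationale here is inferred from the diff, not stated in it. The pool now holds fileWorkers + blobWorkers buffers of chunker.MaxSize bytes instead of max(fileWorkers, blobWorkers) * 3/2 buffers of chunker.MaxSize/2. Since a chunk can be up to chunker.MaxSize long, the old half-size default meant chunker.Next had to regrow any buffer that received a large chunk, and Put then refused to recycle it (cap(b.Data) > pool.defaultSize), quietly defeating the pool. The added buf.Data = chunk.Data line in saveFile serves the same end: chunker.Next may return a reallocated slice, and storing it back into the *Buffer before Release lets the pool recycle the slice actually in use rather than the stale original. With restic's chunker constants (chunker.MaxSize is 8 MiB), fileWorkers = 2 and blobWorkers = 2 now keep at most 4 × 8 MiB = 32 MiB of buffers alive for reuse.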
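For context, here is a compilable, standalone sketch of the pool API this diff converges on: Get hands out *Buffer values whose Put closure survives reuse, and Put drops any buffer whose backing array has grown past the default size. The types mirror the patched internal/archiver/buffer.go, but the context-driven clear goroutine (and its clearOnce bookkeeping) and all archiver wiring are omitted, and main is purely illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer is a reusable byte slice. Release hands it back to the pool
// it came from via the Put closure set at allocation time.
type Buffer struct {
	Data []byte
	Put  func(*Buffer)
}

// Release puts the buffer back into the pool it came from.
func (b *Buffer) Release() {
	if b.Put != nil {
		b.Put(b)
	}
}

// BufferPool keeps at most cap(ch) buffers of defaultSize for reuse;
// anything beyond that is left to the garbage collector.
type BufferPool struct {
	ch          chan *Buffer
	chM         sync.Mutex
	defaultSize int
}

func NewBufferPool(max int, defaultSize int) *BufferPool {
	return &BufferPool{
		ch:          make(chan *Buffer, max),
		defaultSize: defaultSize,
	}
}

// Get returns a pooled buffer if one is available, else a fresh one.
func (pool *BufferPool) Get() *Buffer {
	pool.chM.Lock()
	defer pool.chM.Unlock()
	select {
	case buf := <-pool.ch:
		return buf
	default:
	}
	return &Buffer{Put: pool.Put, Data: make([]byte, pool.defaultSize)}
}

// Put recycles b unless its backing array outgrew defaultSize, in which
// case the oversized allocation is dropped rather than kept alive.
func (pool *BufferPool) Put(b *Buffer) {
	if cap(b.Data) > pool.defaultSize {
		return
	}
	pool.chM.Lock()
	defer pool.chM.Unlock()
	select {
	case pool.ch <- b:
	default:
	}
}

func main() {
	pool := NewBufferPool(4, 16)

	b := pool.Get()
	copy(b.Data, "hello")
	b.Release() // back into the pool

	// Because buffers are now pointers, the recycled value is the very
	// same *Buffer, so its Put closure is preserved across cycles.
	b2 := pool.Get()
	fmt.Println("same buffer reused:", b == b2)
	b2.Release()
}
```

One effect of the pointer conversion visible here: with the old by-value Buffer and Put func([]byte), only the raw slice travelled through the pool's channel and every Get had to wrap it in a fresh Buffer; passing *Buffer through the channel keeps a single Buffer allocation, including its Put closure, for the buffer's whole lifetime.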