diff --git a/lib/protocol/bufferpool.go b/lib/protocol/bufferpool.go index 17ad2f386..20c330777 100644 --- a/lib/protocol/bufferpool.go +++ b/lib/protocol/bufferpool.go @@ -2,56 +2,71 @@ package protocol -import "sync" +import ( + "fmt" + "sync" + "sync/atomic" +) // Global pool to get buffers from. Requires Blocksizes to be initialised, // therefore it is initialized in the same init() as BlockSizes var BufferPool bufferPool type bufferPool struct { - pools []sync.Pool + puts int64 + skips int64 + misses int64 + pools []sync.Pool + hits []int64 // start of slice allocation is always aligned } func newBufferPool() bufferPool { - return bufferPool{make([]sync.Pool, len(BlockSizes))} + return bufferPool{ + pools: make([]sync.Pool, len(BlockSizes)), + hits: make([]int64, len(BlockSizes)), + } } func (p *bufferPool) Get(size int) []byte { // Too big, isn't pooled if size > MaxBlockSize { + atomic.AddInt64(&p.skips, 1) return make([]byte, size) } - var i int - for i = range BlockSizes { - if size <= BlockSizes[i] { - break - } - } - var bs []byte + // Try the fitting and all bigger pools - for j := i; j < len(BlockSizes); j++ { + bkt := getBucketForLen(size) + for j := bkt; j < len(BlockSizes); j++ { if intf := p.pools[j].Get(); intf != nil { - bs = *intf.(*[]byte) + atomic.AddInt64(&p.hits[j], 1) + bs := *intf.(*[]byte) return bs[:size] } } - // All pools are empty, must allocate. - return make([]byte, BlockSizes[i])[:size] + + atomic.AddInt64(&p.misses, 1) + + // All pools are empty, must allocate. For very small slices where we + // didn't have a block to reuse, just allocate a small slice instead of + // a large one. We won't be able to reuse it, but avoid some overhead. + if size < MinBlockSize/64 { + return make([]byte, size) + } + return make([]byte, BlockSizes[bkt])[:size] } -// Put makes the given byte slice availabe again in the global pool +// Put makes the given byte slice available again in the global pool. +// You must only Put() slices that were returned by Get() or Upgrade(). func (p *bufferPool) Put(bs []byte) { - c := cap(bs) - // Don't buffer huge byte slices - if c > 2*MaxBlockSize { + // Don't buffer slices outside of our pool range + if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize { + atomic.AddInt64(&p.skips, 1) return } - for i := range BlockSizes { - if c >= BlockSizes[i] { - p.pools[i].Put(&bs) - return - } - } + + atomic.AddInt64(&p.puts, 1) + bkt := putBucketForCap(cap(bs)) + p.pools[bkt].Put(&bs) } // Upgrade grows the buffer to the requested size, while attempting to reuse @@ -67,3 +82,31 @@ func (p *bufferPool) Upgrade(bs []byte, size int) []byte { p.Put(bs) return p.Get(size) } + +// getBucketForLen returns the bucket where we should get a slice of a +// certain length. Each bucket is guaranteed to hold slices that are +// precisely the block size for that bucket, so if the block size is larger +// than our size we are good. +func getBucketForLen(len int) int { + for i, blockSize := range BlockSizes { + if len <= blockSize { + return i + } + } + + panic(fmt.Sprintf("bug: tried to get impossible block len %d", len)) +} + +// putBucketForCap returns the bucket where we should put a slice of a +// certain capacity. Each bucket is guaranteed to hold slices that are +// precisely the block size for that bucket, so we just find the matching +// one. +func putBucketForCap(cap int) int { + for i, blockSize := range BlockSizes { + if cap == blockSize { + return i + } + } + + panic(fmt.Sprintf("bug: tried to put impossible block cap %d", cap)) +} diff --git a/lib/protocol/bufferpool_test.go b/lib/protocol/bufferpool_test.go new file mode 100644 index 000000000..ab889ba88 --- /dev/null +++ b/lib/protocol/bufferpool_test.go @@ -0,0 +1,132 @@ +// Copyright (C) 2019 The Protocol Authors. + +package protocol + +import ( + "sync" + "testing" + "time" + + "github.com/syncthing/syncthing/lib/rand" +) + +func TestGetBucketNumbers(t *testing.T) { + cases := []struct { + size int + bkt int + panics bool + }{ + {size: 1024, bkt: 0}, + {size: MinBlockSize, bkt: 0}, + {size: MinBlockSize + 1, bkt: 1}, + {size: 2*MinBlockSize - 1, bkt: 1}, + {size: 2 * MinBlockSize, bkt: 1}, + {size: 2*MinBlockSize + 1, bkt: 2}, + {size: MaxBlockSize, bkt: len(BlockSizes) - 1}, + {size: MaxBlockSize + 1, panics: true}, + } + + for _, tc := range cases { + if tc.panics { + shouldPanic(t, func() { getBucketForLen(tc.size) }) + } else { + res := getBucketForLen(tc.size) + if res != tc.bkt { + t.Errorf("block of size %d should get from bucket %d, not %d", tc.size, tc.bkt, res) + } + } + } +} + +func TestPutBucketNumbers(t *testing.T) { + cases := []struct { + size int + bkt int + panics bool + }{ + {size: 1024, panics: true}, + {size: MinBlockSize, bkt: 0}, + {size: MinBlockSize + 1, panics: true}, + {size: 2 * MinBlockSize, bkt: 1}, + {size: MaxBlockSize, bkt: len(BlockSizes) - 1}, + {size: MaxBlockSize + 1, panics: true}, + } + + for _, tc := range cases { + if tc.panics { + shouldPanic(t, func() { putBucketForCap(tc.size) }) + } else { + res := putBucketForCap(tc.size) + if res != tc.bkt { + t.Errorf("block of size %d should put into bucket %d, not %d", tc.size, tc.bkt, res) + } + } + } +} + +func TestStressBufferPool(t *testing.T) { + if testing.Short() { + t.Skip() + } + + const routines = 10 + const runtime = 2 * time.Second + + bp := newBufferPool() + t0 := time.Now() + + var wg sync.WaitGroup + fail := make(chan struct{}, routines) + for i := 0; i < routines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for time.Since(t0) < runtime { + blocks := make([][]byte, 10) + for i := range blocks { + // Request a block of random size with the range + // covering smaller-than-min to larger-than-max and + // everything in between. + want := rand.Intn(1.5 * MaxBlockSize) + blocks[i] = bp.Get(want) + if len(blocks[i]) != want { + fail <- struct{}{} + return + } + } + for i := range blocks { + bp.Put(blocks[i]) + } + } + }() + } + + wg.Wait() + select { + case <-fail: + t.Fatal("a block was bad size") + default: + } + + t.Log(bp.puts, bp.skips, bp.misses, bp.hits) + if bp.puts == 0 || bp.skips == 0 || bp.misses == 0 { + t.Error("didn't exercise some paths") + } + var hits int64 + for _, h := range bp.hits { + hits += h + } + if hits == 0 { + t.Error("didn't exercise some paths") + } +} + +func shouldPanic(t *testing.T, fn func()) { + defer func() { + if r := recover(); r == nil { + t.Errorf("did not panic") + } + }() + + fn() +}