Update goleveldb

Jakob Borg 2014-08-14 12:14:48 +02:00
parent 7555fe065e
commit f80f5b3bda
9 changed files with 591 additions and 49 deletions

Godeps/Godeps.json (generated)
View File

@@ -1,6 +1,6 @@
{
	"ImportPath": "github.com/syncthing/syncthing",
-	"GoVersion": "go1.3",
+	"GoVersion": "go1.3.1",
	"Packages": [
		"./cmd/..."
	],
@@ -49,7 +49,7 @@
		},
		{
			"ImportPath": "github.com/syndtr/goleveldb/leveldb",
-			"Rev": "c9d6b7be1428942d4cf4f54055b991a8513392eb"
+			"Rev": "6f6f5d93f7499d2c505c2839c1d6b28b25a2ce21"
		},
		{
			"ImportPath": "github.com/vitrun/qart/coding",

View File

@@ -13,6 +13,7 @@ import (
	"os"
	"path/filepath"
	"runtime"
+	"sync/atomic"
	"testing"

	"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -170,7 +171,7 @@ func (p *dbBench) writes(perBatch int) {
	b.SetBytes(116)
}

-func (p *dbBench) drop() {
+func (p *dbBench) gc() {
	p.keys, p.values = nil, nil
	runtime.GC()
}
@@ -249,6 +250,7 @@ func (p *dbBench) newIter() iterator.Iterator {
}

func (p *dbBench) close() {
+	p.b.Log(p.db.s.tops.bpool)
	p.db.Close()
	p.stor.Close()
	os.RemoveAll(benchDB)
@@ -331,7 +333,7 @@ func BenchmarkDBRead(b *testing.B) {
	p := openDBBench(b, false)
	p.populate(b.N)
	p.fill()
-	p.drop()
+	p.gc()

	iter := p.newIter()
	b.ResetTimer()
@@ -343,6 +345,50 @@ func BenchmarkDBRead(b *testing.B) {
	p.close()
}

func BenchmarkDBReadConcurrent(b *testing.B) {
	p := openDBBench(b, false)
	p.populate(b.N)
	p.fill()
	p.gc()
	defer p.close()

	b.ResetTimer()
	b.SetBytes(116)

	b.RunParallel(func(pb *testing.PB) {
		iter := p.newIter()
		defer iter.Release()
		for pb.Next() && iter.Next() {
		}
	})
}

func BenchmarkDBReadConcurrent2(b *testing.B) {
	p := openDBBench(b, false)
	p.populate(b.N)
	p.fill()
	p.gc()
	defer p.close()

	b.ResetTimer()
	b.SetBytes(116)

	var dir uint32
	b.RunParallel(func(pb *testing.PB) {
		iter := p.newIter()
		defer iter.Release()
		if atomic.AddUint32(&dir, 1)%2 == 0 {
			for pb.Next() && iter.Next() {
			}
		} else {
			if pb.Next() && iter.Last() {
				for pb.Next() && iter.Prev() {
				}
			}
		}
	})
}

func BenchmarkDBReadGC(b *testing.B) {
	p := openDBBench(b, false)
	p.populate(b.N)
@@ -362,7 +408,7 @@ func BenchmarkDBReadUncompressed(b *testing.B) {
	p := openDBBench(b, true)
	p.populate(b.N)
	p.fill()
-	p.drop()
+	p.gc()

	iter := p.newIter()
	b.ResetTimer()
@@ -379,7 +425,7 @@ func BenchmarkDBReadTable(b *testing.B) {
	p.populate(b.N)
	p.fill()
	p.reopen()
-	p.drop()
+	p.gc()

	iter := p.newIter()
	b.ResetTimer()
@@ -395,7 +441,7 @@ func BenchmarkDBReadReverse(b *testing.B) {
	p := openDBBench(b, false)
	p.populate(b.N)
	p.fill()
-	p.drop()
+	p.gc()

	iter := p.newIter()
	b.ResetTimer()
@@ -413,7 +459,7 @@ func BenchmarkDBReadReverseTable(b *testing.B) {
	p.populate(b.N)
	p.fill()
	p.reopen()
-	p.drop()
+	p.gc()

	iter := p.newIter()
	b.ResetTimer()

View File

@@ -257,6 +257,7 @@ func recoverTable(s *session, o *opt.Options) error {
	var mSeq uint64
	var good, corrupted int
	rec := new(sessionRecord)
+	bpool := util.NewBufferPool(o.GetBlockSize() + 5)
	buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
		tmp = s.newTemp()
		writer, err := tmp.Create()
@@ -314,7 +315,7 @@ func recoverTable(s *session, o *opt.Options) error {
		var tSeq uint64
		var tgood, tcorrupted, blockerr int
		var imin, imax []byte
-		tr := table.NewReader(reader, size, nil, o)
+		tr := table.NewReader(reader, size, nil, bpool, o)
		iter := tr.NewIterator(nil, nil)
		iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) {
			s.logf("table@recovery found error @%d %q", file.Num(), err)

View File

@@ -110,7 +110,7 @@ type ErrCorrupted struct {
}

func (e ErrCorrupted) Error() string {
-	return fmt.Sprintf("leveldb/journal: corrupted %d bytes: %s", e.Size, e.Reason)
+	return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
}

// Dropper is the interface that wrap simple Drop method. The Drop

View File

@@ -12,6 +12,7 @@ package journal
import (
	"bytes"
+	"encoding/binary"
	"fmt"
	"io"
	"io/ioutil"
@@ -380,6 +381,94 @@ func TestCorrupt_MissingLastBlock(t *testing.T) {
	if err != io.ErrUnexpectedEOF {
		t.Fatalf("read #1: unexpected error: %v", err)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

func TestCorrupt_CorruptedFirstBlock(t *testing.T) {
	buf := new(bytes.Buffer)

	w := NewWriter(buf)

	// First record.
	ww, err := w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil {
		t.Fatalf("write #0: unexpected error: %v", err)
	}

	// Second record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
		t.Fatalf("write #1: unexpected error: %v", err)
	}

	// Third record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil {
		t.Fatalf("write #2: unexpected error: %v", err)
	}

	// Fourth record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil {
		t.Fatalf("write #3: unexpected error: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	b := buf.Bytes()

	// Corrupting block #0.
	for i := 0; i < 1024; i++ {
		b[i] = '1'
	}

	r := NewReader(bytes.NewReader(b), dropper{t}, false, true)

	// First read (third record).
	rr, err := r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err := io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #0: %v", err)
	}
	if want := int64(blockSize-headerSize) + 1; n != want {
		t.Fatalf("read #0: got %d bytes want %d", n, want)
	}

	// Second read (fourth record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #1: %v", err)
	}
	if want := int64(blockSize-headerSize) + 2; n != want {
		t.Fatalf("read #1: got %d bytes want %d", n, want)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
@@ -435,7 +524,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
	r := NewReader(bytes.NewReader(b), dropper{t}, false, true)

-	// First read.
+	// First read (first record).
	rr, err := r.Next()
	if err != nil {
		t.Fatal(err)
@@ -448,7 +537,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
		t.Fatalf("read #0: got %d bytes want %d", n, want)
	}

-	// Second read.
+	// Second read (second record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
@@ -458,7 +547,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
		t.Fatalf("read #1: unexpected error: %v", err)
	}

-	// Third read.
+	// Third read (fourth record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
@@ -470,4 +559,260 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
	if want := int64(blockSize-headerSize) + 2; n != want {
		t.Fatalf("read #2: got %d bytes want %d", n, want)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

func TestCorrupt_CorruptedLastBlock(t *testing.T) {
	buf := new(bytes.Buffer)

	w := NewWriter(buf)

	// First record.
	ww, err := w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil {
		t.Fatalf("write #0: unexpected error: %v", err)
	}

	// Second record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
		t.Fatalf("write #1: unexpected error: %v", err)
	}

	// Third record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil {
		t.Fatalf("write #2: unexpected error: %v", err)
	}

	// Fourth record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil {
		t.Fatalf("write #3: unexpected error: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	b := buf.Bytes()

	// Corrupting block #3.
	for i := len(b) - 1; i > len(b)-1024; i-- {
		b[i] = '1'
	}

	r := NewReader(bytes.NewReader(b), dropper{t}, false, true)

	// First read (first record).
	rr, err := r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err := io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #0: %v", err)
	}
	if want := int64(blockSize / 2); n != want {
		t.Fatalf("read #0: got %d bytes want %d", n, want)
	}

	// Second read (second record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #1: %v", err)
	}
	if want := int64(blockSize - headerSize); n != want {
		t.Fatalf("read #1: got %d bytes want %d", n, want)
	}

	// Third read (third record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #2: %v", err)
	}
	if want := int64(blockSize-headerSize) + 1; n != want {
		t.Fatalf("read #2: got %d bytes want %d", n, want)
	}

	// Fourth read (fourth record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != io.ErrUnexpectedEOF {
		t.Fatalf("read #3: unexpected error: %v", err)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

func TestCorrupt_FirstChuckLengthOverflow(t *testing.T) {
	buf := new(bytes.Buffer)

	w := NewWriter(buf)

	// First record.
	ww, err := w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil {
		t.Fatalf("write #0: unexpected error: %v", err)
	}

	// Second record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
		t.Fatalf("write #1: unexpected error: %v", err)
	}

	// Third record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil {
		t.Fatalf("write #2: unexpected error: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	b := buf.Bytes()

	// Corrupting record #1.
	x := blockSize
	binary.LittleEndian.PutUint16(b[x+4:], 0xffff)

	r := NewReader(bytes.NewReader(b), dropper{t}, false, true)

	// First read (first record).
	rr, err := r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err := io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #0: %v", err)
	}
	if want := int64(blockSize / 2); n != want {
		t.Fatalf("read #0: got %d bytes want %d", n, want)
	}

	// Second read (second record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != io.ErrUnexpectedEOF {
		t.Fatalf("read #1: unexpected error: %v", err)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

func TestCorrupt_MiddleChuckLengthOverflow(t *testing.T) {
	buf := new(bytes.Buffer)

	w := NewWriter(buf)

	// First record.
	ww, err := w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil {
		t.Fatalf("write #0: unexpected error: %v", err)
	}

	// Second record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
		t.Fatalf("write #1: unexpected error: %v", err)
	}

	// Third record.
	ww, err = w.Next()
	if err != nil {
		t.Fatal(err)
	}
	if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil {
		t.Fatalf("write #2: unexpected error: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	b := buf.Bytes()

	// Corrupting record #1.
	x := blockSize/2 + headerSize
	binary.LittleEndian.PutUint16(b[x+4:], 0xffff)

	r := NewReader(bytes.NewReader(b), dropper{t}, false, true)

	// First read (first record).
	rr, err := r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err := io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #0: %v", err)
	}
	if want := int64(blockSize / 2); n != want {
		t.Fatalf("read #0: got %d bytes want %d", n, want)
	}

	// Second read (third record).
	rr, err = r.Next()
	if err != nil {
		t.Fatal(err)
	}
	n, err = io.Copy(ioutil.Discard, rr)
	if err != nil {
		t.Fatalf("read #1: %v", err)
	}
	if want := int64(blockSize-headerSize) + 1; n != want {
		t.Fatalf("read #1: got %d bytes want %d", n, want)
	}

	if _, err := r.Next(); err != io.EOF {
		t.Fatalf("last next: unexpected error: %v", err)
	}
}

View File

@@ -275,6 +275,7 @@ type tOps struct {
	s       *session
	cache   cache.Cache
	cacheNS cache.Namespace
+	bpool   *util.BufferPool
}

// Creates an empty table and returns table writer.
@@ -340,7 +341,7 @@ func (t *tOps) open(f *tFile) (c cache.Object, err error) {
		}

		ok = true
-		value = table.NewReader(r, int64(f.size), cacheNS, o)
+		value = table.NewReader(r, int64(f.size), cacheNS, t.bpool, o)
		charge = 1
		fin = func() {
			r.Close()
@@ -412,8 +413,12 @@ func (t *tOps) close() {
}

// Creates new initialized table ops instance.
func newTableOps(s *session, cacheCap int) *tOps {
	c := cache.NewLRUCache(cacheCap)
-	ns := c.GetNamespace(0)
-	return &tOps{s, c, ns}
+	return &tOps{
+		s:       s,
+		cache:   c,
+		cacheNS: c.GetNamespace(0),
+		bpool:   util.NewBufferPool(s.o.GetBlockSize() + 5),
+	}
}

// tWriter wraps the table writer. It keep track of file descriptor

View File

@@ -13,7 +13,6 @@ import (
	"io"
	"sort"
	"strings"
-	"sync"

	"code.google.com/p/snappy-go/snappy"
@@ -438,6 +437,7 @@ func (i *blockIter) Value() []byte {
}

func (i *blockIter) Release() {
+	if i.dir > dirReleased {
		i.prevNode = nil
		i.prevKeys = nil
		i.key = nil
@@ -452,6 +452,7 @@ func (i *blockIter) Release() {
			i.releaser = nil
		}
	}
+}

func (i *blockIter) SetReleaser(releaser util.Releaser) {
	if i.dir > dirReleased {
@@ -520,6 +521,7 @@ type Reader struct {
	reader io.ReaderAt
	cache  cache.Namespace
	err    error
+	bpool  *util.BufferPool
	// Options
	cmp    comparer.Comparer
	filter filter.Filter
@@ -529,8 +531,6 @@ type Reader struct {
	dataEnd     int64
	indexBlock  *block
	filterBlock *filterBlock
-
-	blockPool sync.Pool
}

func verifyChecksum(data []byte) bool {
@@ -541,13 +541,7 @@ func verifyChecksum(data []byte) bool {
}

func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
-	data, _ := r.blockPool.Get().([]byte) // data is either nil or a valid []byte from the pool
-	if l := bh.length + blockTrailerLen; uint64(len(data)) >= l {
-		data = data[:l]
-	} else {
-		r.blockPool.Put(data)
-		data = make([]byte, l)
-	}
+	data := r.bpool.Get(int(bh.length + blockTrailerLen))
	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
		return nil, err
	}
@@ -560,14 +554,16 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
	case blockTypeNoCompression:
		data = data[:bh.length]
	case blockTypeSnappyCompression:
-		var err error
-		decData, _ := r.blockPool.Get().([]byte)
-		decData, err = snappy.Decode(decData, data[:bh.length])
+		decLen, err := snappy.DecodedLen(data[:bh.length])
+		if err != nil {
+			return nil, err
+		}
+		tmp := data
+		data, err = snappy.Decode(r.bpool.Get(decLen), tmp[:bh.length])
+		r.bpool.Put(tmp)
		if err != nil {
			return nil, err
		}
-		r.blockPool.Put(data[:cap(data)])
-		data = decData
	default:
		return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length])
	}
@@ -614,6 +610,18 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
	return b, nil
}

type releaseBlock struct {
	r *Reader
	b *block
}

func (r releaseBlock) Release() {
	if r.b.data != nil {
		r.r.bpool.Put(r.b.data)
		r.b.data = nil
	}
}

func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
	if r.cache != nil {
		// Get/set block cache.
@@ -628,6 +636,10 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
				ok = true
				value = dataBlock
				charge = int(dataBH.length)
+				fin = func() {
+					r.bpool.Put(dataBlock.data)
+					dataBlock.data = nil
+				}
			}
			return
		})
@@ -650,7 +662,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
	if err != nil {
		return iterator.NewEmptyIterator(err)
	}

-	iter := dataBlock.newIterator(slice, false, nil)
+	iter := dataBlock.newIterator(slice, false, releaseBlock{r, dataBlock})
	return iter
}
@@ -720,8 +732,11 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
		}
		return
	}

+	// Don't use block buffer, no need to copy the buffer.
	rkey = data.Key()
-	value = data.Value()
+	// Use block buffer, and since the buffer will be recycled, the buffer
+	// need to be copied.
+	value = append([]byte{}, data.Value()...)
	return
}
@@ -772,13 +787,17 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
}

// NewReader creates a new initialized table reader for the file.
-// The cache is optional and can be nil.
+// The cache and bpool is optional and can be nil.
//
// The returned table reader instance is goroutine-safe.
-func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader {
+func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.BufferPool, o *opt.Options) *Reader {
+	if bpool == nil {
+		bpool = util.NewBufferPool(o.GetBlockSize() + blockTrailerLen)
+	}
	r := &Reader{
		reader:     f,
		cache:      cache,
+		bpool:      bpool,
		cmp:        o.GetComparer(),
		checksum:   o.GetStrict(opt.StrictBlockChecksum),
		strictIter: o.GetStrict(opt.StrictIterator),
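
Editor's note: the net effect of the reader changes above is that readRawBlock now borrows its scratch buffer from a shared util.BufferPool (sized to the block plus its trailer, the "+ 5" seen elsewhere in the commit) instead of a per-Reader sync.Pool, and the buffer stays attached to the decoded block until the block iterator's releaser or the cache eviction hook returns it. The following is a minimal illustrative sketch of that borrow/read/return cycle against a plain io.ReaderAt; readBlock and trailerLen are names invented for the example, not goleveldb API.

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/syndtr/goleveldb/leveldb/util"
)

// trailerLen mirrors the "+ 5" used when sizing pools in this commit
// (1-byte compression type + 4-byte checksum); an assumption for the sketch.
const trailerLen = 5

// readBlock borrows a pooled buffer, fills it from r, and returns the block
// portion; the caller must Put the buffer back once done with the block.
func readBlock(r io.ReaderAt, off, length int64, pool *util.BufferPool) ([]byte, error) {
	buf := pool.Get(int(length) + trailerLen)
	if _, err := r.ReadAt(buf, off); err != nil && err != io.EOF {
		pool.Put(buf) // nothing usable was read; recycle immediately
		return nil, err
	}
	return buf[:length], nil
}

func main() {
	pool := util.NewBufferPool(4096 + trailerLen)

	src := strings.NewReader("hello, leveldb block")
	b, err := readBlock(src, 0, 5, pool)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", b)

	// Done with the block: hand the whole backing slice back to the pool,
	// which is what releaseBlock and the cache "fin" hook do in the diff.
	pool.Put(b[:cap(b)])
}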

View File

@@ -59,7 +59,7 @@ var _ = testutil.Defer(func() {
		It("Should be able to approximate offset of a key correctly", func() {
			Expect(err).ShouldNot(HaveOccurred())

-			tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o)
+			tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, o)
			CheckOffset := func(key string, expect, threshold int) {
				offset, err := tr.OffsetOf([]byte(key))
				Expect(err).ShouldNot(HaveOccurred())
@@ -95,7 +95,7 @@ var _ = testutil.Defer(func() {
			tw.Close()

			// Opening the table.
-			tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o)
+			tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, o)
			return tableWrapper{tr}
		}
		Test := func(kv *testutil.KeyValue, body func(r *Reader)) func() {

View File

@@ -0,0 +1,126 @@
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package util

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type buffer struct {
	b    []byte
	miss int
}

// BufferPool is a 'buffer pool'.
type BufferPool struct {
	pool     [4]sync.Pool
	size     [3]uint32
	sizeMiss [3]uint32

	baseline0 int
	baseline1 int
	baseline2 int

	less    uint32
	equal   uint32
	greater uint32
	miss    uint32
}

func (p *BufferPool) poolNum(n int) int {
	switch {
	case n <= p.baseline0:
		return 0
	case n <= p.baseline1:
		return 1
	case n <= p.baseline2:
		return 2
	default:
		return 3
	}
}

// Get returns buffer with length of n.
func (p *BufferPool) Get(n int) []byte {
	if poolNum := p.poolNum(n); poolNum == 0 {
		// Fast path.
		if b, ok := p.pool[0].Get().([]byte); ok {
			switch {
			case cap(b) > n:
				atomic.AddUint32(&p.less, 1)
				return b[:n]
			case cap(b) == n:
				atomic.AddUint32(&p.equal, 1)
				return b[:n]
			default:
				panic("not reached")
			}
		} else {
			atomic.AddUint32(&p.miss, 1)
		}
		return make([]byte, n, p.baseline0)
	} else {
		sizePtr := &p.size[poolNum-1]

		if b, ok := p.pool[poolNum].Get().([]byte); ok {
			switch {
			case cap(b) > n:
				atomic.AddUint32(&p.less, 1)
				return b[:n]
			case cap(b) == n:
				atomic.AddUint32(&p.equal, 1)
				return b[:n]
			default:
				atomic.AddUint32(&p.greater, 1)
				if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
					p.pool[poolNum].Put(b)
				}
			}
		} else {
			atomic.AddUint32(&p.miss, 1)
		}

		if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
			if size == 0 {
				atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
			} else {
				sizeMissPtr := &p.sizeMiss[poolNum-1]
				if atomic.AddUint32(sizeMissPtr, 1) == 20 {
					atomic.StoreUint32(sizePtr, uint32(n))
					atomic.StoreUint32(sizeMissPtr, 0)
				}
			}
			return make([]byte, n)
		} else {
			return make([]byte, n, size)
		}
	}
}

// Put adds given buffer to the pool.
func (p *BufferPool) Put(b []byte) {
	p.pool[p.poolNum(cap(b))].Put(b)
}

func (p *BufferPool) String() string {
	return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}",
		p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss)
}

// NewBufferPool creates a new initialized 'buffer pool'.
func NewBufferPool(baseline int) *BufferPool {
	if baseline <= 0 {
		panic("baseline can't be <= 0")
	}
	return &BufferPool{
		baseline0: baseline,
		baseline1: baseline * 2,
		baseline2: baseline * 4,
	}
}
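
Editor's note: for orientation, here is a minimal usage sketch of the pool added above, assuming the canonical import path github.com/syndtr/goleveldb/leveldb/util. Requests are routed into one of four sync.Pool buckets depending on how they compare to the configured baseline; the baseline below mirrors how the rest of the commit sizes pools (block size plus the 5-byte trailer).

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	// Baseline as used elsewhere in this commit: block size + 5-byte trailer.
	pool := util.NewBufferPool(4096 + 5)

	small := pool.Get(512)   // <= baseline: bucket 0, fast path
	medium := pool.Get(6000) // <= 2x baseline: bucket 1
	large := pool.Get(20000) // > 4x baseline: bucket 3

	pool.Put(small)
	pool.Put(medium)
	pool.Put(large)

	// A recycled buffer comes back with at least the requested length.
	again := pool.Get(512)
	fmt.Println(len(again), cap(again) >= 512)

	// String() exposes the baseline plus the hit/miss counters kept above.
	fmt.Println(pool)
}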