diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 3fa5f6201..d96e9fdbb 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/syncthing/syncthing", - "GoVersion": "go1.3", + "GoVersion": "go1.3.1", "Packages": [ "./cmd/..." ], @@ -49,7 +49,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "c9d6b7be1428942d4cf4f54055b991a8513392eb" + "Rev": "6f6f5d93f7499d2c505c2839c1d6b28b25a2ce21" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go index ea6801a89..1b790402b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "runtime" + "sync/atomic" "testing" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -170,7 +171,7 @@ func (p *dbBench) writes(perBatch int) { b.SetBytes(116) } -func (p *dbBench) drop() { +func (p *dbBench) gc() { p.keys, p.values = nil, nil runtime.GC() } @@ -249,6 +250,7 @@ func (p *dbBench) newIter() iterator.Iterator { } func (p *dbBench) close() { + p.b.Log(p.db.s.tops.bpool) p.db.Close() p.stor.Close() os.RemoveAll(benchDB) @@ -331,7 +333,7 @@ func BenchmarkDBRead(b *testing.B) { p := openDBBench(b, false) p.populate(b.N) p.fill() - p.drop() + p.gc() iter := p.newIter() b.ResetTimer() @@ -343,6 +345,50 @@ func BenchmarkDBRead(b *testing.B) { p.close() } +func BenchmarkDBReadConcurrent(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + for pb.Next() && iter.Next() { + } + }) +} + +func BenchmarkDBReadConcurrent2(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + var dir uint32 + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + if atomic.AddUint32(&dir, 1)%2 == 0 { + for pb.Next() && iter.Next() { + } + } else { + if pb.Next() && iter.Last() { + for pb.Next() && iter.Prev() { + } + } + } + }) +} + func BenchmarkDBReadGC(b *testing.B) { p := openDBBench(b, false) p.populate(b.N) @@ -362,7 +408,7 @@ func BenchmarkDBReadUncompressed(b *testing.B) { p := openDBBench(b, true) p.populate(b.N) p.fill() - p.drop() + p.gc() iter := p.newIter() b.ResetTimer() @@ -379,7 +425,7 @@ func BenchmarkDBReadTable(b *testing.B) { p.populate(b.N) p.fill() p.reopen() - p.drop() + p.gc() iter := p.newIter() b.ResetTimer() @@ -395,7 +441,7 @@ func BenchmarkDBReadReverse(b *testing.B) { p := openDBBench(b, false) p.populate(b.N) p.fill() - p.drop() + p.gc() iter := p.newIter() b.ResetTimer() @@ -413,7 +459,7 @@ func BenchmarkDBReadReverseTable(b *testing.B) { p.populate(b.N) p.fill() p.reopen() - p.drop() + p.gc() iter := p.newIter() b.ResetTimer() diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 4d178b9c0..b29b5f1e7 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -257,6 +257,7 @@ func recoverTable(s *session, o *opt.Options) error { var mSeq uint64 var good, corrupted int rec := new(sessionRecord) + bpool := util.NewBufferPool(o.GetBlockSize() + 5) buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) { tmp = s.newTemp() writer, err := tmp.Create() @@ -314,7 +315,7 @@ func recoverTable(s *session, o *opt.Options) error { var tSeq uint64 var tgood, tcorrupted, blockerr int var imin, imax []byte - tr := table.NewReader(reader, size, nil, o) + tr := table.NewReader(reader, size, nil, bpool, o) iter := tr.NewIterator(nil, nil) iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) { s.logf("table@recovery found error @%d %q", file.Num(), err) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go index 2e2cc351e..e9a19ebcd 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go @@ -110,7 +110,7 @@ type ErrCorrupted struct { } func (e ErrCorrupted) Error() string { - return fmt.Sprintf("leveldb/journal: corrupted %d bytes: %s", e.Size, e.Reason) + return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size) } // Dropper is the interface that wrap simple Drop method. The Drop diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go index bde5cec22..0fcf22599 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go @@ -12,6 +12,7 @@ package journal import ( "bytes" + "encoding/binary" "fmt" "io" "io/ioutil" @@ -380,6 +381,94 @@ func TestCorrupt_MissingLastBlock(t *testing.T) { if err != io.ErrUnexpectedEOF { t.Fatalf("read #1: unexpected error: %v", err) } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } +} + +func TestCorrupt_CorruptedFirstBlock(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + // Third record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil { + t.Fatalf("write #2: unexpected error: %v", err) + } + + // Fourth record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil { + t.Fatalf("write #3: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + // Corrupting block #0. + for i := 0; i < 1024; i++ { + b[i] = '1' + } + + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read (third record). + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if want := int64(blockSize-headerSize) + 1; n != want { + t.Fatalf("read #0: got %d bytes want %d", n, want) + } + + // Second read (fourth record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #1: %v", err) + } + if want := int64(blockSize-headerSize) + 2; n != want { + t.Fatalf("read #1: got %d bytes want %d", n, want) + } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } } func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { @@ -435,7 +524,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { r := NewReader(bytes.NewReader(b), dropper{t}, false, true) - // First read. + // First read (first record). rr, err := r.Next() if err != nil { t.Fatal(err) @@ -448,7 +537,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { t.Fatalf("read #0: got %d bytes want %d", n, want) } - // Second read. + // Second read (second record). rr, err = r.Next() if err != nil { t.Fatal(err) @@ -458,7 +547,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { t.Fatalf("read #1: unexpected error: %v", err) } - // Third read. + // Third read (fourth record). rr, err = r.Next() if err != nil { t.Fatal(err) @@ -470,4 +559,260 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { if want := int64(blockSize-headerSize) + 2; n != want { t.Fatalf("read #2: got %d bytes want %d", n, want) } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } +} + +func TestCorrupt_CorruptedLastBlock(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + // Third record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil { + t.Fatalf("write #2: unexpected error: %v", err) + } + + // Fourth record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil { + t.Fatalf("write #3: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + // Corrupting block #3. + for i := len(b) - 1; i > len(b)-1024; i-- { + b[i] = '1' + } + + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read (first record). + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if want := int64(blockSize / 2); n != want { + t.Fatalf("read #0: got %d bytes want %d", n, want) + } + + // Second read (second record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #1: %v", err) + } + if want := int64(blockSize - headerSize); n != want { + t.Fatalf("read #1: got %d bytes want %d", n, want) + } + + // Third read (third record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #2: %v", err) + } + if want := int64(blockSize-headerSize) + 1; n != want { + t.Fatalf("read #2: got %d bytes want %d", n, want) + } + + // Fourth read (fourth record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != io.ErrUnexpectedEOF { + t.Fatalf("read #3: unexpected error: %v", err) + } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } +} + +func TestCorrupt_FirstChuckLengthOverflow(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + // Third record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil { + t.Fatalf("write #2: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + // Corrupting record #1. + x := blockSize + binary.LittleEndian.PutUint16(b[x+4:], 0xffff) + + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read (first record). + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if want := int64(blockSize / 2); n != want { + t.Fatalf("read #0: got %d bytes want %d", n, want) + } + + // Second read (second record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != io.ErrUnexpectedEOF { + t.Fatalf("read #1: unexpected error: %v", err) + } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } +} + +func TestCorrupt_MiddleChuckLengthOverflow(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + // Third record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil { + t.Fatalf("write #2: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + // Corrupting record #1. + x := blockSize/2 + headerSize + binary.LittleEndian.PutUint16(b[x+4:], 0xffff) + + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read (first record). + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if want := int64(blockSize / 2); n != want { + t.Fatalf("read #0: got %d bytes want %d", n, want) + } + + // Second read (third record). + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #1: %v", err) + } + if want := int64(blockSize-headerSize) + 1; n != want { + t.Fatalf("read #1: got %d bytes want %d", n, want) + } + + if _, err := r.Next(); err != io.EOF { + t.Fatalf("last next: unexpected error: %v", err) + } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index b44120e5b..46dd599be 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -275,6 +275,7 @@ type tOps struct { s *session cache cache.Cache cacheNS cache.Namespace + bpool *util.BufferPool } // Creates an empty table and returns table writer. @@ -340,7 +341,7 @@ func (t *tOps) open(f *tFile) (c cache.Object, err error) { } ok = true - value = table.NewReader(r, int64(f.size), cacheNS, o) + value = table.NewReader(r, int64(f.size), cacheNS, t.bpool, o) charge = 1 fin = func() { r.Close() @@ -412,8 +413,12 @@ func (t *tOps) close() { // Creates new initialized table ops instance. func newTableOps(s *session, cacheCap int) *tOps { c := cache.NewLRUCache(cacheCap) - ns := c.GetNamespace(0) - return &tOps{s, c, ns} + return &tOps{ + s: s, + cache: c, + cacheNS: c.GetNamespace(0), + bpool: util.NewBufferPool(s.o.GetBlockSize() + 5), + } } // tWriter wraps the table writer. It keep track of file descriptor diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 84267dd3a..0c62a3c62 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -13,7 +13,6 @@ import ( "io" "sort" "strings" - "sync" "code.google.com/p/snappy-go/snappy" @@ -438,18 +437,20 @@ func (i *blockIter) Value() []byte { } func (i *blockIter) Release() { - i.prevNode = nil - i.prevKeys = nil - i.key = nil - i.value = nil - i.dir = dirReleased - if i.cache != nil { - i.cache.Release() - i.cache = nil - } - if i.releaser != nil { - i.releaser.Release() - i.releaser = nil + if i.dir > dirReleased { + i.prevNode = nil + i.prevKeys = nil + i.key = nil + i.value = nil + i.dir = dirReleased + if i.cache != nil { + i.cache.Release() + i.cache = nil + } + if i.releaser != nil { + i.releaser.Release() + i.releaser = nil + } } } @@ -520,6 +521,7 @@ type Reader struct { reader io.ReaderAt cache cache.Namespace err error + bpool *util.BufferPool // Options cmp comparer.Comparer filter filter.Filter @@ -529,8 +531,6 @@ type Reader struct { dataEnd int64 indexBlock *block filterBlock *filterBlock - - blockPool sync.Pool } func verifyChecksum(data []byte) bool { @@ -541,13 +541,7 @@ func verifyChecksum(data []byte) bool { } func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { - data, _ := r.blockPool.Get().([]byte) // data is either nil or a valid []byte from the pool - if l := bh.length + blockTrailerLen; uint64(len(data)) >= l { - data = data[:l] - } else { - r.blockPool.Put(data) - data = make([]byte, l) - } + data := r.bpool.Get(int(bh.length + blockTrailerLen)) if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF { return nil, err } @@ -560,14 +554,16 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { case blockTypeNoCompression: data = data[:bh.length] case blockTypeSnappyCompression: - var err error - decData, _ := r.blockPool.Get().([]byte) - decData, err = snappy.Decode(decData, data[:bh.length]) + decLen, err := snappy.DecodedLen(data[:bh.length]) + if err != nil { + return nil, err + } + tmp := data + data, err = snappy.Decode(r.bpool.Get(decLen), tmp[:bh.length]) + r.bpool.Put(tmp) if err != nil { return nil, err } - r.blockPool.Put(data[:cap(data)]) - data = decData default: return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length]) } @@ -614,6 +610,18 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB return b, nil } +type releaseBlock struct { + r *Reader + b *block +} + +func (r releaseBlock) Release() { + if r.b.data != nil { + r.r.bpool.Put(r.b.data) + r.b.data = nil + } +} + func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { if r.cache != nil { // Get/set block cache. @@ -628,6 +636,10 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi ok = true value = dataBlock charge = int(dataBH.length) + fin = func() { + r.bpool.Put(dataBlock.data) + dataBlock.data = nil + } } return }) @@ -650,7 +662,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi if err != nil { return iterator.NewEmptyIterator(err) } - iter := dataBlock.newIterator(slice, false, nil) + iter := dataBlock.newIterator(slice, false, releaseBlock{r, dataBlock}) return iter } @@ -720,8 +732,11 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err } return } + // Don't use block buffer, no need to copy the buffer. rkey = data.Key() - value = data.Value() + // Use block buffer, and since the buffer will be recycled, the buffer + // need to be copied. + value = append([]byte{}, data.Value()...) return } @@ -772,13 +787,17 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { } // NewReader creates a new initialized table reader for the file. -// The cache is optional and can be nil. +// The cache and bpool is optional and can be nil. // // The returned table reader instance is goroutine-safe. -func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader { +func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.BufferPool, o *opt.Options) *Reader { + if bpool == nil { + bpool = util.NewBufferPool(o.GetBlockSize() + blockTrailerLen) + } r := &Reader{ reader: f, cache: cache, + bpool: bpool, cmp: o.GetComparer(), checksum: o.GetStrict(opt.StrictBlockChecksum), strictIter: o.GetStrict(opt.StrictIterator), diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go index d7d3b2a4b..0751cf529 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go @@ -59,7 +59,7 @@ var _ = testutil.Defer(func() { It("Should be able to approximate offset of a key correctly", func() { Expect(err).ShouldNot(HaveOccurred()) - tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o) + tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, o) CheckOffset := func(key string, expect, threshold int) { offset, err := tr.OffsetOf([]byte(key)) Expect(err).ShouldNot(HaveOccurred()) @@ -95,7 +95,7 @@ var _ = testutil.Defer(func() { tw.Close() // Opening the table. - tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o) + tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, o) return tableWrapper{tr} } Test := func(kv *testutil.KeyValue, body func(r *Reader)) func() { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go new file mode 100644 index 000000000..edafdfdeb --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -0,0 +1,126 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package util + +import ( + "fmt" + "sync" + "sync/atomic" +) + +type buffer struct { + b []byte + miss int +} + +// BufferPool is a 'buffer pool'. +type BufferPool struct { + pool [4]sync.Pool + size [3]uint32 + sizeMiss [3]uint32 + baseline0 int + baseline1 int + baseline2 int + + less uint32 + equal uint32 + greater uint32 + miss uint32 +} + +func (p *BufferPool) poolNum(n int) int { + switch { + case n <= p.baseline0: + return 0 + case n <= p.baseline1: + return 1 + case n <= p.baseline2: + return 2 + default: + return 3 + } +} + +// Get returns buffer with length of n. +func (p *BufferPool) Get(n int) []byte { + if poolNum := p.poolNum(n); poolNum == 0 { + // Fast path. + if b, ok := p.pool[0].Get().([]byte); ok { + switch { + case cap(b) > n: + atomic.AddUint32(&p.less, 1) + return b[:n] + case cap(b) == n: + atomic.AddUint32(&p.equal, 1) + return b[:n] + default: + panic("not reached") + } + } else { + atomic.AddUint32(&p.miss, 1) + } + + return make([]byte, n, p.baseline0) + } else { + sizePtr := &p.size[poolNum-1] + + if b, ok := p.pool[poolNum].Get().([]byte); ok { + switch { + case cap(b) > n: + atomic.AddUint32(&p.less, 1) + return b[:n] + case cap(b) == n: + atomic.AddUint32(&p.equal, 1) + return b[:n] + default: + atomic.AddUint32(&p.greater, 1) + if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) { + p.pool[poolNum].Put(b) + } + } + } else { + atomic.AddUint32(&p.miss, 1) + } + + if size := atomic.LoadUint32(sizePtr); uint32(n) > size { + if size == 0 { + atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n)) + } else { + sizeMissPtr := &p.sizeMiss[poolNum-1] + if atomic.AddUint32(sizeMissPtr, 1) == 20 { + atomic.StoreUint32(sizePtr, uint32(n)) + atomic.StoreUint32(sizeMissPtr, 0) + } + } + return make([]byte, n) + } else { + return make([]byte, n, size) + } + } +} + +// Put adds given buffer to the pool. +func (p *BufferPool) Put(b []byte) { + p.pool[p.poolNum(cap(b))].Put(b) +} + +func (p *BufferPool) String() string { + return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}", + p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss) +} + +// NewBufferPool creates a new initialized 'buffer pool'. +func NewBufferPool(baseline int) *BufferPool { + if baseline <= 0 { + panic("baseline can't be <= 0") + } + return &BufferPool{ + baseline0: baseline, + baseline1: baseline * 2, + baseline2: baseline * 4, + } +}