diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 91a14f9b0..316f71d6f 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -48,7 +48,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "9bca75c48d6c31becfbb127702b425e7226052e3" + "Rev": "e2fa4e6ac1cc41a73bc9fd467878ecbf65df5cc3" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go index 299f6f6c9..6207e6815 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go @@ -7,10 +7,8 @@ package cache import ( - "fmt" "math/rand" "runtime" - "strings" "sync" "sync/atomic" "testing" @@ -225,16 +223,16 @@ func TestLRUCache_Purge(t *testing.T) { } type testingCacheObjectCounter struct { - created uint32 - released uint32 + created uint + released uint } func (c *testingCacheObjectCounter) createOne() { - atomic.AddUint32(&c.created, 1) + c.created++ } func (c *testingCacheObjectCounter) releaseOne() { - atomic.AddUint32(&c.released, 1) + c.released++ } type testingCacheObject struct { @@ -243,17 +241,75 @@ type testingCacheObject struct { ns, key uint64 - releaseCalled uint32 + releaseCalled bool } func (x *testingCacheObject) Release() { - if atomic.CompareAndSwapUint32(&x.releaseCalled, 0, 1) { + if !x.releaseCalled { + x.releaseCalled = true x.cnt.releaseOne() } else { x.t.Errorf("duplicate setfin NS#%d KEY#%s", x.ns, x.key) } } +func TestLRUCache_ConcurrentSetGet(t *testing.T) { + runtime.GOMAXPROCS(runtime.NumCPU()) + + seed := time.Now().UnixNano() + t.Logf("seed=%d", seed) + + const ( + N = 2000000 + M = 4000 + C = 3 + ) + + var set, get uint32 + + wg := &sync.WaitGroup{} + c := NewLRUCache(M / 4) + for ni := uint64(0); ni < C; ni++ { + r0 := rand.New(rand.NewSource(seed + int64(ni))) + r1 := rand.New(rand.NewSource(seed + int64(ni) + 1)) + ns := c.GetNamespace(ni) + + wg.Add(2) + go func(ns Namespace, r *rand.Rand) { + for i := 0; i < N; i++ { + x := uint64(r.Int63n(M)) + o := ns.Get(x, func() (int, interface{}) { + atomic.AddUint32(&set, 1) + return 1, x + }) + if v := o.Value().(uint64); v != x { + t.Errorf("#%d invalid value, got=%d", x, v) + } + o.Release() + } + wg.Done() + }(ns, r0) + go func(ns Namespace, r *rand.Rand) { + for i := 0; i < N; i++ { + x := uint64(r.Int63n(M)) + o := ns.Get(x, nil) + if o != nil { + atomic.AddUint32(&get, 1) + if v := o.Value().(uint64); v != x { + t.Errorf("#%d invalid value, got=%d", x, v) + } + o.Release() + } + } + wg.Done() + }(ns, r1) + } + + wg.Wait() + + t.Logf("set=%d get=%d", set, get) +} + func TestLRUCache_Finalizer(t *testing.T) { const ( capacity = 100 @@ -262,10 +318,6 @@ func TestLRUCache_Finalizer(t *testing.T) { keymax = 8000 ) - runtime.GOMAXPROCS(runtime.NumCPU()) - defer runtime.GOMAXPROCS(1) - - wg := &sync.WaitGroup{} cnt := &testingCacheObjectCounter{} c := NewLRUCache(capacity) @@ -273,38 +325,40 @@ func TestLRUCache_Finalizer(t *testing.T) { type instance struct { seed int64 rnd *rand.Rand - ns uint64 - effective int32 + nsid uint64 + ns Namespace + effective int handles []Handle handlesMap map[uint64]int delete bool purge bool zap bool - wantDel int32 - delfinCalledAll int32 - delfinCalledEff int32 - purgefinCalled int32 + wantDel int + delfinCalled int + delfinCalledAll int + delfinCalledEff int + purgefinCalled int } - instanceGet := func(p *instance, ns Namespace, key uint64) { - h := ns.Get(key, func() (charge int, value interface{}) { + instanceGet := func(p *instance, key uint64) { + h := p.ns.Get(key, func() (charge int, value interface{}) { to := &testingCacheObject{ t: t, cnt: cnt, - ns: p.ns, + ns: p.nsid, key: key, } - atomic.AddInt32(&p.effective, 1) + p.effective++ cnt.createOne() return 1, releaserFunc{func() { to.Release() - atomic.AddInt32(&p.effective, -1) + p.effective-- }, to} }) p.handles = append(p.handles, h) p.handlesMap[key] = p.handlesMap[key] + 1 } - instanceRelease := func(p *instance, ns Namespace, i int) { + instanceRelease := func(p *instance, i int) { h := p.handles[i] key := h.Value().(releaserFunc).value.(*testingCacheObject).key if n := p.handlesMap[key]; n == 0 { @@ -319,55 +373,71 @@ func TestLRUCache_Finalizer(t *testing.T) { p.handles[len(p.handles) : len(p.handles)+1][0] = nil } - seeds := make([]int64, goroutines) - instances := make([]instance, goroutines) + seed := time.Now().UnixNano() + t.Logf("seed=%d", seed) + + instances := make([]*instance, goroutines) for i := range instances { - p := &instances[i] + p := &instance{} p.handlesMap = make(map[uint64]int) - if seeds[i] == 0 { - seeds[i] = time.Now().UnixNano() - } - p.seed = seeds[i] + p.seed = seed + int64(i) p.rnd = rand.New(rand.NewSource(p.seed)) - p.ns = uint64(i) + p.nsid = uint64(i) + p.ns = c.GetNamespace(p.nsid) p.delete = i%6 == 0 p.purge = i%8 == 0 p.zap = i%12 == 0 || i%3 == 0 + instances[i] = p } - seedsStr := make([]string, len(seeds)) - for i, seed := range seeds { - seedsStr[i] = fmt.Sprint(seed) - } - t.Logf("seeds := []int64{%s}", strings.Join(seedsStr, ", ")) - - // Get and release. - for i := range instances { - p := &instances[i] - - wg.Add(1) - go func(p *instance) { - defer wg.Done() - - ns := c.GetNamespace(p.ns) - for i := 0; i < iterations; i++ { - if len(p.handles) == 0 || p.rnd.Int()%2 == 0 { - instanceGet(p, ns, uint64(p.rnd.Intn(keymax))) - } else { - instanceRelease(p, ns, p.rnd.Intn(len(p.handles))) + runr := rand.New(rand.NewSource(seed - 1)) + run := func(rnd *rand.Rand, x []*instance, init func(p *instance) bool, fn func(p *instance, i int) bool) { + var ( + rx []*instance + rn []int + ) + if init == nil { + rx = append([]*instance{}, x...) + rn = make([]int, len(x)) + } else { + for _, p := range x { + if init(p) { + rx = append(rx, p) + rn = append(rn, 0) } } - }(p) + } + for len(rx) > 0 { + i := rand.Intn(len(rx)) + if fn(rx[i], rn[i]) { + rn[i]++ + } else { + rx = append(rx[:i], rx[i+1:]...) + rn = append(rn[:i], rn[i+1:]...) + } + } } - wg.Wait() + + // Get and release. + run(runr, instances, nil, func(p *instance, i int) bool { + if i < iterations { + if len(p.handles) == 0 || p.rnd.Int()%2 == 0 { + instanceGet(p, uint64(p.rnd.Intn(keymax))) + } else { + instanceRelease(p, p.rnd.Intn(len(p.handles))) + } + return true + } else { + return false + } + }) if used, cap := c.Used(), c.Capacity(); used > cap { t.Errorf("Used > capacity, used=%d cap=%d", used, cap) } // Check effective objects. - for i := range instances { - p := &instances[i] + for i, p := range instances { if int(p.effective) < len(p.handlesMap) { t.Errorf("#%d effective objects < acquired handle, eo=%d ah=%d", i, p.effective, len(p.handlesMap)) } @@ -377,103 +447,93 @@ func TestLRUCache_Finalizer(t *testing.T) { t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) } - // Delete and purge. - for i := range instances { - p := &instances[i] + // First delete. + run(runr, instances, func(p *instance) bool { p.wantDel = p.effective - - wg.Add(1) - go func(p *instance) { - defer wg.Done() - - ns := c.GetNamespace(p.ns) - - if p.delete { - for key := uint64(0); key < keymax; key++ { - _, wantExist := p.handlesMap[key] - gotExist := ns.Delete(key, func(exist, pending bool) { - atomic.AddInt32(&p.delfinCalledAll, 1) - if exist { - atomic.AddInt32(&p.delfinCalledEff, 1) - } - }) - if !gotExist && wantExist { - t.Errorf("delete on NS#%d KEY#%d not found", p.ns, key) - } - } - - var delfinCalled int - for key := uint64(0); key < keymax; key++ { - func(key uint64) { - gotExist := ns.Delete(key, func(exist, pending bool) { - if exist && !pending { - t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.ns, key) - } - delfinCalled++ - }) - if gotExist { - t.Errorf("delete on NS#%d KEY#%d found", p.ns, key) - } - }(key) - } - if delfinCalled != keymax { - t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-delfinCalled) + return p.delete + }, func(p *instance, i int) bool { + key := uint64(i) + if key < keymax { + _, wantExist := p.handlesMap[key] + gotExist := p.ns.Delete(key, func(exist, pending bool) { + p.delfinCalledAll++ + if exist { + p.delfinCalledEff++ } + }) + if !gotExist && wantExist { + t.Errorf("delete on NS#%d KEY#%d not found", p.nsid, key) } + return true + } else { + return false + } + }) - if p.purge { - ns.Purge(func(ns, key uint64) { - atomic.AddInt32(&p.purgefinCalled, 1) - }) + // Second delete. + run(runr, instances, func(p *instance) bool { + p.delfinCalled = 0 + return p.delete + }, func(p *instance, i int) bool { + key := uint64(i) + if key < keymax { + gotExist := p.ns.Delete(key, func(exist, pending bool) { + if exist && !pending { + t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.nsid, key) + } + p.delfinCalled++ + }) + if gotExist { + t.Errorf("delete on NS#%d KEY#%d found", p.nsid, key) } - }(p) - } - wg.Wait() + return true + } else { + if p.delfinCalled != keymax { + t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-p.delfinCalled) + } + return false + } + }) + + // Purge. + run(runr, instances, func(p *instance) bool { + return p.purge + }, func(p *instance, i int) bool { + p.ns.Purge(func(ns, key uint64) { + p.purgefinCalled++ + }) + return false + }) if want := int(cnt.created - cnt.released); c.Size() != want { t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) } // Release. - for i := range instances { - p := &instances[i] - - if !p.zap { - wg.Add(1) - go func(p *instance) { - defer wg.Done() - - ns := c.GetNamespace(p.ns) - for i := len(p.handles) - 1; i >= 0; i-- { - instanceRelease(p, ns, i) - } - }(p) + run(runr, instances, func(p *instance) bool { + return !p.zap + }, func(p *instance, i int) bool { + if len(p.handles) > 0 { + instanceRelease(p, len(p.handles)-1) + return true + } else { + return false } - } - wg.Wait() + }) if want := int(cnt.created - cnt.released); c.Size() != want { t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) } // Zap. - for i := range instances { - p := &instances[i] - - if p.zap { - wg.Add(1) - go func(p *instance) { - defer wg.Done() - - ns := c.GetNamespace(p.ns) - ns.Zap() - - p.handles = nil - p.handlesMap = nil - }(p) - } - } - wg.Wait() + run(runr, instances, func(p *instance) bool { + return p.zap + }, func(p *instance, i int) bool { + p.ns.Zap() + p.handles = nil + p.handlesMap = nil + return false + }) if want := int(cnt.created - cnt.released); c.Size() != want { t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) @@ -485,23 +545,21 @@ func TestLRUCache_Finalizer(t *testing.T) { c.Purge(nil) - for i := range instances { - p := &instances[i] - + for _, p := range instances { if p.delete { if p.delfinCalledAll != keymax { - t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.ns, p.purge, p.zap, keymax-p.delfinCalledAll) + t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.nsid, p.purge, p.zap, keymax-p.delfinCalledAll) } if p.delfinCalledEff != p.wantDel { - t.Errorf("#%d not all effective delete fin called, diff=%d", p.ns, p.wantDel-p.delfinCalledEff) + t.Errorf("#%d not all effective delete fin called, diff=%d", p.nsid, p.wantDel-p.delfinCalledEff) } if p.purge && p.purgefinCalled > 0 { - t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.ns, p.delete, p.zap, p.purgefinCalled) + t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.nsid, p.delete, p.zap, p.purgefinCalled) } } else { if p.purge { if p.purgefinCalled != p.wantDel { - t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.ns, p.delete, p.zap, p.wantDel-p.purgefinCalled) + t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.nsid, p.delete, p.zap, p.wantDel-p.purgefinCalled) } } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go index ca608be2e..d56c6eac3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go @@ -124,6 +124,9 @@ func (c *lruCache) Zap() { func (c *lruCache) evict() { top := &c.recent for n := c.recent.rPrev; c.used > c.capacity && n != top; { + if n.state != nodeEffective { + panic("evicting non effective node") + } n.state = nodeEvicted n.rRemove() n.derefNB() @@ -324,6 +327,10 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle { // Bump to front. n.rRemove() n.rInsert(&ns.lru.recent) + case nodeDeleted: + // Do nothing. + default: + panic("invalid state") } n.ref++ @@ -472,8 +479,10 @@ func (n *lruNode) fin() { r.Release() } if n.purgefin != nil { + if n.delfin != nil { + panic("conflicting delete and purge fin") + } n.purgefin(n.ns.id, n.key) - n.delfin = nil n.purgefin = nil } else if n.delfin != nil { n.delfin(true, false) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index b38cdedc5..4c9032084 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -260,7 +260,7 @@ func (db *DB) memCompaction() { return c.commit(db.journalFile.Num(), db.frozenSeq) }, nil) - db.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration) + db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration) for _, r := range c.rec.addedTables { stats.write += r.size @@ -478,7 +478,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { }, nil) resultSize := int(stats[1].write) - db.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration) + db.logf("table@compaction committed F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration) // Save compaction stats for i := range stats { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index c34c7abae..49c44059b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -321,9 +321,13 @@ func (i *dbIter) Release() { } func (i *dbIter) SetReleaser(releaser util.Releaser) { - if i.dir != dirReleased { - i.releaser = releaser + if i.dir == dirReleased { + panic(util.ErrReleased) } + if i.releaser != nil && releaser != nil { + panic(util.ErrHasReleaser) + } + i.releaser = releaser } func (i *dbIter) Error() error { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 54f3e7401..b71906813 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -1577,11 +1577,7 @@ func TestDb_BloomFilter(t *testing.T) { return fmt.Sprintf("key%06d", i) } - const ( - n = 10000 - indexOverhead = 19898 - filterOverhead = 19799 - ) + const n = 10000 // Populate multiple layers for i := 0; i < n; i++ { @@ -1605,7 +1601,7 @@ func TestDb_BloomFilter(t *testing.T) { cnt := int(h.stor.ReadCounter()) t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt) - if min, max := n+indexOverhead+filterOverhead, n+indexOverhead+filterOverhead+2*n/100; cnt < min || cnt > max { + if min, max := n, n+2*n/100; cnt < min || cnt > max { t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt) } @@ -1616,7 +1612,7 @@ func TestDb_BloomFilter(t *testing.T) { } cnt = int(h.stor.ReadCounter()) t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt) - if max := 3*n/100 + indexOverhead + filterOverhead; cnt > max { + if max := 3 * n / 100; cnt > max { t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go index 9b4b72741..a23ab05f7 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go @@ -40,13 +40,19 @@ type basicArrayIterator struct { util.BasicReleaser array BasicArray pos int + err error } func (i *basicArrayIterator) Valid() bool { - return i.pos >= 0 && i.pos < i.array.Len() + return i.pos >= 0 && i.pos < i.array.Len() && !i.Released() } func (i *basicArrayIterator) First() bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + if i.array.Len() == 0 { i.pos = -1 return false @@ -56,6 +62,11 @@ func (i *basicArrayIterator) First() bool { } func (i *basicArrayIterator) Last() bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + n := i.array.Len() if n == 0 { i.pos = 0 @@ -66,6 +77,11 @@ func (i *basicArrayIterator) Last() bool { } func (i *basicArrayIterator) Seek(key []byte) bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + n := i.array.Len() if n == 0 { i.pos = 0 @@ -79,6 +95,11 @@ func (i *basicArrayIterator) Seek(key []byte) bool { } func (i *basicArrayIterator) Next() bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + i.pos++ if n := i.array.Len(); i.pos >= n { i.pos = n @@ -88,6 +109,11 @@ func (i *basicArrayIterator) Next() bool { } func (i *basicArrayIterator) Prev() bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + i.pos-- if i.pos < 0 { i.pos = -1 @@ -96,7 +122,7 @@ func (i *basicArrayIterator) Prev() bool { return true } -func (i *basicArrayIterator) Error() error { return nil } +func (i *basicArrayIterator) Error() error { return i.err } type arrayIterator struct { basicArrayIterator diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go index 1e99a2bf6..8353b357d 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go @@ -26,9 +26,10 @@ type indexedIterator struct { strict bool strictGet bool - data Iterator - err error - errf func(err error) + data Iterator + err error + errf func(err error) + closed bool } func (i *indexedIterator) setData() { @@ -50,6 +51,15 @@ func (i *indexedIterator) clearData() { i.data = nil } +func (i *indexedIterator) indexErr() { + if err := i.index.Error(); err != nil { + if i.errf != nil { + i.errf(err) + } + i.err = err + } +} + func (i *indexedIterator) dataErr() bool { if i.errf != nil { if err := i.data.Error(); err != nil { @@ -72,9 +82,13 @@ func (i *indexedIterator) Valid() bool { func (i *indexedIterator) First() bool { if i.err != nil { return false + } else if i.Released() { + i.err = ErrIterReleased + return false } if !i.index.First() { + i.indexErr() i.clearData() return false } @@ -85,9 +99,13 @@ func (i *indexedIterator) First() bool { func (i *indexedIterator) Last() bool { if i.err != nil { return false + } else if i.Released() { + i.err = ErrIterReleased + return false } if !i.index.Last() { + i.indexErr() i.clearData() return false } @@ -105,9 +123,13 @@ func (i *indexedIterator) Last() bool { func (i *indexedIterator) Seek(key []byte) bool { if i.err != nil { return false + } else if i.Released() { + i.err = ErrIterReleased + return false } if !i.index.Seek(key) { + i.indexErr() i.clearData() return false } @@ -125,6 +147,9 @@ func (i *indexedIterator) Seek(key []byte) bool { func (i *indexedIterator) Next() bool { if i.err != nil { return false + } else if i.Released() { + i.err = ErrIterReleased + return false } switch { @@ -136,6 +161,7 @@ func (i *indexedIterator) Next() bool { fallthrough case i.data == nil: if !i.index.Next() { + i.indexErr() return false } i.setData() @@ -147,6 +173,9 @@ func (i *indexedIterator) Next() bool { func (i *indexedIterator) Prev() bool { if i.err != nil { return false + } else if i.Released() { + i.err = ErrIterReleased + return false } switch { @@ -158,6 +187,7 @@ func (i *indexedIterator) Prev() bool { fallthrough case i.data == nil: if !i.index.Prev() { + i.indexErr() return false } i.setData() diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go index 1b80184e8..c2522860b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/iter.go @@ -14,6 +14,10 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +var ( + ErrIterReleased = errors.New("leveldb/iterator: iterator released") +) + // IteratorSeeker is the interface that wraps the 'seeks method'. type IteratorSeeker interface { // First moves the iterator to the first key/value pair. If the iterator @@ -100,28 +104,13 @@ type ErrorCallbackSetter interface { } type emptyIterator struct { - releaser util.Releaser - released bool - err error + util.BasicReleaser + err error } func (i *emptyIterator) rErr() { - if i.err == nil && i.released { - i.err = errors.New("leveldb/iterator: iterator released") - } -} - -func (i *emptyIterator) Release() { - if i.releaser != nil { - i.releaser.Release() - i.releaser = nil - } - i.released = true -} - -func (i *emptyIterator) SetReleaser(releaser util.Releaser) { - if !i.released { - i.releaser = releaser + if i.err == nil && i.Released() { + i.err = ErrIterReleased } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go index c8314c4e5..8370e25e7 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go @@ -7,16 +7,10 @@ package iterator import ( - "errors" - "github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/util" ) -var ( - ErrIterReleased = errors.New("leveldb/iterator: iterator released") -) - type dir int const ( @@ -274,9 +268,13 @@ func (i *mergedIterator) Release() { } func (i *mergedIterator) SetReleaser(releaser util.Releaser) { - if i.dir != dirReleased { - i.releaser = releaser + if i.dir == dirReleased { + panic(util.ErrReleased) } + if i.releaser != nil && releaser != nil { + panic(util.ErrHasReleaser) + } + i.releaser = releaser } func (i *mergedIterator) Error() error { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index 37a8b5740..83ff7bc65 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -8,6 +8,7 @@ package memdb import ( + "errors" "math/rand" "sync" @@ -17,7 +18,8 @@ import ( ) var ( - ErrNotFound = util.ErrNotFound + ErrNotFound = util.ErrNotFound + ErrIterReleased = errors.New("leveldb/memdb: iterator released") ) const tMaxHeight = 12 @@ -29,6 +31,7 @@ type dbIter struct { node int forward bool key, value []byte + err error } func (i *dbIter) fill(checkStart, checkLimit bool) bool { @@ -59,6 +62,11 @@ func (i *dbIter) Valid() bool { } func (i *dbIter) First() bool { + if i.Released() { + i.err = ErrIterReleased + return false + } + i.forward = true i.p.mu.RLock() defer i.p.mu.RUnlock() @@ -71,9 +79,11 @@ func (i *dbIter) First() bool { } func (i *dbIter) Last() bool { - if i.p == nil { + if i.Released() { + i.err = ErrIterReleased return false } + i.forward = false i.p.mu.RLock() defer i.p.mu.RUnlock() @@ -86,9 +96,11 @@ func (i *dbIter) Last() bool { } func (i *dbIter) Seek(key []byte) bool { - if i.p == nil { + if i.Released() { + i.err = ErrIterReleased return false } + i.forward = true i.p.mu.RLock() defer i.p.mu.RUnlock() @@ -100,9 +112,11 @@ func (i *dbIter) Seek(key []byte) bool { } func (i *dbIter) Next() bool { - if i.p == nil { + if i.Released() { + i.err = ErrIterReleased return false } + if i.node == 0 { if !i.forward { return i.First() @@ -117,9 +131,11 @@ func (i *dbIter) Next() bool { } func (i *dbIter) Prev() bool { - if i.p == nil { + if i.Released() { + i.err = ErrIterReleased return false } + if i.node == 0 { if i.forward { return i.Last() @@ -141,10 +157,10 @@ func (i *dbIter) Value() []byte { return i.value } -func (i *dbIter) Error() error { return nil } +func (i *dbIter) Error() error { return i.err } func (i *dbIter) Release() { - if i.p != nil { + if !i.Released() { i.p = nil i.node = 0 i.key = nil diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index 50a990948..715c9f5ba 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -147,7 +147,7 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) { } } -// Mark if record has been commited, this will update session state; +// Mark if record has been committed, this will update session state; // need external synchronization. func (s *session) recordCommited(r *sessionRecord) { if r.has(recJournalNum) { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index a1b04d827..87c4e155a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,7 +7,9 @@ package leveldb import ( + "fmt" "sort" + "sync" "sync/atomic" "github.com/syndtr/goleveldb/leveldb/cache" @@ -276,6 +278,8 @@ type tOps struct { cache cache.Cache cacheNS cache.Namespace bpool *util.BufferPool + mu sync.Mutex + closed bool } // Creates an empty table and returns table writer. @@ -322,9 +326,42 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } +type trWrapper struct { + *table.Reader + t *tOps + ref int +} + +func (w *trWrapper) Release() { + if w.ref != 0 && !w.t.closed { + panic(fmt.Sprintf("BUG: invalid ref %d, refer to issue #72", w.ref)) + } + w.Reader.Release() +} + +type trCacheHandleWrapper struct { + cache.Handle + t *tOps + released bool +} + +func (w *trCacheHandleWrapper) Release() { + w.t.mu.Lock() + defer w.t.mu.Unlock() + + if !w.released { + w.released = true + w.Value().(*trWrapper).ref-- + } + w.Handle.Release() +} + // Opens table. It returns a cache handle, which should // be released after use. func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { + t.mu.Lock() + defer t.mu.Unlock() + num := f.file.Num() ch = t.cacheNS.Get(num, func() (charge int, value interface{}) { var r storage.Reader @@ -337,11 +374,13 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { if bc := t.s.o.GetBlockCache(); bc != nil { bcacheNS = bc.GetNamespace(num) } - return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o) + return 1, &trWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), t, 0} }) if ch == nil && err == nil { err = ErrClosed } + ch.Value().(*trWrapper).ref++ + ch = &trCacheHandleWrapper{ch, t, false} return } @@ -353,7 +392,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b return nil, nil, err } defer ch.Release() - return ch.Value().(*table.Reader).Find(key, ro) + return ch.Value().(*trWrapper).Find(key, ro) } // Returns approximate offset of the given key. @@ -363,7 +402,7 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { return } defer ch.Release() - offset_, err := ch.Value().(*table.Reader).OffsetOf(key) + offset_, err := ch.Value().(*trWrapper).OffsetOf(key) return uint64(offset_), err } @@ -373,7 +412,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite if err != nil { return iterator.NewEmptyIterator(err) } - iter := ch.Value().(*table.Reader).NewIterator(slice, ro) + iter := ch.Value().(*trWrapper).NewIterator(slice, ro) iter.SetReleaser(ch) return iter } @@ -381,6 +420,9 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. func (t *tOps) remove(f *tFile) { + t.mu.Lock() + defer t.mu.Unlock() + num := f.file.Num() t.cacheNS.Delete(num, func(exist, pending bool) { if !pending { @@ -399,6 +441,10 @@ func (t *tOps) remove(f *tFile) { // Closes the table ops instance. It will close all tables, // regadless still used or not. func (t *tOps) close() { + t.mu.Lock() + defer t.mu.Unlock() + + t.closed = true t.cache.Zap() t.bpool.Close() } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 5ec2b3e53..ab62c44ed 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -13,6 +13,7 @@ import ( "io" "sort" "strings" + "sync" "code.google.com/p/snappy-go/snappy" @@ -25,8 +26,9 @@ import ( ) var ( - ErrNotFound = util.ErrNotFound - ErrIterReleased = errors.New("leveldb/table: iterator released") + ErrNotFound = util.ErrNotFound + ErrReaderReleased = errors.New("leveldb/table: reader released") + ErrIterReleased = errors.New("leveldb/table: iterator released") ) func max(x, y int) int { @@ -134,9 +136,7 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas } func (b *block) Release() { - if b.tr.bpool != nil { - b.tr.bpool.Put(b.data) - } + b.tr.bpool.Put(b.data) b.tr = nil b.data = nil } @@ -439,7 +439,7 @@ func (i *blockIter) Value() []byte { } func (i *blockIter) Release() { - if i.dir > dirReleased { + if i.dir != dirReleased { i.block = nil i.prevNode = nil i.prevKeys = nil @@ -458,9 +458,13 @@ func (i *blockIter) Release() { } func (i *blockIter) SetReleaser(releaser util.Releaser) { - if i.dir > dirReleased { - i.releaser = releaser + if i.dir == dirReleased { + panic(util.ErrReleased) } + if i.releaser != nil && releaser != nil { + panic(util.ErrHasReleaser) + } + i.releaser = releaser } func (i *blockIter) Valid() bool { @@ -495,9 +499,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool { } func (b *filterBlock) Release() { - if b.tr.bpool != nil { - b.tr.bpool.Put(b.data) - } + b.tr.bpool.Put(b.data) b.tr = nil b.data = nil } @@ -519,15 +521,17 @@ func (i *indexIter) Get() iterator.Iterator { if n == 0 { return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid table (bad data block handle)")) } + var slice *util.Range if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { slice = i.slice } - return i.blockIter.block.tr.getDataIter(dataBH, slice, i.checksum, i.fillCache) + return i.blockIter.block.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache) } // Reader is a table reader. type Reader struct { + mu sync.RWMutex reader io.ReaderAt cache cache.Namespace err error @@ -540,6 +544,8 @@ type Reader struct { dataEnd int64 indexBH, filterBH blockHandle + indexBlock *block + filterBlock *filterBlock } func verifyChecksum(data []byte) bool { @@ -688,6 +694,20 @@ func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterB return b, b, err } +func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) { + if r.indexBlock == nil { + return r.readBlockCached(r.indexBH, true, fillCache) + } + return r.indexBlock, util.NoopReleaser{}, nil +} + +func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) { + if r.filterBlock == nil { + return r.readFilterBlockCached(r.filterBH, fillCache) + } + return r.filterBlock, util.NoopReleaser{}, nil +} + func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { b, rel, err := r.readBlockCached(dataBH, checksum, fillCache) if err != nil { @@ -696,6 +716,17 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi return b.newIterator(slice, false, rel) } +func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.err != nil { + return iterator.NewEmptyIterator(r.err) + } + + return r.getDataIter(dataBH, slice, checksum, fillCache) +} + // NewIterator creates an iterator from the table. // // Slice allows slicing the iterator to only contains keys in the given @@ -708,22 +739,25 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi // // Also read Iterator documentation of the leveldb/iterator package. func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { return iterator.NewEmptyIterator(r.err) } fillCache := !ro.GetDontFillCache() - b, rel, err := r.readBlockCached(r.indexBH, true, fillCache) + indexBlock, rel, err := r.getIndexBlock(fillCache) if err != nil { return iterator.NewEmptyIterator(err) } index := &indexIter{ - blockIter: b.newIterator(slice, true, rel), + blockIter: indexBlock.newIterator(slice, true, rel), slice: slice, checksum: ro.GetStrict(opt.StrictBlockChecksum), fillCache: !ro.GetDontFillCache(), } - return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false) + return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), true) } // Find finds key/value pair whose key is greater than or equal to the @@ -733,12 +767,15 @@ func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It // The caller should not modify the contents of the returned slice, but // it is safe to modify the contents of the argument after Find returns. func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return } - indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) + indexBlock, rel, err := r.getIndexBlock(true) if err != nil { return } @@ -759,7 +796,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err return } if r.filter != nil { - filterBlock, rel, ferr := r.readFilterBlockCached(r.filterBH, true) + filterBlock, rel, ferr := r.getFilterBlock(true) if ferr == nil { if !filterBlock.contains(dataBH.offset, key) { rel.Release() @@ -779,9 +816,13 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err } // Don't use block buffer, no need to copy the buffer. rkey = data.Key() - // Use block buffer, and since the buffer will be recycled, the buffer - // need to be copied. - value = append([]byte{}, data.Value()...) + if r.bpool == nil { + value = data.Value() + } else { + // Use block buffer, and since the buffer will be recycled, the buffer + // need to be copied. + value = append([]byte{}, data.Value()...) + } return } @@ -791,6 +832,9 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err // The caller should not modify the contents of the returned slice, but // it is safe to modify the contents of the argument after Get returns. func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return @@ -808,6 +852,9 @@ func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) // // It is safe to modify the contents of the argument after Get returns. func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return @@ -840,12 +887,24 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { // Release implements util.Releaser. // It also close the file if it is an io.Closer. func (r *Reader) Release() { + r.mu.Lock() + defer r.mu.Unlock() + if closer, ok := r.reader.(io.Closer); ok { closer.Close() } + if r.indexBlock != nil { + r.indexBlock.Release() + r.indexBlock = nil + } + if r.filterBlock != nil { + r.filterBlock.Release() + r.filterBlock = nil + } r.reader = nil r.cache = nil r.bpool = nil + r.err = ErrReaderReleased } // NewReader creates a new initialized table reader for the file. @@ -853,9 +912,6 @@ func (r *Reader) Release() { // // The returned table reader instance is goroutine-safe. func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.BufferPool, o *opt.Options) *Reader { - if bpool == nil { - bpool = util.NewBufferPool(o.GetBlockSize() + blockTrailerLen) - } r := &Reader{ reader: f, cache: cache, @@ -930,5 +986,22 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf } metaIter.Release() metaBlock.Release() + + // Cache index and filter block locally, since we don't have global cache. + if cache == nil { + r.indexBlock, r.err = r.readBlock(r.indexBH, true) + if r.err != nil { + return r + } + if r.filter != nil { + r.filterBlock, err = r.readFilterBlock(r.filterBH) + if err != nil { + // Don't use filter then. + r.filter = nil + r.filterBH = blockHandle{} + } + } + } + return r } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go index 554e28ebd..aea39dca8 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -54,6 +54,10 @@ func (p *BufferPool) poolNum(n int) int { // Get returns buffer with length of n. func (p *BufferPool) Get(n int) []byte { + if p == nil { + return make([]byte, n) + } + atomic.AddUint32(&p.get, 1) poolNum := p.poolNum(n) @@ -145,6 +149,10 @@ func (p *BufferPool) Get(n int) []byte { // Put adds given buffer to the pool. func (p *BufferPool) Put(b []byte) { + if p == nil { + return + } + atomic.AddUint32(&p.put, 1) pool := p.pool[p.poolNum(cap(b))] @@ -156,6 +164,10 @@ func (p *BufferPool) Put(b []byte) { } func (p *BufferPool) Close() { + if p == nil { + return + } + select { case p.close <- struct{}{}: default: @@ -163,6 +175,10 @@ func (p *BufferPool) Close() { } func (p *BufferPool) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}", p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/util.go index 229c7d41f..f690e4842 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/util.go @@ -12,7 +12,9 @@ import ( ) var ( - ErrNotFound = errors.New("leveldb: not found") + ErrNotFound = errors.New("leveldb: not found") + ErrReleased = errors.New("leveldb: resource already relesed") + ErrHasReleaser = errors.New("leveldb: releaser already defined") ) // Releaser is the interface that wraps the basic Release method. @@ -27,23 +29,46 @@ type ReleaseSetter interface { // SetReleaser associates the given releaser to the resources. The // releaser will be called once coresponding resources released. // Calling SetReleaser with nil will clear the releaser. + // + // This will panic if a releaser already present or coresponding + // resource is already released. Releaser should be cleared first + // before assigned a new one. SetReleaser(releaser Releaser) } // BasicReleaser provides basic implementation of Releaser and ReleaseSetter. type BasicReleaser struct { releaser Releaser + released bool +} + +// Released returns whether Release method already called. +func (r *BasicReleaser) Released() bool { + return r.released } // Release implements Releaser.Release. func (r *BasicReleaser) Release() { - if r.releaser != nil { - r.releaser.Release() - r.releaser = nil + if !r.released { + if r.releaser != nil { + r.releaser.Release() + r.releaser = nil + } + r.released = true } } // SetReleaser implements ReleaseSetter.SetReleaser. func (r *BasicReleaser) SetReleaser(releaser Releaser) { + if r.released { + panic(ErrReleased) + } + if r.releaser != nil && releaser != nil { + panic(ErrHasReleaser) + } r.releaser = releaser } + +type NoopReleaser struct{} + +func (NoopReleaser) Release() {}