diff --git a/vendor/github.com/syndtr/goleveldb/LICENSE b/vendor/github.com/syndtr/goleveldb/LICENSE new file mode 100644 index 000000000..4a772d1ab --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/LICENSE @@ -0,0 +1,24 @@ +Copyright 2012 Suryandaru Triandana +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/syndtr/goleveldb/README.md b/vendor/github.com/syndtr/goleveldb/README.md new file mode 100644 index 000000000..259286f55 --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/README.md @@ -0,0 +1,105 @@ +This is an implementation of the [LevelDB key/value database](http:code.google.com/p/leveldb) in the [Go programming language](http:golang.org). + +[![Build Status](https://travis-ci.org/syndtr/goleveldb.png?branch=master)](https://travis-ci.org/syndtr/goleveldb) + +Installation +----------- + + go get github.com/syndtr/goleveldb/leveldb + +Requirements +----------- + +* Need at least `go1.4` or newer. + +Usage +----------- + +Create or open a database: +```go +db, err := leveldb.OpenFile("path/to/db", nil) +... +defer db.Close() +... +``` +Read or modify the database content: +```go +// Remember that the contents of the returned slice should not be modified. +data, err := db.Get([]byte("key"), nil) +... +err = db.Put([]byte("key"), []byte("value"), nil) +... +err = db.Delete([]byte("key"), nil) +... +``` + +Iterate over database content: +```go +iter := db.NewIterator(nil, nil) +for iter.Next() { + // Remember that the contents of the returned slice should not be modified, and + // only valid until the next call to Next. + key := iter.Key() + value := iter.Value() + ... +} +iter.Release() +err = iter.Error() +... +``` +Seek-then-Iterate: +```go +iter := db.NewIterator(nil, nil) +for ok := iter.Seek(key); ok; ok = iter.Next() { + // Use key/value. + ... +} +iter.Release() +err = iter.Error() +... +``` +Iterate over subset of database content: +```go +iter := db.NewIterator(&util.Range{Start: []byte("foo"), Limit: []byte("xoo")}, nil) +for iter.Next() { + // Use key/value. + ... +} +iter.Release() +err = iter.Error() +... +``` +Iterate over subset of database content with a particular prefix: +```go +iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil) +for iter.Next() { + // Use key/value. + ... +} +iter.Release() +err = iter.Error() +... +``` +Batch writes: +```go +batch := new(leveldb.Batch) +batch.Put([]byte("foo"), []byte("value")) +batch.Put([]byte("bar"), []byte("another value")) +batch.Delete([]byte("baz")) +err = db.Write(batch, nil) +... +``` +Use bloom filter: +```go +o := &opt.Options{ + Filter: filter.NewBloomFilter(10), +} +db, err := leveldb.OpenFile("path/to/db", o) +... +defer db.Close() +... +``` +Documentation +----------- + +You can read package documentation [here](http:godoc.org/github.com/syndtr/goleveldb). diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go index 5c587b93d..12a849621 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/bench_test.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "runtime" + "sync/atomic" "testing" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -462,3 +463,47 @@ func BenchmarkDBGetRandom(b *testing.B) { p.gets() p.close() } + +func BenchmarkDBReadConcurrent(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + for pb.Next() && iter.Next() { + } + }) +} + +func BenchmarkDBReadConcurrent2(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + var dir uint32 + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + if atomic.AddUint32(&dir, 1)%2 == 0 { + for pb.Next() && iter.Next() { + } + } else { + if pb.Next() && iter.Last() { + for pb.Next() && iter.Prev() { + } + } + } + }) +} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/bench_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/bench_test.go new file mode 100644 index 000000000..89aef69ab --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/bench_test.go @@ -0,0 +1,29 @@ +// Copyright (c) 2012, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package cache + +import ( + "math/rand" + "testing" + "time" +) + +func BenchmarkLRUCache(b *testing.B) { + c := NewCache(NewLRU(10000)) + + b.SetParallelism(10) + b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for pb.Next() { + key := uint64(r.Intn(1000000)) + c.Get(0, key, func() (int, Value) { + return 1, key + }).Release() + } + }) +} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go index 9be048731..37d08eb86 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -1740,139 +1740,165 @@ func TestDB_BloomFilter(t *testing.T) { } func TestDB_Concurrent(t *testing.T) { - const n, secs, maxkey = 4, 2, 1000 + const n, secs, maxkey = 4, 6, 1000 + h := newDbHarness(t) + defer h.close() - runtime.GOMAXPROCS(n) - trun(t, func(h *dbHarness) { - var closeWg sync.WaitGroup - var stop uint32 - var cnt [n]uint32 + runtime.GOMAXPROCS(runtime.NumCPU()) - for i := 0; i < n; i++ { - closeWg.Add(1) - go func(i int) { - var put, get, found uint - defer func() { - t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d", - i, cnt[i], put, get, found, get-found) - closeWg.Done() - }() + var ( + closeWg sync.WaitGroup + stop uint32 + cnt [n]uint32 + ) - rnd := rand.New(rand.NewSource(int64(1000 + i))) - for atomic.LoadUint32(&stop) == 0 { - x := cnt[i] + for i := 0; i < n; i++ { + closeWg.Add(1) + go func(i int) { + var put, get, found uint + defer func() { + t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d", + i, cnt[i], put, get, found, get-found) + closeWg.Done() + }() - k := rnd.Intn(maxkey) - kstr := fmt.Sprintf("%016d", k) + rnd := rand.New(rand.NewSource(int64(1000 + i))) + for atomic.LoadUint32(&stop) == 0 { + x := cnt[i] - if (rnd.Int() % 2) > 0 { - put++ - h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x)) - } else { - get++ - v, err := h.db.Get([]byte(kstr), h.ro) - if err == nil { - found++ - rk, ri, rx := 0, -1, uint32(0) - fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx) - if rk != k { - t.Errorf("invalid key want=%d got=%d", k, rk) - } - if ri < 0 || ri >= n { - t.Error("invalid goroutine number: ", ri) - } else { - tx := atomic.LoadUint32(&(cnt[ri])) - if rx > tx { - t.Errorf("invalid seq number, %d > %d ", rx, tx) - } - } - } else if err != ErrNotFound { - t.Error("Get: got error: ", err) - return + k := rnd.Intn(maxkey) + kstr := fmt.Sprintf("%016d", k) + + if (rnd.Int() % 2) > 0 { + put++ + h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x)) + } else { + get++ + v, err := h.db.Get([]byte(kstr), h.ro) + if err == nil { + found++ + rk, ri, rx := 0, -1, uint32(0) + fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx) + if rk != k { + t.Errorf("invalid key want=%d got=%d", k, rk) } + if ri < 0 || ri >= n { + t.Error("invalid goroutine number: ", ri) + } else { + tx := atomic.LoadUint32(&(cnt[ri])) + if rx > tx { + t.Errorf("invalid seq number, %d > %d ", rx, tx) + } + } + } else if err != ErrNotFound { + t.Error("Get: got error: ", err) + return } - atomic.AddUint32(&cnt[i], 1) } - }(i) - } + atomic.AddUint32(&cnt[i], 1) + } + }(i) + } - time.Sleep(secs * time.Second) - atomic.StoreUint32(&stop, 1) - closeWg.Wait() - }) - - runtime.GOMAXPROCS(1) + time.Sleep(secs * time.Second) + atomic.StoreUint32(&stop, 1) + closeWg.Wait() } -func TestDB_Concurrent2(t *testing.T) { - const n, n2 = 4, 4000 +func TestDB_ConcurrentIterator(t *testing.T) { + const n, n2 = 4, 1000 + h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}) + defer h.close() - runtime.GOMAXPROCS(n*2 + 2) - truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) { - var closeWg sync.WaitGroup - var stop uint32 + runtime.GOMAXPROCS(runtime.NumCPU()) - for i := 0; i < n; i++ { - closeWg.Add(1) - go func(i int) { - for k := 0; atomic.LoadUint32(&stop) == 0; k++ { - h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) + var ( + closeWg sync.WaitGroup + stop uint32 + ) + + for i := 0; i < n; i++ { + closeWg.Add(1) + go func(i int) { + for k := 0; atomic.LoadUint32(&stop) == 0; k++ { + h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) + } + closeWg.Done() + }(i) + } + + for i := 0; i < n; i++ { + closeWg.Add(1) + go func(i int) { + for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- { + h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) + } + closeWg.Done() + }(i) + } + + cmp := comparer.DefaultComparer + for i := 0; i < n2; i++ { + closeWg.Add(1) + go func(i int) { + it := h.db.NewIterator(nil, nil) + var pk []byte + for it.Next() { + kk := it.Key() + if cmp.Compare(kk, pk) <= 0 { + t.Errorf("iter %d: %q is successor of %q", i, pk, kk) } - closeWg.Done() - }(i) - } - - for i := 0; i < n; i++ { - closeWg.Add(1) - go func(i int) { - for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- { - h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) + pk = append(pk[:0], kk...) + var k, vk, vi int + if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil { + t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err) + } else if n < 1 { + t.Errorf("iter %d: Cannot parse key %q", i, it.Key()) } - closeWg.Done() - }(i) - } - - cmp := comparer.DefaultComparer - for i := 0; i < n2; i++ { - closeWg.Add(1) - go func(i int) { - it := h.db.NewIterator(nil, nil) - var pk []byte - for it.Next() { - kk := it.Key() - if cmp.Compare(kk, pk) <= 0 { - t.Errorf("iter %d: %q is successor of %q", i, pk, kk) - } - pk = append(pk[:0], kk...) - var k, vk, vi int - if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil { - t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err) - } else if n < 1 { - t.Errorf("iter %d: Cannot parse key %q", i, it.Key()) - } - if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil { - t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err) - } else if n < 2 { - t.Errorf("iter %d: Cannot parse value %q", i, it.Value()) - } - - if vk != k { - t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk) - } + if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil { + t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err) + } else if n < 2 { + t.Errorf("iter %d: Cannot parse value %q", i, it.Value()) } - if err := it.Error(); err != nil { - t.Errorf("iter %d: Got error: %v", i, err) + + if vk != k { + t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk) } - it.Release() - closeWg.Done() - }(i) - } + } + if err := it.Error(); err != nil { + t.Errorf("iter %d: Got error: %v", i, err) + } + it.Release() + closeWg.Done() + }(i) + } - atomic.StoreUint32(&stop, 1) - closeWg.Wait() - }) + atomic.StoreUint32(&stop, 1) + closeWg.Wait() +} - runtime.GOMAXPROCS(1) +func TestDB_ConcurrentWrite(t *testing.T) { + const n, niter = 10, 10000 + h := newDbHarness(t) + defer h.close() + + runtime.GOMAXPROCS(runtime.NumCPU()) + + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for k := 0; k < niter; k++ { + kstr := fmt.Sprintf("%d.%d", i, k) + vstr := fmt.Sprintf("v%d", k) + h.put(kstr, vstr) + // Key should immediately available after put returns. + h.getVal(kstr, vstr) + } + }(i) + } + wg.Wait() } func TestDB_CreateReopenDbOnFile(t *testing.T) { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go index fb7896139..5576761fe 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -166,15 +166,15 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { merged := 0 danglingMerge := false defer func() { + for i := 0; i < merged; i++ { + db.writeAckC <- err + } if danglingMerge { // Only one dangling merge at most, so this is safe. db.writeMergedC <- false } else { <-db.writeLockC } - for i := 0; i < merged; i++ { - db.writeAckC <- err - } }() mdb, mdbFree, err := db.flush(b.size()) diff --git a/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/key.go b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/key.go new file mode 100644 index 000000000..c9f696381 --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/key.go @@ -0,0 +1,137 @@ +package main + +import ( + "encoding/binary" + "fmt" + + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +type ErrIkeyCorrupted struct { + Ikey []byte + Reason string +} + +func (e *ErrIkeyCorrupted) Error() string { + return fmt.Sprintf("leveldb: iKey %q corrupted: %s", e.Ikey, e.Reason) +} + +func newErrIkeyCorrupted(ikey []byte, reason string) error { + return errors.NewErrCorrupted(storage.FileDesc{}, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason}) +} + +type kType int + +func (kt kType) String() string { + switch kt { + case ktDel: + return "d" + case ktVal: + return "v" + } + return "x" +} + +// Value types encoded as the last component of internal keys. +// Don't modify; this value are saved to disk. +const ( + ktDel kType = iota + ktVal +) + +// ktSeek defines the kType that should be passed when constructing an +// internal key for seeking to a particular sequence number (since we +// sort sequence numbers in decreasing order and the value type is +// embedded as the low 8 bits in the sequence number in internal keys, +// we need to use the highest-numbered ValueType, not the lowest). +const ktSeek = ktVal + +const ( + // Maximum value possible for sequence number; the 8-bits are + // used by value type, so its can packed together in single + // 64-bit integer. + kMaxSeq uint64 = (uint64(1) << 56) - 1 + // Maximum value possible for packed sequence number and type. + kMaxNum uint64 = (kMaxSeq << 8) | uint64(ktSeek) +) + +// Maximum number encoded in bytes. +var kMaxNumBytes = make([]byte, 8) + +func init() { + binary.LittleEndian.PutUint64(kMaxNumBytes, kMaxNum) +} + +type iKey []byte + +func newIkey(ukey []byte, seq uint64, kt kType) iKey { + if seq > kMaxSeq { + panic("leveldb: invalid sequence number") + } else if kt > ktVal { + panic("leveldb: invalid type") + } + + ik := make(iKey, len(ukey)+8) + copy(ik, ukey) + binary.LittleEndian.PutUint64(ik[len(ukey):], (seq<<8)|uint64(kt)) + return ik +} + +func parseIkey(ik []byte) (ukey []byte, seq uint64, kt kType, err error) { + if len(ik) < 8 { + return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid length") + } + num := binary.LittleEndian.Uint64(ik[len(ik)-8:]) + seq, kt = uint64(num>>8), kType(num&0xff) + if kt > ktVal { + return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid type") + } + ukey = ik[:len(ik)-8] + return +} + +func validIkey(ik []byte) bool { + _, _, _, err := parseIkey(ik) + return err == nil +} + +func (ik iKey) assert() { + if ik == nil { + panic("leveldb: nil iKey") + } + if len(ik) < 8 { + panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid length", ik, len(ik))) + } +} + +func (ik iKey) ukey() []byte { + ik.assert() + return ik[:len(ik)-8] +} + +func (ik iKey) num() uint64 { + ik.assert() + return binary.LittleEndian.Uint64(ik[len(ik)-8:]) +} + +func (ik iKey) parseNum() (seq uint64, kt kType) { + num := ik.num() + seq, kt = uint64(num>>8), kType(num&0xff) + if kt > ktVal { + panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid type %#x", ik, len(ik), kt)) + } + return +} + +func (ik iKey) String() string { + if ik == nil { + return "" + } + + if ukey, seq, kt, err := parseIkey(ik); err == nil { + return fmt.Sprintf("%x,%s%d", ukey, kt, seq) + } else { + return "" + } +} diff --git a/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go new file mode 100644 index 000000000..cf8466e9c --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/manualtest/dbstress/main.go @@ -0,0 +1,628 @@ +package main + +import ( + "crypto/rand" + "encoding/binary" + "flag" + "fmt" + "log" + mrand "math/rand" + "net/http" + _ "net/http/pprof" + "os" + "os/signal" + "path" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/table" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + dbPath = path.Join(os.TempDir(), "goleveldb-testdb") + openFilesCacheCapacity = 500 + keyLen = 63 + valueLen = 256 + numKeys = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743} + httpProf = "127.0.0.1:5454" + transactionProb = 0.5 + enableBlockCache = false + enableCompression = false + enableBufferPool = false + + wg = new(sync.WaitGroup) + done, fail uint32 + + bpool *util.BufferPool +) + +type arrayInt []int + +func (a arrayInt) String() string { + var str string + for i, n := range a { + if i > 0 { + str += "," + } + str += strconv.Itoa(n) + } + return str +} + +func (a *arrayInt) Set(str string) error { + var na arrayInt + for _, s := range strings.Split(str, ",") { + s = strings.TrimSpace(s) + if s != "" { + n, err := strconv.Atoi(s) + if err != nil { + return err + } + na = append(na, n) + } + } + *a = na + return nil +} + +func init() { + flag.StringVar(&dbPath, "db", dbPath, "testdb path") + flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity") + flag.IntVar(&keyLen, "keylen", keyLen, "key length") + flag.IntVar(&valueLen, "valuelen", valueLen, "value length") + flag.Var(&numKeys, "numkeys", "num keys") + flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr") + flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction") + flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool") + flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache") + flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression") +} + +func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte { + if dataLen < (2+4+4)*2+4 { + panic("dataLen is too small") + } + if cap(dst) < dataLen { + dst = make([]byte, dataLen) + } else { + dst = dst[:dataLen] + } + half := (dataLen - 4) / 2 + if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil { + panic(err) + } + dst[0] = ns + dst[1] = prefix + binary.LittleEndian.PutUint32(dst[half-8:], i) + binary.LittleEndian.PutUint32(dst[half-8:], i) + binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value()) + full := half * 2 + copy(dst[half:full], dst[:half]) + if full < dataLen-4 { + if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil { + panic(err) + } + } + binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value()) + return dst +} + +func dataSplit(data []byte) (data0, data1 []byte) { + n := (len(data) - 4) / 2 + return data[:n], data[n : n+n] +} + +func dataNS(data []byte) byte { + return data[0] +} + +func dataPrefix(data []byte) byte { + return data[1] +} + +func dataI(data []byte) uint32 { + return binary.LittleEndian.Uint32(data[(len(data)-4)/2-8:]) +} + +func dataChecksum(data []byte) (uint32, uint32) { + checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:]) + checksum1 := util.NewCRC(data[:len(data)-4]).Value() + return checksum0, checksum1 +} + +func dataPrefixSlice(ns, prefix byte) *util.Range { + return util.BytesPrefix([]byte{ns, prefix}) +} + +func dataNsSlice(ns byte) *util.Range { + return util.BytesPrefix([]byte{ns}) +} + +type testingStorage struct { + storage.Storage +} + +func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) { + r, err := ts.Open(fd) + if err != nil { + log.Fatal(err) + } + defer r.Close() + + size, err := r.Seek(0, os.SEEK_END) + if err != nil { + log.Fatal(err) + } + + o := &opt.Options{ + DisableLargeBatchTransaction: true, + Strict: opt.NoStrict, + } + if checksum { + o.Strict = opt.StrictBlockChecksum | opt.StrictReader + } + tr, err := table.NewReader(r, size, fd, nil, bpool, o) + if err != nil { + log.Fatal(err) + } + defer tr.Release() + + checkData := func(i int, t string, data []byte) bool { + if len(data) == 0 { + panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t)) + } + + checksum0, checksum1 := dataChecksum(data) + if checksum0 != checksum1 { + atomic.StoreUint32(&fail, 1) + atomic.StoreUint32(&done, 1) + corrupted = true + + data0, data1 := dataSplit(data) + data0c0, data0c1 := dataChecksum(data0) + data1c0, data1c1 := dataChecksum(data1) + log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)", + fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1) + return true + } + return false + } + + iter := tr.NewIterator(nil, nil) + defer iter.Release() + for i := 0; iter.Next(); i++ { + ukey, _, kt, kerr := parseIkey(iter.Key()) + if kerr != nil { + atomic.StoreUint32(&fail, 1) + atomic.StoreUint32(&done, 1) + corrupted = true + + log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr) + return + } + if checkData(i, "key", ukey) { + return + } + if kt == ktVal && checkData(i, "value", iter.Value()) { + return + } + } + if err := iter.Error(); err != nil { + if errors.IsCorrupted(err) { + atomic.StoreUint32(&fail, 1) + atomic.StoreUint32(&done, 1) + corrupted = true + + log.Printf("FATAL: [%v] Corruption detected: %v", fd, err) + } else { + log.Fatal(err) + } + } + + return +} + +func (ts *testingStorage) Remove(fd storage.FileDesc) error { + if atomic.LoadUint32(&fail) == 1 { + return nil + } + + if fd.Type == storage.TypeTable { + if ts.scanTable(fd, true) { + return nil + } + } + return ts.Storage.Remove(fd) +} + +type latencyStats struct { + mark time.Time + dur, min, max time.Duration + num int +} + +func (s *latencyStats) start() { + s.mark = time.Now() +} + +func (s *latencyStats) record(n int) { + if s.mark.IsZero() { + panic("not started") + } + dur := time.Now().Sub(s.mark) + dur1 := dur / time.Duration(n) + if dur1 < s.min || s.min == 0 { + s.min = dur1 + } + if dur1 > s.max { + s.max = dur1 + } + s.dur += dur + s.num += n + s.mark = time.Time{} +} + +func (s *latencyStats) ratePerSec() int { + durSec := s.dur / time.Second + if durSec > 0 { + return s.num / int(durSec) + } + return s.num +} + +func (s *latencyStats) avg() time.Duration { + if s.num > 0 { + return s.dur / time.Duration(s.num) + } + return 0 +} + +func (s *latencyStats) add(x *latencyStats) { + if x.min < s.min || s.min == 0 { + s.min = x.min + } + if x.max > s.max { + s.max = x.max + } + s.dur += x.dur + s.num += x.num +} + +func main() { + flag.Parse() + + if enableBufferPool { + bpool = util.NewBufferPool(opt.DefaultBlockSize + 128) + } + + log.Printf("Test DB stored at %q", dbPath) + if httpProf != "" { + log.Printf("HTTP pprof listening at %q", httpProf) + runtime.SetBlockProfileRate(1) + go func() { + if err := http.ListenAndServe(httpProf, nil); err != nil { + log.Fatalf("HTTPPROF: %v", err) + } + }() + } + + runtime.GOMAXPROCS(runtime.NumCPU()) + + os.RemoveAll(dbPath) + stor, err := storage.OpenFile(dbPath, false) + if err != nil { + log.Fatal(err) + } + tstor := &testingStorage{stor} + defer tstor.Close() + + fatalf := func(err error, format string, v ...interface{}) { + atomic.StoreUint32(&fail, 1) + atomic.StoreUint32(&done, 1) + log.Printf("FATAL: "+format, v...) + if err != nil && errors.IsCorrupted(err) { + cerr := err.(*errors.ErrCorrupted) + if !cerr.Fd.Nil() && cerr.Fd.Type == storage.TypeTable { + log.Print("FATAL: corruption detected, scanning...") + if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) { + log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd) + } + } + } + runtime.Goexit() + } + + if openFilesCacheCapacity == 0 { + openFilesCacheCapacity = -1 + } + o := &opt.Options{ + OpenFilesCacheCapacity: openFilesCacheCapacity, + DisableBufferPool: !enableBufferPool, + DisableBlockCache: !enableBlockCache, + ErrorIfExist: true, + Compression: opt.NoCompression, + } + if enableCompression { + o.Compression = opt.DefaultCompression + } + + db, err := leveldb.Open(tstor, o) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + var ( + mu = &sync.Mutex{} + gGetStat = &latencyStats{} + gIterStat = &latencyStats{} + gWriteStat = &latencyStats{} + gTrasactionStat = &latencyStats{} + startTime = time.Now() + + writeReq = make(chan *leveldb.Batch) + writeAck = make(chan error) + writeAckAck = make(chan struct{}) + ) + + go func() { + for b := range writeReq { + + var err error + if mrand.Float64() < transactionProb { + log.Print("> Write using transaction") + gTrasactionStat.start() + var tr *leveldb.Transaction + if tr, err = db.OpenTransaction(); err == nil { + if err = tr.Write(b, nil); err == nil { + if err = tr.Commit(); err == nil { + gTrasactionStat.record(b.Len()) + } + } else { + tr.Discard() + } + } + } else { + gWriteStat.start() + if err = db.Write(b, nil); err == nil { + gWriteStat.record(b.Len()) + } + } + writeAck <- err + <-writeAckAck + } + }() + + go func() { + for { + time.Sleep(3 * time.Second) + + log.Print("------------------------") + + log.Printf("> Elapsed=%v", time.Now().Sub(startTime)) + mu.Lock() + log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d", + gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec()) + log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d", + gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec()) + log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d", + gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec()) + log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d", + gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec()) + mu.Unlock() + + cachedblock, _ := db.GetProperty("leveldb.cachedblock") + openedtables, _ := db.GetProperty("leveldb.openedtables") + alivesnaps, _ := db.GetProperty("leveldb.alivesnaps") + aliveiters, _ := db.GetProperty("leveldb.aliveiters") + blockpool, _ := db.GetProperty("leveldb.blockpool") + log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q", + cachedblock, openedtables, alivesnaps, aliveiters, blockpool) + + log.Print("------------------------") + } + }() + + for ns, numKey := range numKeys { + func(ns, numKey int) { + log.Printf("[%02d] STARTING: numKey=%d", ns, numKey) + + keys := make([][]byte, numKey) + for i := range keys { + keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen) + } + + wg.Add(1) + go func() { + var wi uint32 + defer func() { + log.Printf("[%02d] WRITER DONE #%d", ns, wi) + wg.Done() + }() + + var ( + b = new(leveldb.Batch) + k2, v2 []byte + nReader int32 + ) + for atomic.LoadUint32(&done) == 0 { + log.Printf("[%02d] WRITER #%d", ns, wi) + + b.Reset() + for _, k1 := range keys { + k2 = randomData(k2, byte(ns), 2, wi, keyLen) + v2 = randomData(v2, byte(ns), 3, wi, valueLen) + b.Put(k2, v2) + b.Put(k1, k2) + } + writeReq <- b + if err := <-writeAck; err != nil { + writeAckAck <- struct{}{} + fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err) + } + + snap, err := db.GetSnapshot() + if err != nil { + writeAckAck <- struct{}{} + fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err) + } + + writeAckAck <- struct{}{} + + wg.Add(1) + atomic.AddInt32(&nReader, 1) + go func(snapwi uint32, snap *leveldb.Snapshot) { + var ( + ri int + iterStat = &latencyStats{} + getStat = &latencyStats{} + ) + defer func() { + mu.Lock() + gGetStat.add(getStat) + gIterStat.add(iterStat) + mu.Unlock() + + atomic.AddInt32(&nReader, -1) + log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg()) + snap.Release() + wg.Done() + }() + + stopi := snapwi + 3 + for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 { + var n int + iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil) + iterStat.start() + for iter.Next() { + k1 := iter.Key() + k2 := iter.Value() + iterStat.record(1) + + if dataNS(k2) != byte(ns) { + fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2)) + } + + kwritei := dataI(k2) + if kwritei != snapwi { + fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei) + } + + getStat.start() + v2, err := snap.Get(k2, nil) + if err != nil { + fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) + } + getStat.record(1) + + if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 { + err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)} + fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) + } + + n++ + iterStat.start() + } + iter.Release() + if err := iter.Error(); err != nil { + fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err) + } + if n != numKey { + fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n) + } + + ri++ + } + }(wi, snap) + + atomic.AddUint32(&wi, 1) + } + }() + + delB := new(leveldb.Batch) + wg.Add(1) + go func() { + var ( + i int + iterStat = &latencyStats{} + ) + defer func() { + log.Printf("[%02d] SCANNER DONE #%d", ns, i) + wg.Done() + }() + + time.Sleep(2 * time.Second) + + for atomic.LoadUint32(&done) == 0 { + var n int + delB.Reset() + iter := db.NewIterator(dataNsSlice(byte(ns)), nil) + iterStat.start() + for iter.Next() && atomic.LoadUint32(&done) == 0 { + k := iter.Key() + v := iter.Value() + iterStat.record(1) + + for ci, x := range [...][]byte{k, v} { + checksum0, checksum1 := dataChecksum(x) + if checksum0 != checksum1 { + if ci == 0 { + fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) + } else { + fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) + } + } + } + + if dataPrefix(k) == 2 || mrand.Int()%999 == 0 { + delB.Delete(k) + } + + n++ + iterStat.start() + } + iter.Release() + if err := iter.Error(); err != nil { + fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err) + } + + if n > 0 { + log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg()) + } + + if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 { + t := time.Now() + writeReq <- delB + if err := <-writeAck; err != nil { + writeAckAck <- struct{}{} + fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err) + } else { + writeAckAck <- struct{}{} + } + log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t)) + } + + i++ + } + }() + }(ns, numKey) + } + + go func() { + sig := make(chan os.Signal) + signal.Notify(sig, os.Interrupt, os.Kill) + log.Printf("Got signal: %v, exiting...", <-sig) + atomic.StoreUint32(&done, 1) + }() + + wg.Wait() +} diff --git a/vendor/github.com/syndtr/goleveldb/manualtest/filelock/main.go b/vendor/github.com/syndtr/goleveldb/manualtest/filelock/main.go new file mode 100644 index 000000000..192951fac --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/manualtest/filelock/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "bufio" + "bytes" + "flag" + "fmt" + "os" + "os/exec" + "path/filepath" + + "github.com/syndtr/goleveldb/leveldb/storage" +) + +var ( + filename string + child bool +) + +func init() { + flag.StringVar(&filename, "filename", filepath.Join(os.TempDir(), "goleveldb_filelock_test"), "Filename used for testing") + flag.BoolVar(&child, "child", false, "This is the child") +} + +func runChild() error { + var args []string + args = append(args, os.Args[1:]...) + args = append(args, "-child") + cmd := exec.Command(os.Args[0], args...) + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + r := bufio.NewReader(&out) + for { + line, _, e1 := r.ReadLine() + if e1 != nil { + break + } + fmt.Println("[Child]", string(line)) + } + return err +} + +func main() { + flag.Parse() + + fmt.Printf("Using path: %s\n", filename) + if child { + fmt.Println("Child flag set.") + } + + stor, err := storage.OpenFile(filename, false) + if err != nil { + fmt.Printf("Could not open storage: %s", err) + os.Exit(10) + } + + if !child { + fmt.Println("Executing child -- first test (expecting error)") + err := runChild() + if err == nil { + fmt.Println("Expecting error from child") + } else if err.Error() != "exit status 10" { + fmt.Println("Got unexpected error from child:", err) + } else { + fmt.Printf("Got error from child: %s (expected)\n", err) + } + } + + err = stor.Close() + if err != nil { + fmt.Printf("Error when closing storage: %s", err) + os.Exit(11) + } + + if !child { + fmt.Println("Executing child -- second test") + err := runChild() + if err != nil { + fmt.Println("Got unexpected error from child:", err) + } + } + + os.RemoveAll(filename) +} diff --git a/vendor/manifest b/vendor/manifest index 950b91c8e..62199ce00 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -73,6 +73,12 @@ "revision": "91dfa3a59c5b233fef9a346a1460f6e2bc889d93", "branch": "master" }, + { + "importpath": "github.com/syndtr/goleveldb", + "repository": "https://github.com/syndtr/goleveldb", + "revision": "917f41c560270110ceb73c5b38be2a9127387071", + "branch": "master" + }, { "importpath": "github.com/syndtr/goleveldb/leveldb", "repository": "https://github.com/syndtr/goleveldb",