2017-03-07 11:57:45 +01:00

1136 lines
27 KiB
Go

// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package table
import (
"encoding/binary"
"fmt"
"io"
"sort"
"strings"
"sync"
"github.com/golang/snappy"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Reader errors.
var (
// ErrNotFound is an alias of errors.ErrNotFound. The lookup methods of
// Reader return it when no matching key exists in the table.
ErrNotFound = errors.ErrNotFound
// ErrReaderReleased is stored as the reader's sticky error by
// Reader.Release, so later calls on the reader report it.
ErrReaderReleased = errors.New("leveldb/table: reader released")
// ErrIterReleased is recorded by a block iterator whose positioning
// methods are called after the iterator was released.
ErrIterReleased = errors.New("leveldb/table: iterator released")
)
// ErrCorrupted carries the details of a corruption found while reading a
// table. Callers receive it wrapped inside errors.ErrCorrupted.
type ErrCorrupted struct {
	// Pos and Size locate the damaged region within the table file.
	Pos, Size int64
	// Kind names the damaged region and Reason says what is wrong with it.
	Kind, Reason string
}

// Error implements the error interface.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/table: corruption on %s (pos=%d): %s"
	return fmt.Sprintf(format, e.Kind, e.Pos, e.Reason)
}
// max returns the larger of x and y.
func max(x, y int) int {
	if y >= x {
		return y
	}
	return x
}
// block is a decoded table block: a run of entries followed by an array of
// 4-byte restart offsets and a trailing 4-byte restart count.
type block struct {
// bpool is the buffer pool that data is handed back to by Release.
bpool *util.BufferPool
// bh is the handle the block was read from; used when reporting corruption.
bh blockHandle
// data holds the whole block contents (entries plus restart array).
data []byte
// restartsLen is the number of restart points, decoded from the last
// four bytes of data.
restartsLen int
// restartsOffset is the position in data where the restart array starts;
// entries occupy data[:restartsOffset].
restartsOffset int
}
// seek picks the restart point from which a forward scan for key should
// start, looking only at restart indexes in [rstart, rlimit). It returns the
// index of the last restart point whose key is not greater than key (or
// rstart when every restart key is greater) together with that restart
// point's entry offset. The returned error is always nil.
func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
	// keyIsGreater reports whether the key stored at restart point rstart+i
	// sorts after the sought key.
	keyIsGreater := func(i int) bool {
		pos := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
		pos++ // Skip the shared-length byte; it is always zero at a restart point.
		keyLen, keyLenSize := binary.Uvarint(b.data[pos:])
		_, valueLenSize := binary.Uvarint(b.data[pos+keyLenSize:])
		keyStart := pos + keyLenSize + valueLenSize
		return cmp.Compare(b.data[keyStart:keyStart+int(keyLen)], key) > 0
	}

	index = rstart + sort.Search(rlimit-rstart, keyIsGreater) - 1
	if index < rstart {
		// Even the smallest restart key is greater than the sought key.
		index = rstart
	}
	offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
	return index, offset, nil
}
// restartIndex returns the index of the last restart point in
// [rstart, rlimit) whose entry offset is not greater than offset. The result
// is rstart-1 when every restart point in the range lies beyond offset.
func (b *block) restartIndex(rstart, rlimit, offset int) int {
	firstBeyond := sort.Search(rlimit-rstart, func(i int) bool {
		slot := b.restartsOffset + 4*(rstart+i)
		return int(binary.LittleEndian.Uint32(b.data[slot:])) > offset
	})
	return rstart + firstBeyond - 1
}
// restartOffset returns the entry offset stored in slot index of the restart
// array.
func (b *block) restartOffset(index int) int {
	slot := b.data[b.restartsOffset+4*index:]
	return int(binary.LittleEndian.Uint32(slot))
}
// entry decodes the entry that starts at offset.
//
// It returns the unshared key suffix, the value, the number of key bytes
// shared with the previous key (nShared) and the total encoded size of the
// entry (n). When offset is exactly the end of the entries area, all results
// are zero and err is nil, which callers treat as end of block. An offset
// past the end of the entries area, or an entry whose header or payload does
// not fit inside the entries area, yields an *ErrCorrupted.
//
// Every length read from the block is validated before it is used for
// slicing or converted to int, so a damaged block produces an error rather
// than a slice-bounds panic.
func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
	if offset >= b.restartsOffset {
		if offset != b.restartsOffset {
			err = &ErrCorrupted{Reason: "entries offset not aligned"}
		}
		return
	}

	// Each varint is checked right after decoding: Uvarint reports a size
	// <= 0 on truncated or overflowing input, and such a size must not be
	// used to compute the next slice start.
	v0, n0 := binary.Uvarint(b.data[offset:]) // Shared prefix length
	if n0 <= 0 {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}
	v1, n1 := binary.Uvarint(b.data[offset+n0:]) // Key length
	if n1 <= 0 {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}
	v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
	if n2 <= 0 {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}

	// Compare in uint64 so that oversized lengths cannot wrap around when
	// converted to int.
	header := n0 + n1 + n2
	avail := uint64(b.restartsOffset - offset)
	if uint64(header) > avail {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}
	avail -= uint64(header)
	if v1 > avail || v2 > avail-v1 {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}
	// A shared prefix can never be longer than the entries area itself.
	if v0 > uint64(b.restartsOffset) {
		return nil, nil, 0, 0, &ErrCorrupted{Reason: "entries corrupted"}
	}

	keyLen, valueLen := int(v1), int(v2)
	keyStart := offset + header
	n = header + keyLen + valueLen
	key = b.data[keyStart : keyStart+keyLen]
	value = b.data[keyStart+keyLen : offset+n]
	nShared = int(v0)
	return key, value, nShared, n, nil
}
// Release hands the block's buffer back to its pool and clears the block's
// references to both.
func (b *block) Release() {
	pool, buf := b.bpool, b.data
	b.bpool, b.data = nil, nil
	pool.Put(buf)
}
// dir is the state/direction of a blockIter.
type dir int
// The numeric order matters: blockIter.Key and blockIter.Value treat every
// state <= dirEOI as "not positioned on an entry".
const (
// dirReleased (-1): the iterator has been released.
dirReleased dir = iota - 1
// dirSOI (0): positioned before the first entry (start of iterator).
dirSOI
// dirEOI (1): positioned after the last entry (end of iterator).
dirEOI
// dirBackward (2): positioned on an entry, last moved by Prev.
dirBackward
// dirForward (3): positioned on an entry, last moved by Next.
dirForward
)
// blockIter iterates over the entries of a single block, optionally
// restricted to a sub-range of it.
type blockIter struct {
// tr is the owning reader; it supplies the comparer and builds
// corruption errors.
tr *Reader
// block is the block being iterated.
block *block
// blockReleaser, when non-nil, is released together with the iterator.
blockReleaser util.Releaser
// releaser is the optional releaser installed through SetReleaser.
releaser util.Releaser
// key and value of the current entry. key is an iterator-owned buffer;
// value is a sub-slice of the block data.
key, value []byte
// offset is the block offset just past the current entry.
offset int
// Previous offset, only filled by Next. It is the offset at which the
// current entry starts.
prevOffset int
// prevNode and prevKeys cache the entries of one restart range for
// backward iteration. prevNode[0] is the offset the range scan started
// from, followed by (key offset in prevKeys, value offset, value length)
// triples.
prevNode []int
prevKeys []byte
// restartIndex is the restart index the iterator is currently working in.
restartIndex int
// Iterator direction.
dir dir
// Restart index slice range.
riStart int
riLimit int
// Offset slice range. offsetStart is where decoding begins (the start of
// the first restart range used), offsetRealStart is the offset of the
// first entry that is actually exposed, and offsetLimit is the offset at
// which iteration ends.
offsetStart int
offsetRealStart int
offsetLimit int
// Error.
err error
}
// sErr records err as the iterator's error and drops the current entry and
// the backward-iteration cache.
func (i *blockIter) sErr(err error) {
	i.key, i.value = nil, nil
	i.prevNode, i.prevKeys = nil, nil
	i.err = err
}
// reset moves the iterator back to the start-of-iterator state, keeping the
// capacity of its buffers.
func (i *blockIter) reset() {
	if i.dir == dirBackward {
		// The backward cache is only populated while iterating backward.
		i.prevNode, i.prevKeys = i.prevNode[:0], i.prevKeys[:0]
	}
	i.dir = dirSOI
	i.restartIndex, i.offset = i.riStart, i.offsetStart
	i.key, i.value = i.key[:0], nil
}
// isFirst reports whether the iterator is positioned on the first entry of
// its range. It is always false when the iterator is not on an entry.
func (i *blockIter) isFirst() bool {
	if i.dir == dirForward {
		return i.prevOffset == i.offsetRealStart
	}
	if i.dir == dirBackward {
		return len(i.prevNode) == 1 && i.restartIndex == i.riStart
	}
	return false
}
// isLast reports whether the iterator is positioned on the last entry of its
// range. It is always false when the iterator is not on an entry.
func (i *blockIter) isLast() bool {
	onEntry := i.dir == dirForward || i.dir == dirBackward
	return onEntry && i.offset == i.offsetLimit
}
// First moves to the first entry of the range and reports whether one
// exists. It fails if the iterator holds an error or has been released.
func (i *blockIter) First() bool {
	switch {
	case i.err != nil:
		return false
	case i.dir == dirReleased:
		i.err = ErrIterReleased
		return false
	case i.dir == dirBackward:
		// Leaving backward mode invalidates the backward cache.
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	i.dir = dirSOI
	return i.Next()
}
// Last moves to the last entry of the range and reports whether one exists.
// It fails if the iterator holds an error or has been released.
func (i *blockIter) Last() bool {
	switch {
	case i.err != nil:
		return false
	case i.dir == dirReleased:
		i.err = ErrIterReleased
		return false
	case i.dir == dirBackward:
		// Start from an empty backward cache.
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	i.dir = dirEOI
	return i.Prev()
}
// Seek positions the iterator on the first entry whose key is greater than
// or equal to key and reports whether such an entry exists. It fails if the
// iterator holds an error or has been released.
func (i *blockIter) Seek(key []byte) bool {
	if i.err != nil {
		return false
	}
	if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	// Jump to the closest restart point, then scan forward from there.
	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
	if err != nil {
		i.sErr(err)
		return false
	}
	i.restartIndex = ri
	i.offset = max(i.offsetStart, offset)
	switch i.dir {
	case dirSOI, dirEOI:
		// Make Next continue from i.offset instead of restarting or stopping.
		i.dir = dirForward
	}
	for {
		if !i.Next() {
			return false
		}
		if i.tr.cmp.Compare(i.key, key) >= 0 {
			return true
		}
	}
}
// Next advances to the next entry of the range and reports whether one
// exists. It returns false once the end of the range is reached, when the
// iterator holds an error, or when it has been released (in which case
// ErrIterReleased is recorded).
func (i *blockIter) Next() bool {
if i.dir == dirEOI || i.err != nil {
return false
} else if i.dir == dirReleased {
i.err = ErrIterReleased
return false
}
if i.dir == dirSOI {
// Start decoding from the beginning of the range.
i.restartIndex = i.riStart
i.offset = i.offsetStart
} else if i.dir == dirBackward {
// Switching direction: the backward cache is no longer valid.
i.prevNode = i.prevNode[:0]
i.prevKeys = i.prevKeys[:0]
}
// Entries between offsetStart and offsetRealStart are outside the exposed
// range but still have to be decoded, because later keys are encoded
// relative to them (shared prefix).
for i.offset < i.offsetRealStart {
key, value, nShared, n, err := i.block.entry(i.offset)
if err != nil {
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
if n == 0 {
// entry reports n == 0 at the end of the entries area.
i.dir = dirEOI
return false
}
i.key = append(i.key[:nShared], key...)
i.value = value
i.offset += n
}
if i.offset >= i.offsetLimit {
i.dir = dirEOI
if i.offset != i.offsetLimit {
// Overshooting the limit means entry sizes do not add up.
i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
}
return false
}
key, value, nShared, n, err := i.block.entry(i.offset)
if err != nil {
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
if n == 0 {
i.dir = dirEOI
return false
}
// Rebuild the full key: keep the prefix shared with the previous key and
// append the unshared suffix.
i.key = append(i.key[:nShared], key...)
i.value = value
// Remember where this entry starts; Prev and isFirst rely on it.
i.prevOffset = i.offset
i.offset += n
i.dir = dirForward
return true
}
// Prev moves to the previous entry of the range and reports whether one
// exists. It returns false once the start of the range is reached, when the
// iterator holds an error, or when it has been released (in which case
// ErrIterReleased is recorded).
//
// Entries can only be decoded forward, so Prev decodes a whole restart range
// at a time, caches all but its last entry in prevNode/prevKeys, and serves
// subsequent calls from that cache until it is exhausted.
func (i *blockIter) Prev() bool {
if i.dir == dirSOI || i.err != nil {
return false
} else if i.dir == dirReleased {
i.err = ErrIterReleased
return false
}
// ri is the restart index the cache will be rebuilt from.
var ri int
if i.dir == dirForward {
// Change direction.
// The target is the entry that ends where the current entry starts.
i.offset = i.prevOffset
if i.offset == i.offsetRealStart {
i.dir = dirSOI
return false
}
ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
i.dir = dirBackward
} else if i.dir == dirEOI {
// At the end of iterator.
i.restartIndex = i.riLimit
i.offset = i.offsetLimit
if i.offset == i.offsetRealStart {
// The range is empty.
i.dir = dirSOI
return false
}
ri = i.riLimit - 1
i.dir = dirBackward
} else if len(i.prevNode) == 1 {
// This is the end of a restart range.
// Only the scan start offset is left in the cache; continue with the
// restart range before it.
i.offset = i.prevNode[0]
i.prevNode = i.prevNode[:0]
if i.restartIndex == i.riStart {
i.dir = dirSOI
return false
}
i.restartIndex--
ri = i.restartIndex
} else {
// In the middle of restart range, get from cache.
// Pop the last (key offset, value offset, value length) triple.
n := len(i.prevNode) - 3
node := i.prevNode[n:]
i.prevNode = i.prevNode[:n]
// Get the key.
ko := node[0]
i.key = append(i.key[:0], i.prevKeys[ko:]...)
i.prevKeys = i.prevKeys[:ko]
// Get the value.
vo := node[1]
vl := vo + node[2]
i.value = i.block.data[vo:vl]
// offset always points just past the current entry.
i.offset = vl
return true
}
// Build entries cache.
i.key = i.key[:0]
i.value = nil
offset := i.block.restartOffset(ri)
if offset == i.offset {
// The target offset is itself a restart point, so the wanted entry
// belongs to the restart range before it.
ri--
if ri < 0 {
i.dir = dirSOI
return false
}
offset = i.block.restartOffset(ri)
}
// prevNode[0] records where this scan started.
i.prevNode = append(i.prevNode, offset)
// Decode forward until the target offset. Each exposed entry except the
// last one is pushed into the cache when its successor is decoded; the
// last one becomes the current entry.
for {
key, value, nShared, n, err := i.block.entry(offset)
if err != nil {
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
return false
}
// Entries before offsetRealStart are decoded for their key prefix only.
if offset >= i.offsetRealStart {
if i.value != nil {
// Appends 3 variables:
// 1. Previous keys offset
// 2. Value offset in the data block
// 3. Value length
// At this point i.key/i.value still describe the previous entry, whose
// value ends exactly where the entry being decoded starts.
i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
i.prevKeys = append(i.prevKeys, i.key...)
}
i.value = value
}
i.key = append(i.key[:nShared], key...)
offset += n
// Stop if target offset reached.
if offset >= i.offset {
if offset != i.offset {
i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
return false
}
break
}
}
i.restartIndex = ri
i.offset = offset
return true
}
// Key returns the key of the current entry, or nil when the iterator holds
// an error or is not positioned on an entry.
func (i *blockIter) Key() []byte {
	if i.err == nil && i.dir > dirEOI {
		return i.key
	}
	return nil
}
// Value returns the value of the current entry, or nil when the iterator
// holds an error or is not positioned on an entry.
func (i *blockIter) Value() []byte {
	if i.err == nil && i.dir > dirEOI {
		return i.value
	}
	return nil
}
// Release drops every reference held by the iterator and then releases the
// block releaser and the user-installed releaser, if any. Calling it again
// on a released iterator does nothing.
func (i *blockIter) Release() {
	if i.dir == dirReleased {
		return
	}

	i.tr, i.block = nil, nil
	i.prevNode, i.prevKeys = nil, nil
	i.key, i.value = nil, nil
	i.dir = dirReleased

	if rel := i.blockReleaser; rel != nil {
		rel.Release()
		i.blockReleaser = nil
	}
	if rel := i.releaser; rel != nil {
		rel.Release()
		i.releaser = nil
	}
}
// SetReleaser installs the releaser that is released together with the
// iterator. It panics with util.ErrReleased on a released iterator and with
// util.ErrHasReleaser when a releaser is already installed and the new one
// is non-nil. Passing nil clears the installed releaser.
func (i *blockIter) SetReleaser(releaser util.Releaser) {
	switch {
	case i.dir == dirReleased:
		panic(util.ErrReleased)
	case i.releaser != nil && releaser != nil:
		panic(util.ErrHasReleaser)
	}
	i.releaser = releaser
}
// Valid reports whether the iterator is error-free and positioned on an
// entry.
func (i *blockIter) Valid() bool {
	if i.err != nil {
		return false
	}
	return i.dir == dirForward || i.dir == dirBackward
}
// Error returns the error recorded by the iterator, or nil if there is none.
// The error is kept after the iterator has been released.
func (i *blockIter) Error() error {
return i.err
}
// filterBlock is a decoded filter block: the filter data, followed by an
// array of 4-byte filter offsets, the 4-byte position of that array and a
// final base-lg byte.
type filterBlock struct {
// bpool is the buffer pool that data is handed back to by Release.
bpool *util.BufferPool
// data holds the whole filter block contents.
data []byte
// oOffset is the position in data where the offsets array starts.
oOffset int
// baseLg is the shift applied to a data block offset to obtain its
// filter index.
baseLg uint
// filtersNum is the number of entries in the offsets array.
filtersNum int
}
// contains reports whether key may be present in the data block that starts
// at the given table offset, according to the filter covering that offset.
// An empty filter means "definitely absent"; an out-of-range filter index or
// inconsistent filter bounds are answered with true so that the caller falls
// back to reading the data block.
func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
	idx := int(offset >> b.baseLg)
	if idx >= b.filtersNum {
		return true
	}

	slot := b.data[b.oOffset+idx*4:]
	start := int(binary.LittleEndian.Uint32(slot))
	limit := int(binary.LittleEndian.Uint32(slot[4:]))
	switch {
	case start < limit && limit <= b.oOffset:
		return filter.Contains(b.data[start:limit], key)
	case start == limit:
		// Empty filter: no key was added for this range.
		return false
	}
	return true
}
// Release hands the filter block's buffer back to its pool and clears the
// block's references to both.
func (b *filterBlock) Release() {
	pool, buf := b.bpool, b.data
	b.bpool, b.data = nil, nil
	pool.Put(buf)
}
// indexIter is a block iterator over the index block; its Get method opens
// an iterator on the data block the current index entry points to.
type indexIter struct {
*blockIter
// tr is the owning reader. It shadows blockIter.tr and, unlike that
// field, is not cleared by blockIter.Release.
tr *Reader
// slice is the key range requested by the user, or nil for no limit.
slice *util.Range
// Options
// fillCache is forwarded to the data block reads done by Get.
fillCache bool
}
// Get returns an iterator over the data block referenced by the current
// index entry, or nil when the index iterator has no current entry. An
// undecodable block handle yields an empty iterator carrying a corruption
// error.
func (i *indexIter) Get() iterator.Iterator {
	handle := i.Value()
	if handle == nil {
		return nil
	}

	dataBH, n := decodeBlockHandle(handle)
	if n == 0 {
		err := i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle")
		return iterator.NewEmptyIterator(err)
	}

	// Only the first and the last data block can straddle the requested
	// range, so only those need to be sliced.
	var slice *util.Range
	if i.slice != nil {
		if i.blockIter.isFirst() || i.blockIter.isLast() {
			slice = i.slice
		}
	}
	return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
}
// Reader is a table reader.
type Reader struct {
// mu is held for reading by the lookup and iterator methods and for
// writing by Release.
mu sync.RWMutex
// fd identifies the table file in corruption errors.
fd storage.FileDesc
// reader is the underlying table file. Release closes it when it
// implements io.Closer.
reader io.ReaderAt
// cache is the optional block cache. When it is nil, NewReader keeps
// the index and filter blocks in indexBlock and filterBlock instead.
cache *cache.NamespaceGetter
// err is the reader's sticky error: a corruption found while opening or
// reading the table, or ErrReaderReleased after Release.
err error
// bpool is the optional buffer pool used for block buffers.
bpool *util.BufferPool
// Options
o *opt.Options
cmp comparer.Comparer
// filter is the filter matching the table's filter block, or nil when
// the table has no usable filter.
filter filter.Filter
// verifyChecksum tells whether data block checksums are verified.
verifyChecksum bool
// dataEnd is the offset at which the data blocks end; OffsetOf returns
// it for keys past the last index entry.
dataEnd int64
metaBH, indexBH, filterBH blockHandle
// indexBlock and filterBlock are only populated when cache is nil.
indexBlock *block
filterBlock *filterBlock
}
// blockKind names the kind of block that bh refers to by comparing its
// offset with the known meta, index and filter block handles. Anything else
// is reported as a data block.
func (r *Reader) blockKind(bh blockHandle) string {
	if bh.offset == r.metaBH.offset {
		return "meta-block"
	}
	if bh.offset == r.indexBH.offset {
		return "index-block"
	}
	// An unset filter handle has zero length and must not match.
	if bh.offset == r.filterBH.offset && r.filterBH.length > 0 {
		return "filter-block"
	}
	return "data-block"
}
// newErrCorrupted builds a corruption error for this table's file that
// describes the damaged region by position, size, kind and reason.
func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
	detail := &ErrCorrupted{
		Pos:    pos,
		Size:   size,
		Kind:   kind,
		Reason: reason,
	}
	return &errors.ErrCorrupted{Fd: r.fd, Err: detail}
}
// newErrCorruptedBH builds a corruption error for the block that bh refers
// to, deriving position, size and kind from the handle.
func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
	pos, size := int64(bh.offset), int64(bh.length)
	kind := r.blockKind(bh)
	return r.newErrCorrupted(pos, size, kind, reason)
}
// fixErrCorruptedBH completes a bare *ErrCorrupted, as produced by the block
// decoding code, with the position, size and kind of bh and wraps it for
// this table's file. The passed error is modified in place. Errors of any
// other type are returned unchanged.
func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
	cerr, ok := err.(*ErrCorrupted)
	if !ok {
		return err
	}
	cerr.Pos, cerr.Size = int64(bh.offset), int64(bh.length)
	cerr.Kind = r.blockKind(bh)
	return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
}
// readRawBlock reads the block that bh refers to, together with its trailer
// (compression type byte and checksum), and returns the uncompressed block
// contents.
//
// When verifyChecksum is true the checksum stored in the trailer is compared
// with the one computed over the block and its type byte. A checksum
// mismatch, an unknown compression type or undecodable compressed data is
// reported as a corruption error; read errors other than io.EOF are returned
// as is.
//
// The returned slice comes from r.bpool. On every error path the buffers
// obtained from the pool are handed back before returning, and nothing is
// read from a buffer after it has been returned to the pool.
func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
	data := r.bpool.Get(int(bh.length + blockTrailerLen))
	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
		r.bpool.Put(data)
		return nil, err
	}

	if verifyChecksum {
		// The checksum covers the block contents plus the type byte.
		n := bh.length + 1
		checksum0 := binary.LittleEndian.Uint32(data[n:])
		checksum1 := util.NewCRC(data[:n]).Value()
		if checksum0 != checksum1 {
			r.bpool.Put(data)
			return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
		}
	}

	// Copy the type byte out first: data may be recycled by another
	// goroutine as soon as it is put back into the pool.
	switch compression := data[bh.length]; compression {
	case blockTypeNoCompression:
		data = data[:bh.length]
	case blockTypeSnappyCompression:
		decLen, err := snappy.DecodedLen(data[:bh.length])
		if err != nil {
			r.bpool.Put(data)
			return nil, r.newErrCorruptedBH(bh, err.Error())
		}
		// Keep a separate reference to the pooled buffer: snappy.Decode's
		// result must not be relied on when it fails.
		buf := r.bpool.Get(decLen)
		decData, err := snappy.Decode(buf, data[:bh.length])
		r.bpool.Put(data)
		if err != nil {
			r.bpool.Put(buf)
			return nil, r.newErrCorruptedBH(bh, err.Error())
		}
		data = decData
	default:
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", compression))
	}
	return data, nil
}
// readBlock reads and decodes the block that bh refers to, without using the
// block cache.
//
// The last four bytes of a block hold the number of restart points, and the
// restart array sits directly before them. A block that is too short to hold
// the restart count, or whose restart count does not fit inside the block,
// is reported as a corruption error instead of producing an out-of-range
// slice or a negative restarts offset.
func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
	data, err := r.readRawBlock(bh, verifyChecksum)
	if err != nil {
		return nil, err
	}

	if len(data) < 4 {
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, "block too short")
	}
	tail := len(data) - 4
	restarts := binary.LittleEndian.Uint32(data[tail:])
	// Compare in uint64 so that the check cannot overflow on any platform.
	if uint64(restarts) > uint64(tail)/4 {
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, "invalid restarts length")
	}
	restartsLen := int(restarts)

	b := &block{
		bpool:          r.bpool,
		bh:             bh,
		data:           data,
		restartsLen:    restartsLen,
		restartsOffset: tail - restartsLen*4,
	}
	return b, nil
}
// readBlockCached returns the block that bh refers to, going through the
// block cache when the reader has one.
//
// With a cache and fillCache set, a missing block is read and inserted into
// the cache. With a cache and fillCache unset, only an already cached block
// is taken from the cache; otherwise the block is read directly. The
// returned releaser must be released once the block is no longer needed: it
// is the cache handle for cached blocks and the block itself otherwise.
//
// On error both the block and the releaser are nil. In particular the
// releaser is never a non-nil interface wrapping a nil *block.
func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
	if r.cache != nil {
		var (
			err error
			ch  *cache.Handle
		)
		if fillCache {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				var b *block
				b, err = r.readBlock(bh, verifyChecksum)
				if err != nil {
					return 0, nil
				}
				return cap(b.data), b
			})
		} else {
			ch = r.cache.Get(bh.offset, nil)
		}
		if ch != nil {
			b, ok := ch.Value().(*block)
			if !ok {
				ch.Release()
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
			}
			return b, ch, err
		} else if err != nil {
			return nil, nil, err
		}
	}

	b, err := r.readBlock(bh, verifyChecksum)
	if err != nil {
		return nil, nil, err
	}
	return b, b, nil
}
// readFilterBlock reads and decodes the filter block that bh refers to,
// without using the block cache. The block checksum is always verified.
//
// The last byte of a filter block is the base-lg value and the four bytes
// before it hold the position of the filter offsets array. A block too short
// to hold these, or whose offsets array position lies outside the block, is
// reported as a corruption error; in that case the buffer is handed back to
// the pool before returning.
func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
	data, err := r.readRawBlock(bh, true)
	if err != nil {
		return nil, err
	}

	n := len(data)
	if n < 5 {
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, "too short")
	}
	m := n - 5
	oOffset := int(binary.LittleEndian.Uint32(data[m:]))
	// oOffset can only be negative where int is 32 bits wide.
	if oOffset < 0 || oOffset > m {
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
	}

	b := &filterBlock{
		bpool:      r.bpool,
		data:       data,
		oOffset:    oOffset,
		baseLg:     uint(data[n-1]),
		filtersNum: (m - oOffset) / 4,
	}
	return b, nil
}
// readFilterBlockCached returns the filter block that bh refers to, going
// through the block cache when the reader has one.
//
// With a cache and fillCache set, a missing block is read and inserted into
// the cache. With a cache and fillCache unset, only an already cached block
// is taken from the cache; otherwise the block is read directly. The
// returned releaser must be released once the block is no longer needed: it
// is the cache handle for cached blocks and the block itself otherwise.
//
// On error both the block and the releaser are nil. In particular the
// releaser is never a non-nil interface wrapping a nil *filterBlock.
func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
	if r.cache != nil {
		var (
			err error
			ch  *cache.Handle
		)
		if fillCache {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				var b *filterBlock
				b, err = r.readFilterBlock(bh)
				if err != nil {
					return 0, nil
				}
				return cap(b.data), b
			})
		} else {
			ch = r.cache.Get(bh.offset, nil)
		}
		if ch != nil {
			b, ok := ch.Value().(*filterBlock)
			if !ok {
				ch.Release()
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
			}
			return b, ch, err
		} else if err != nil {
			return nil, nil, err
		}
	}

	b, err := r.readFilterBlock(bh)
	if err != nil {
		return nil, nil, err
	}
	return b, b, nil
}
// getIndexBlock returns the index block together with a releaser for it. The
// locally kept index block is used when present (its releaser is a no-op);
// otherwise the block is read through the cache with checksum verification.
func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
	if r.indexBlock != nil {
		return r.indexBlock, util.NoopReleaser{}, nil
	}
	return r.readBlockCached(r.indexBH, true, fillCache)
}
// getFilterBlock returns the filter block together with a releaser for it.
// The locally kept filter block is used when present (its releaser is a
// no-op); otherwise the block is read through the cache.
func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
	if r.filterBlock != nil {
		return r.filterBlock, util.NoopReleaser{}, nil
	}
	return r.readFilterBlockCached(r.filterBH, fillCache)
}
// newBlockIter creates an iterator over block b. bReleaser, when non-nil, is
// released together with the iterator.
//
// When slice is non-nil the iterator is restricted to it: entries before
// slice.Start are hidden, and iteration stops at the first entry whose key
// is greater than or equal to slice.Limit. With inclLimit set, that entry is
// still included and iteration stops one entry later. If the resulting range
// is inverted the iterator is returned with an error set.
func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
bi := &blockIter{
tr: r,
block: b,
blockReleaser: bReleaser,
// Valid key should never be nil.
key: make([]byte, 0),
dir: dirSOI,
// Without a slice the iterator covers every restart point and entry.
riStart: 0,
riLimit: b.restartsLen,
offsetStart: 0,
offsetRealStart: 0,
offsetLimit: b.restartsOffset,
}
if slice != nil {
if slice.Start != nil {
if bi.Seek(slice.Start) {
// After a successful Seek, prevOffset is where the first in-range
// entry starts. Decoding must still begin at the restart point at or
// before it, hence the separate offsetStart and offsetRealStart.
bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
bi.offsetStart = b.restartOffset(bi.riStart)
bi.offsetRealStart = bi.prevOffset
} else {
// No entry is >= slice.Start: the range is empty.
bi.riStart = b.restartsLen
bi.offsetStart = b.restartsOffset
bi.offsetRealStart = b.restartsOffset
}
}
if slice.Limit != nil {
// With inclLimit the iterator advances one entry further, so the entry
// found by Seek stays inside the range. If Seek (or that Next) fails,
// the limit stays at the end of the block.
if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
bi.offsetLimit = bi.prevOffset
bi.riLimit = bi.restartIndex + 1
}
}
// Undo the positioning done by the seeks above.
bi.reset()
if bi.offsetStart > bi.offsetLimit {
bi.sErr(errors.New("leveldb/table: invalid slice range"))
}
}
return bi
}
// getDataIter returns an iterator over the data block that dataBH refers to,
// optionally restricted to slice. A failure to read the block is reported
// through an empty iterator carrying the error.
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
	blk, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
	if err == nil {
		return r.newBlockIter(blk, rel, slice, false)
	}
	return iterator.NewEmptyIterator(err)
}
// getDataIterErr is getDataIter for callers that do not hold the reader
// lock: it takes the read lock and returns an empty iterator carrying the
// reader's sticky error when one is set.
func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if err := r.err; err != nil {
		return iterator.NewEmptyIterator(err)
	}
	return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
}
// NewIterator returns an iterator over the contents of the table.
//
// A non-nil slice restricts the iterator to the keys in the given range. A
// nil Range.Start stands for a key before all keys in the table, and a nil
// Range.Limit for a key after all keys in the table.
//
// The returned iterator is not safe for concurrent use and should be
// released after use. If the reader holds an error, or the index block
// cannot be read, an empty iterator carrying that error is returned.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		return iterator.NewEmptyIterator(r.err)
	}

	fillCache := !ro.GetDontFillCache()
	indexBlock, rel, err := r.getIndexBlock(fillCache)
	if err != nil {
		return iterator.NewEmptyIterator(err)
	}

	// The index iterator includes the limit entry so that the data block
	// holding the last in-range keys is visited as well.
	index := &indexIter{
		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
		tr:        r,
		slice:     slice,
		fillCache: fillCache,
	}
	strict := opt.GetStrict(r.o, ro, opt.StrictReader)
	return iterator.NewIndexedIterator(index, strict)
}
// find looks up the first key/value pair whose key is greater than or equal
// to key. It returns ErrNotFound when the table holds no such pair.
//
// When filtered is true and the table has a filter, the filter is consulted
// first and ErrNotFound is returned right away if it rules the key out. When
// noValue is true the value is not returned.
//
// find takes the reader's read lock itself, so callers must not hold it.
func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
err = r.err
return
}
indexBlock, rel, err := r.getIndexBlock(true)
if err != nil {
return
}
defer rel.Release()
// The block releaser is handled by the defer above, so none is passed to
// the iterator.
index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release()
// Locate the index entry, and with it the data block, that may hold key.
if !index.Seek(key) {
if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
dataBH, n := decodeBlockHandle(index.Value())
if n == 0 {
// NOTE(review): r.err is written while only the read lock is held.
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return nil, nil, r.err
}
// The filter should only used for exact match.
if filtered && r.filter != nil {
filterBlock, frel, ferr := r.getFilterBlock(true)
if ferr == nil {
if !filterBlock.contains(r.filter, dataBH.offset, key) {
frel.Release()
return nil, nil, ErrNotFound
}
frel.Release()
} else if !errors.IsCorrupted(ferr) {
// A corrupted filter block is ignored and the data block is read
// instead; any other error is returned.
return nil, nil, ferr
}
}
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
if !data.Seek(key) {
data.Release()
// The iterator error is still available after Release.
if err = data.Error(); err != nil {
return
}
// The nearest greater-than key is the first key of the next block.
if !index.Next() {
if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
dataBH, n = decodeBlockHandle(index.Value())
if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return nil, nil, r.err
}
data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
if !data.Next() {
data.Release()
if err = data.Error(); err == nil {
err = ErrNotFound
}
return
}
}
// Key doesn't use block buffer, no need to copy the buffer.
rkey = data.Key()
if !noValue {
if r.bpool == nil {
// Without a buffer pool the block buffer is never recycled, so the
// value can be returned without copying.
value = data.Value()
} else {
// Value does use block buffer, and since the buffer will be
// recycled, it need to be copied.
value = append([]byte{}, data.Value()...)
}
}
data.Release()
return
}
// Find returns the first key/value pair whose key is greater than or equal
// to the given key. It returns ErrNotFound if the table holds no such pair.
//
// If filtered is true, the 'filter data' (if present) of the nearest 'block'
// is consulted first, and ErrNotFound is returned immediately when it
// indicates that no such pair exists.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Find returns.
func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
	const noValue = false
	return r.find(key, filtered, ro, noValue)
}
// FindKey returns the first key that is greater than or equal to the given
// key. It returns ErrNotFound if the table holds no such key.
//
// If filtered is true, the 'filter data' (if present) of the nearest 'block'
// is consulted first, and ErrNotFound is returned immediately when it
// indicates that no such key exists.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after FindKey returns.
func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
	const noValue = true
	rkey, _, err = r.find(key, filtered, ro, noValue)
	return rkey, err
}
// Get gets the value for the given key. It returns errors.ErrNotFound
// if the table does not contain the key.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Get returns.
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	// find takes the read lock and reports the reader's sticky error itself.
	// Taking the read lock here as well would be recursive read locking,
	// which sync.RWMutex prohibits: it can deadlock when Release is waiting
	// for the write lock between the two acquisitions. r.cmp is assigned
	// once in NewReader and never modified afterwards, so it is read without
	// the lock.
	rkey, value, err := r.find(key, false, ro, false)
	if err == nil && r.cmp.Compare(rkey, key) != 0 {
		// find returns the nearest key >= key; Get wants an exact match.
		value = nil
		err = ErrNotFound
	}
	return value, err
}
// OffsetOf returns approximate offset for the given key: the offset of the
// data block that would hold the key, or the end of the data area when the
// key is past the last index entry.
//
// An undecodable data block handle in the index is recorded as the reader's
// error and returned as a corruption error.
//
// It is safe to modify the contents of the argument after OffsetOf returns.
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
	if err != nil {
		return
	}
	defer rel.Release()

	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()
	if index.Seek(key) {
		dataBH, n := decodeBlockHandle(index.Value())
		if n == 0 {
			// Report the corruption to the caller instead of returning a
			// zero offset with a nil error, the same way find does.
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return 0, r.err
		}
		offset = int64(dataBH.offset)
		return
	}
	err = index.Error()
	if err == nil {
		// The key sorts after every index entry.
		offset = r.dataEnd
	}
	return
}
// Release implements util.Releaser.
// It closes the underlying file when it is an io.Closer, releases the
// locally kept index and filter blocks, and marks the reader as released so
// that later calls report ErrReaderReleased.
func (r *Reader) Release() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if c, ok := r.reader.(io.Closer); ok {
		c.Close()
	}
	if ib := r.indexBlock; ib != nil {
		ib.Release()
		r.indexBlock = nil
	}
	if fb := r.filterBlock; fb != nil {
		fb.Release()
		r.filterBlock = nil
	}
	r.reader, r.cache, r.bpool = nil, nil, nil
	r.err = ErrReaderReleased
}
// NewReader creates a new initialized table reader for the file.
// The cache and bpool are optional and can be nil.
//
// A nil file, and read errors that are not corruption, are returned as an
// error. Corruption found while opening the table (file too small, bad
// footer, damaged metaindex or index block) does not fail NewReader:
// the reader is returned with the corruption stored as its sticky error,
// which every later operation on it reports. A corrupted filter block only
// disables the filter.
//
// The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
	if f == nil {
		return nil, errors.New("leveldb/table: nil file")
	}

	r := &Reader{
		fd:             fd,
		reader:         f,
		cache:          cache,
		bpool:          bpool,
		o:              o,
		cmp:            o.GetComparer(),
		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
	}

	if size < footerLen {
		r.err = r.newErrCorrupted(0, size, "table", "too small")
		return r, nil
	}

	// The footer occupies the last footerLen bytes of the file and ends
	// with the magic number.
	footerPos := size - footerLen
	var footer [footerLen]byte
	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
		return nil, err
	}
	if string(footer[footerLen-len(magic):footerLen]) != magic {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
		return r, nil
	}

	var n int
	// Decode the metaindex block handle.
	r.metaBH, n = decodeBlockHandle(footer[:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
		return r, nil
	}

	// Decode the index block handle.
	r.indexBH, n = decodeBlockHandle(footer[n:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
		return r, nil
	}

	// Read metaindex block.
	metaBlock, err := r.readBlock(r.metaBH, true)
	if err != nil {
		if errors.IsCorrupted(err) {
			r.err = err
			return r, nil
		}
		return nil, err
	}

	// Set data end.
	r.dataEnd = int64(r.metaBH.offset)

	// Read metaindex: look for a "filter.<name>" entry whose name matches
	// the configured filter or one of the alternative filters.
	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
	for metaIter.Next() {
		key := string(metaIter.Key())
		if !strings.HasPrefix(key, "filter.") {
			continue
		}
		fn := key[7:]
		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
			r.filter = f0
		} else {
			for _, f0 := range o.GetAltFilters() {
				if f0.Name() == fn {
					r.filter = f0
					break
				}
			}
		}
		if r.filter != nil {
			filterBH, n := decodeBlockHandle(metaIter.Value())
			if n == 0 {
				// The handle is unusable, so forget the matched filter.
				// Otherwise it would be applied to the next metaindex
				// entry regardless of its name, or stay set with a zero
				// filter block handle.
				r.filter = nil
				continue
			}
			r.filterBH = filterBH
			// Update data end.
			r.dataEnd = int64(filterBH.offset)
			break
		}
	}
	metaIter.Release()
	metaBlock.Release()

	// Cache index and filter block locally, since we don't have global cache.
	if cache == nil {
		r.indexBlock, err = r.readBlock(r.indexBH, true)
		if err != nil {
			if errors.IsCorrupted(err) {
				r.err = err
				return r, nil
			}
			return nil, err
		}
		if r.filter != nil {
			r.filterBlock, err = r.readFilterBlock(r.filterBH)
			if err != nil {
				if !errors.IsCorrupted(err) {
					return nil, err
				}

				// Don't use filter then.
				r.filter = nil
			}
		}
	}

	return r, nil
}