Update goleveldb

This commit is contained in:
Jakob Borg 2014-08-12 09:24:36 +02:00
parent 97dda6a4bb
commit adcbe13ecd
5 changed files with 214 additions and 61 deletions

2
Godeps/Godeps.json generated
View File

@ -49,7 +49,7 @@
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "c5955912e3287376475731c5bc59c79a5a799105" "Rev": "fcb3916f495c513b7eab70340f030e942e67b7da"
}, },
{ {
"ImportPath": "github.com/vitrun/qart/coding", "ImportPath": "github.com/vitrun/qart/coding",

View File

@ -481,10 +481,11 @@ func (db *DB) recoverJournal() error {
buf.Reset() buf.Reset()
if _, err := buf.ReadFrom(r); err != nil { if _, err := buf.ReadFrom(r); err != nil {
if strict { if err == io.ErrUnexpectedEOF {
continue
} else {
return err return err
} }
continue
} }
if err := batch.decode(buf.Bytes()); err != nil { if err := batch.decode(buf.Bytes()); err != nil {
return err return err

View File

@ -103,18 +103,18 @@ type flusher interface {
Flush() error Flush() error
} }
// DroppedError is the error type that passed to Dropper.Drop method. // ErrCorrupted is the error type that generated by corrupted block or chunk.
type DroppedError struct { type ErrCorrupted struct {
Size int Size int
Reason string Reason string
} }
func (e DroppedError) Error() string { func (e ErrCorrupted) Error() string {
return fmt.Sprintf("leveldb/journal: dropped %d bytes: %s", e.Size, e.Reason) return fmt.Sprintf("leveldb/journal: corrupted %d bytes: %s", e.Size, e.Reason)
} }
// Dropper is the interface that wrap simple Drop method. The Drop // Dropper is the interface that wrap simple Drop method. The Drop
// method will be called when the journal reader dropping a chunk. // method will be called when the journal reader dropping a block or chunk.
type Dropper interface { type Dropper interface {
Drop(err error) Drop(err error)
} }
@ -158,76 +158,78 @@ func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
} }
} }
var errSkip = errors.New("leveldb/journal: skipped")
func (r *Reader) corrupt(n int, reason string, skip bool) error {
if r.dropper != nil {
r.dropper.Drop(ErrCorrupted{n, reason})
}
if r.strict && !skip {
r.err = ErrCorrupted{n, reason}
return r.err
}
return errSkip
}
// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary. // next block into the buffer if necessary.
func (r *Reader) nextChunk(wantFirst, skip bool) error { func (r *Reader) nextChunk(first bool) error {
for { for {
if r.j+headerSize <= r.n { if r.j+headerSize <= r.n {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6] chunkType := r.buf[r.j+6]
var err error
if checksum == 0 && length == 0 && chunkType == 0 { if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block. // Drop entire block.
err = DroppedError{r.n - r.j, "zero header"} m := r.n - r.j
r.i = r.n r.i = r.n
r.j = r.n r.j = r.n
return r.corrupt(m, "zero header", false)
} else { } else {
m := r.n - r.j m := r.n - r.j
r.i = r.j + headerSize r.i = r.j + headerSize
r.j = r.j + headerSize + int(length) r.j = r.j + headerSize + int(length)
if r.j > r.n { if r.j > r.n {
// Drop entire block. // Drop entire block.
err = DroppedError{m, "chunk length overflows block"}
r.i = r.n r.i = r.n
r.j = r.n r.j = r.n
return r.corrupt(m, "chunk length overflows block", false)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block. // Drop entire block.
err = DroppedError{m, "checksum mismatch"}
r.i = r.n r.i = r.n
r.j = r.n r.j = r.n
return r.corrupt(m, "checksum mismatch", false)
} }
} }
if wantFirst && err == nil && chunkType != fullChunkType && chunkType != firstChunkType { if first && chunkType != fullChunkType && chunkType != firstChunkType {
if skip { m := r.j - r.i
// The chunk are intentionally skipped. r.i = r.j
if chunkType == lastChunkType { // Report the error, but skip it.
skip = false return r.corrupt(m+headerSize, "orphan chunk", true)
} }
continue
} else {
// Drop the chunk.
err = DroppedError{r.j - r.i + headerSize, "orphan chunk"}
}
}
if err == nil {
r.last = chunkType == fullChunkType || chunkType == lastChunkType r.last = chunkType == fullChunkType || chunkType == lastChunkType
} else { return nil
if r.dropper != nil {
r.dropper.Drop(err)
} }
if r.strict {
r.err = err // The last block.
if r.n < blockSize && r.n > 0 {
if !first {
return r.corrupt(0, "missing chunk part", false)
} }
r.err = io.EOF
return r.err
} }
// Read block.
n, err := io.ReadFull(r.r, r.buf[:])
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return err return err
} }
if r.n < blockSize && r.n > 0 {
// This is the last block.
if r.j != r.n {
r.err = io.ErrUnexpectedEOF
} else {
r.err = io.EOF
}
return r.err
}
n, err := io.ReadFull(r.r, r.buf[:])
if err != nil && err != io.ErrUnexpectedEOF {
r.err = err
return r.err
}
if n == 0 { if n == 0 {
if !first {
return r.corrupt(0, "missing chunk part", false)
}
r.err = io.EOF r.err = io.EOF
return r.err return r.err
} }
@ -237,29 +239,26 @@ func (r *Reader) nextChunk(wantFirst, skip bool) error {
// Next returns a reader for the next journal. It returns io.EOF if there are no // Next returns a reader for the next journal. It returns io.EOF if there are no
// more journals. The reader returned becomes stale after the next Next call, // more journals. The reader returned becomes stale after the next Next call,
// and should no longer be used. // and should no longer be used. If strict is false, the reader will returns
// io.ErrUnexpectedEOF error when found corrupted journal.
func (r *Reader) Next() (io.Reader, error) { func (r *Reader) Next() (io.Reader, error) {
r.seq++ r.seq++
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
skip := !r.last
for {
r.i = r.j r.i = r.j
if r.nextChunk(true, skip) != nil { for {
// So that 'orphan chunk' drop will be reported. if err := r.nextChunk(true); err == nil {
skip = false
} else {
break break
} } else if err != errSkip {
if r.err != nil { return nil, err
return nil, r.err
} }
} }
return &singleReader{r, r.seq, nil}, nil return &singleReader{r, r.seq, nil}, nil
} }
// Reset resets the journal reader, allows reuse of the journal reader. // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
// last accumulated error.
func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error { func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
r.seq++ r.seq++
err := r.err err := r.err
@ -296,7 +295,11 @@ func (x *singleReader) Read(p []byte) (int, error) {
if r.last { if r.last {
return 0, io.EOF return 0, io.EOF
} }
if x.err = r.nextChunk(false, false); x.err != nil { x.err = r.nextChunk(false)
if x.err != nil {
if x.err == errSkip {
x.err = io.ErrUnexpectedEOF
}
return 0, x.err return 0, x.err
} }
} }
@ -320,7 +323,11 @@ func (x *singleReader) ReadByte() (byte, error) {
if r.last { if r.last {
return 0, io.EOF return 0, io.EOF
} }
if x.err = r.nextChunk(false, false); x.err != nil { x.err = r.nextChunk(false)
if x.err != nil {
if x.err == errSkip {
x.err = io.ErrUnexpectedEOF
}
return 0, x.err return 0, x.err
} }
} }

View File

@ -326,3 +326,148 @@ func TestStaleWriter(t *testing.T) {
t.Fatalf("stale write #1: unexpected error: %v", err) t.Fatalf("stale write #1: unexpected error: %v", err)
} }
} }
func TestCorrupt_MissingLastBlock(t *testing.T) {
buf := new(bytes.Buffer)
w := NewWriter(buf)
// First record.
ww, err := w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-1024)); err != nil {
t.Fatalf("write #0: unexpected error: %v", err)
}
// Second record.
ww, err = w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
t.Fatalf("write #1: unexpected error: %v", err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
// Cut the last block.
b := buf.Bytes()[:blockSize]
r := NewReader(bytes.NewReader(b), dropper{t}, false, true)
// First read.
rr, err := r.Next()
if err != nil {
t.Fatal(err)
}
n, err := io.Copy(ioutil.Discard, rr)
if err != nil {
t.Fatalf("read #0: %v", err)
}
if n != blockSize-1024 {
t.Fatalf("read #0: got %d bytes want %d", n, blockSize-1024)
}
// Second read.
rr, err = r.Next()
if err != nil {
t.Fatal(err)
}
n, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #1: unexpected error: %v", err)
}
}
func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
buf := new(bytes.Buffer)
w := NewWriter(buf)
// First record.
ww, err := w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil {
t.Fatalf("write #0: unexpected error: %v", err)
}
// Second record.
ww, err = w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil {
t.Fatalf("write #1: unexpected error: %v", err)
}
// Third record.
ww, err = w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil {
t.Fatalf("write #2: unexpected error: %v", err)
}
// Fourth record.
ww, err = w.Next()
if err != nil {
t.Fatal(err)
}
if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil {
t.Fatalf("write #3: unexpected error: %v", err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
b := buf.Bytes()
// Corrupting block #1.
for i := 0; i < 1024; i++ {
b[blockSize+i] = '1'
}
r := NewReader(bytes.NewReader(b), dropper{t}, false, true)
// First read.
rr, err := r.Next()
if err != nil {
t.Fatal(err)
}
n, err := io.Copy(ioutil.Discard, rr)
if err != nil {
t.Fatalf("read #0: %v", err)
}
if want := int64(blockSize / 2); n != want {
t.Fatalf("read #0: got %d bytes want %d", n, want)
}
// Second read.
rr, err = r.Next()
if err != nil {
t.Fatal(err)
}
n, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #1: unexpected error: %v", err)
}
// Third read.
rr, err = r.Next()
if err != nil {
t.Fatal(err)
}
n, err = io.Copy(ioutil.Discard, rr)
if err != nil {
t.Fatalf("read #2: %v", err)
}
if want := int64(blockSize-headerSize) + 2; n != want {
t.Fatalf("read #2: got %d bytes want %d", n, want)
}
}

View File

@ -22,7 +22,7 @@ type dropper struct {
} }
func (d dropper) Drop(err error) { func (d dropper) Drop(err error) {
if e, ok := err.(journal.DroppedError); ok { if e, ok := err.(journal.ErrCorrupted); ok {
d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason) d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason)
} else { } else {
d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num(), err) d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num(), err)