diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 51c4122ae..5dcf21f89 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -49,7 +49,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "c5955912e3287376475731c5bc59c79a5a799105" + "Rev": "fcb3916f495c513b7eab70340f030e942e67b7da" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index fd11a8633..4d178b9c0 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -481,10 +481,11 @@ func (db *DB) recoverJournal() error { buf.Reset() if _, err := buf.ReadFrom(r); err != nil { - if strict { + if err == io.ErrUnexpectedEOF { + continue + } else { return err } - continue } if err := batch.decode(buf.Bytes()); err != nil { return err diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go index b522c76e6..2e2cc351e 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go @@ -103,18 +103,18 @@ type flusher interface { Flush() error } -// DroppedError is the error type that passed to Dropper.Drop method. -type DroppedError struct { +// ErrCorrupted is the error type that generated by corrupted block or chunk. +type ErrCorrupted struct { Size int Reason string } -func (e DroppedError) Error() string { - return fmt.Sprintf("leveldb/journal: dropped %d bytes: %s", e.Size, e.Reason) +func (e ErrCorrupted) Error() string { + return fmt.Sprintf("leveldb/journal: corrupted %d bytes: %s", e.Size, e.Reason) } // Dropper is the interface that wrap simple Drop method. The Drop -// method will be called when the journal reader dropping a chunk. +// method will be called when the journal reader dropping a block or chunk. type Dropper interface { Drop(err error) } @@ -158,76 +158,78 @@ func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader { } } +var errSkip = errors.New("leveldb/journal: skipped") + +func (r *Reader) corrupt(n int, reason string, skip bool) error { + if r.dropper != nil { + r.dropper.Drop(ErrCorrupted{n, reason}) + } + if r.strict && !skip { + r.err = ErrCorrupted{n, reason} + return r.err + } + return errSkip +} + // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the // next block into the buffer if necessary. -func (r *Reader) nextChunk(wantFirst, skip bool) error { +func (r *Reader) nextChunk(first bool) error { for { if r.j+headerSize <= r.n { checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) chunkType := r.buf[r.j+6] - var err error if checksum == 0 && length == 0 && chunkType == 0 { // Drop entire block. - err = DroppedError{r.n - r.j, "zero header"} + m := r.n - r.j r.i = r.n r.j = r.n + return r.corrupt(m, "zero header", false) } else { m := r.n - r.j r.i = r.j + headerSize r.j = r.j + headerSize + int(length) if r.j > r.n { // Drop entire block. - err = DroppedError{m, "chunk length overflows block"} r.i = r.n r.j = r.n + return r.corrupt(m, "chunk length overflows block", false) } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { // Drop entire block. - err = DroppedError{m, "checksum mismatch"} r.i = r.n r.j = r.n + return r.corrupt(m, "checksum mismatch", false) } } - if wantFirst && err == nil && chunkType != fullChunkType && chunkType != firstChunkType { - if skip { - // The chunk are intentionally skipped. - if chunkType == lastChunkType { - skip = false - } - continue - } else { - // Drop the chunk. - err = DroppedError{r.j - r.i + headerSize, "orphan chunk"} - } + if first && chunkType != fullChunkType && chunkType != firstChunkType { + m := r.j - r.i + r.i = r.j + // Report the error, but skip it. + return r.corrupt(m+headerSize, "orphan chunk", true) } - if err == nil { - r.last = chunkType == fullChunkType || chunkType == lastChunkType - } else { - if r.dropper != nil { - r.dropper.Drop(err) - } - if r.strict { - r.err = err - } + r.last = chunkType == fullChunkType || chunkType == lastChunkType + return nil + } + + // The last block. + if r.n < blockSize && r.n > 0 { + if !first { + return r.corrupt(0, "missing chunk part", false) } + r.err = io.EOF + return r.err + } + + // Read block. + n, err := io.ReadFull(r.r, r.buf[:]) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { return err } - if r.n < blockSize && r.n > 0 { - // This is the last block. - if r.j != r.n { - r.err = io.ErrUnexpectedEOF - } else { - r.err = io.EOF - } - return r.err - } - n, err := io.ReadFull(r.r, r.buf[:]) - if err != nil && err != io.ErrUnexpectedEOF { - r.err = err - return r.err - } if n == 0 { + if !first { + return r.corrupt(0, "missing chunk part", false) + } r.err = io.EOF return r.err } @@ -237,29 +239,26 @@ func (r *Reader) nextChunk(wantFirst, skip bool) error { // Next returns a reader for the next journal. It returns io.EOF if there are no // more journals. The reader returned becomes stale after the next Next call, -// and should no longer be used. +// and should no longer be used. If strict is false, the reader will returns +// io.ErrUnexpectedEOF error when found corrupted journal. func (r *Reader) Next() (io.Reader, error) { r.seq++ if r.err != nil { return nil, r.err } - skip := !r.last + r.i = r.j for { - r.i = r.j - if r.nextChunk(true, skip) != nil { - // So that 'orphan chunk' drop will be reported. - skip = false - } else { + if err := r.nextChunk(true); err == nil { break - } - if r.err != nil { - return nil, r.err + } else if err != errSkip { + return nil, err } } return &singleReader{r, r.seq, nil}, nil } -// Reset resets the journal reader, allows reuse of the journal reader. +// Reset resets the journal reader, allows reuse of the journal reader. Reset returns +// last accumulated error. func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error { r.seq++ err := r.err @@ -296,7 +295,11 @@ func (x *singleReader) Read(p []byte) (int, error) { if r.last { return 0, io.EOF } - if x.err = r.nextChunk(false, false); x.err != nil { + x.err = r.nextChunk(false) + if x.err != nil { + if x.err == errSkip { + x.err = io.ErrUnexpectedEOF + } return 0, x.err } } @@ -320,7 +323,11 @@ func (x *singleReader) ReadByte() (byte, error) { if r.last { return 0, io.EOF } - if x.err = r.nextChunk(false, false); x.err != nil { + x.err = r.nextChunk(false) + if x.err != nil { + if x.err == errSkip { + x.err = io.ErrUnexpectedEOF + } return 0, x.err } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go index 5e1193ae2..bde5cec22 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal_test.go @@ -326,3 +326,148 @@ func TestStaleWriter(t *testing.T) { t.Fatalf("stale write #1: unexpected error: %v", err) } } + +func TestCorrupt_MissingLastBlock(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-1024)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + // Cut the last block. + b := buf.Bytes()[:blockSize] + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read. + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if n != blockSize-1024 { + t.Fatalf("read #0: got %d bytes want %d", n, blockSize-1024) + } + + // Second read. + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != io.ErrUnexpectedEOF { + t.Fatalf("read #1: unexpected error: %v", err) + } +} + +func TestCorrupt_CorruptedMiddleBlock(t *testing.T) { + buf := new(bytes.Buffer) + + w := NewWriter(buf) + + // First record. + ww, err := w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize/2)); err != nil { + t.Fatalf("write #0: unexpected error: %v", err) + } + + // Second record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), blockSize-headerSize)); err != nil { + t.Fatalf("write #1: unexpected error: %v", err) + } + + // Third record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+1)); err != nil { + t.Fatalf("write #2: unexpected error: %v", err) + } + + // Fourth record. + ww, err = w.Next() + if err != nil { + t.Fatal(err) + } + if _, err := ww.Write(bytes.Repeat([]byte("0"), (blockSize-headerSize)+2)); err != nil { + t.Fatalf("write #3: unexpected error: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + b := buf.Bytes() + // Corrupting block #1. + for i := 0; i < 1024; i++ { + b[blockSize+i] = '1' + } + + r := NewReader(bytes.NewReader(b), dropper{t}, false, true) + + // First read. + rr, err := r.Next() + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #0: %v", err) + } + if want := int64(blockSize / 2); n != want { + t.Fatalf("read #0: got %d bytes want %d", n, want) + } + + // Second read. + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != io.ErrUnexpectedEOF { + t.Fatalf("read #1: unexpected error: %v", err) + } + + // Third read. + rr, err = r.Next() + if err != nil { + t.Fatal(err) + } + n, err = io.Copy(ioutil.Discard, rr) + if err != nil { + t.Fatalf("read #2: %v", err) + } + if want := int64(blockSize-headerSize) + 2; n != want { + t.Fatalf("read #2: got %d bytes want %d", n, want) + } +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index 47a2ea898..50a990948 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -22,7 +22,7 @@ type dropper struct { } func (d dropper) Drop(err error) { - if e, ok := err.(journal.DroppedError); ok { + if e, ok := err.(journal.ErrCorrupted); ok { d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason) } else { d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num(), err)