From 94d157d97acd7c0a4ead68364e8f698782a9955b Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 7 Aug 2016 14:50:24 +0200 Subject: [PATCH] Introduce interface pack.Loader --- src/cmds/restic/cmd_dump.go | 4 +- src/restic/backend/interface.go | 4 +- src/restic/backend/local/local.go | 26 ++++++++--- src/restic/backend/mem/mem_backend.go | 7 ++- src/restic/backend/rest/rest.go | 14 ++++++ src/restic/backend/s3/s3.go | 12 +++-- src/restic/backend/sftp/sftp.go | 17 ++++--- src/restic/checker/checker.go | 3 +- src/restic/pack/loader.go | 43 +++++++++++++++++ src/restic/pack/pack.go | 66 ++++++++++++--------------- src/restic/pack/pack_test.go | 23 ++++++---- src/restic/repository/repack.go | 3 +- src/restic/repository/repository.go | 4 +- 13 files changed, 152 insertions(+), 74 deletions(-) create mode 100644 src/restic/pack/loader.go diff --git a/src/cmds/restic/cmd_dump.go b/src/cmds/restic/cmd_dump.go index 72a9d85b8..68e4ac0d8 100644 --- a/src/cmds/restic/cmd_dump.go +++ b/src/cmds/restic/cmd_dump.go @@ -126,9 +126,9 @@ func printPacks(repo *repository.Repository, wr io.Writer) error { name := job.Data.(string) h := backend.Handle{Type: backend.Data, Name: name} - rd := backend.NewReadSeeker(repo.Backend(), h) + ldr := pack.BackendLoader{Backend: repo.Backend(), Handle: h} - unpacker, err := pack.NewUnpacker(repo.Key(), rd) + unpacker, err := pack.NewUnpacker(repo.Key(), ldr) if err != nil { return nil, err } diff --git a/src/restic/backend/interface.go b/src/restic/backend/interface.go index fb0927c6e..24838ddfd 100644 --- a/src/restic/backend/interface.go +++ b/src/restic/backend/interface.go @@ -31,7 +31,9 @@ type Backend interface { Lister // Load returns the data stored in the backend for h at the given offset - // and saves it in p. Load has the same semantics as io.ReaderAt. + // and saves it in p. Load has the same semantics as io.ReaderAt, except + // that a negative offset is also allowed. 
In this case it references a + // position relative to the end of the file (similar to Seek()). Load(h Handle, p []byte, off int64) (int, error) // Save stores the data in the backend under the given handle. diff --git a/src/restic/backend/local/local.go b/src/restic/backend/local/local.go index 0821720c7..6fa1ac9f4 100644 --- a/src/restic/backend/local/local.go +++ b/src/restic/backend/local/local.go @@ -98,9 +98,12 @@ func dirname(base string, t backend.Type, name string) string { return filepath.Join(base, n) } -// Load returns the data stored in the backend for h at the given offset -// and saves it in p. Load has the same semantics as io.ReaderAt. +// Load returns the data stored in the backend for h at the given offset and +// saves it in p. Load has the same semantics as io.ReaderAt, with one +// exception: when off is lower than zero, it is treated as an offset relative +// to the end of the file. func (b *Local) Load(h backend.Handle, p []byte, off int64) (n int, err error) { + debug.Log("backend.local.Load", "Load %v, length %v at %v", h, len(p), off) if err := h.Valid(); err != nil { return 0, err } @@ -117,11 +120,15 @@ func (b *Local) Load(h backend.Handle, p []byte, off int64) (n int, err error) { } }() - if off > 0 { + switch { + case off > 0: _, err = f.Seek(off, 0) - if err != nil { - return 0, err - } + case off < 0: + _, err = f.Seek(off, 2) + } + + if err != nil { + return 0, err } return io.ReadFull(f, p) @@ -162,6 +169,7 @@ func writeToTempfile(tempdir string, p []byte) (filename string, err error) { // Save stores data in the backend at the handle. func (b *Local) Save(h backend.Handle, p []byte) (err error) { + debug.Log("backend.local.Save", "Save %v, length %v", h, len(p)) if err := h.Valid(); err != nil { return err } @@ -203,6 +211,7 @@ func (b *Local) Save(h backend.Handle, p []byte) (err error) { // Stat returns information about a blob. 
func (b *Local) Stat(h backend.Handle) (backend.BlobInfo, error) { + debug.Log("backend.local.Stat", "Stat %v", h) if err := h.Valid(); err != nil { return backend.BlobInfo{}, err } @@ -217,6 +226,7 @@ func (b *Local) Stat(h backend.Handle) (backend.BlobInfo, error) { // Test returns true if a blob of the given type and name exists in the backend. func (b *Local) Test(t backend.Type, name string) (bool, error) { + debug.Log("backend.local.Test", "Test %v %v", t, name) _, err := fs.Stat(filename(b.p, t, name)) if err != nil { if os.IsNotExist(err) { @@ -230,6 +240,7 @@ func (b *Local) Test(t backend.Type, name string) (bool, error) { // Remove removes the blob with the given name and type. func (b *Local) Remove(t backend.Type, name string) error { + debug.Log("backend.local.Remove", "Remove %v %v", t, name) fn := filename(b.p, t, name) // reset read-only flag @@ -304,6 +315,7 @@ func listDirs(dir string) (filenames []string, err error) { // goroutine is started for this. If the channel done is closed, sending // stops. func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string { + debug.Log("backend.local.List", "List %v", t) lister := listDir if t == backend.Data { lister = listDirs @@ -336,11 +348,13 @@ func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string { // Delete removes the repository and all files. func (b *Local) Delete() error { + debug.Log("backend.local.Delete", "Delete()") return fs.RemoveAll(b.p) } // Close closes all open files. func (b *Local) Close() error { + debug.Log("backend.local.Close", "Close()") // this does not need to do anything, all open files are closed within the // same function. 
return nil diff --git a/src/restic/backend/mem/mem_backend.go b/src/restic/backend/mem/mem_backend.go index 2dde7e320..961997ae7 100644 --- a/src/restic/backend/mem/mem_backend.go +++ b/src/restic/backend/mem/mem_backend.go @@ -116,8 +116,13 @@ func memLoad(be *MemoryBackend, h backend.Handle, p []byte, off int64) (int, err } buf := be.data[entry{h.Type, h.Name}] - if off > int64(len(buf)) { + switch { + case off > int64(len(buf)): return 0, errors.New("offset beyond end of file") + case off < -int64(len(buf)): + return 0, errors.New("offset beyond beginning of file") + case off < 0: + off = int64(len(buf)) + off } buf = buf[off:] diff --git a/src/restic/backend/rest/rest.go b/src/restic/backend/rest/rest.go index e9303358a..00fe0192b 100644 --- a/src/restic/backend/rest/rest.go +++ b/src/restic/backend/rest/rest.go @@ -75,6 +75,20 @@ func (b *restBackend) Load(h backend.Handle, p []byte, off int64) (n int, err er return 0, err } + // invert offset + if off < 0 { + info, err := b.Stat(h) + if err != nil { + return 0, err + } + + if off < -info.Size { + return 0, errors.New("offset before beginning of file") + } + + off = info.Size + off + } + req, err := http.NewRequest("GET", restPath(b.url, h), nil) if err != nil { return 0, err } diff --git a/src/restic/backend/s3/s3.go b/src/restic/backend/s3/s3.go index c35719329..0dbf48020 100644 --- a/src/restic/backend/s3/s3.go +++ b/src/restic/backend/s3/s3.go @@ -86,11 +86,15 @@ func (be s3) Load(h backend.Handle, p []byte, off int64) (int, error) { return 0, err } - if off > 0 { + switch { + case off > 0: _, err = obj.Seek(off, 0) - if err != nil { - return 0, err - } + case off < 0: + _, err = obj.Seek(off, 2) + } + + if err != nil { + return 0, err } <-be.connChan diff --git a/src/restic/backend/sftp/sftp.go b/src/restic/backend/sftp/sftp.go index 4279b8d5a..37f274913 100644 --- a/src/restic/backend/sftp/sftp.go +++ b/src/restic/backend/sftp/sftp.go @@ -11,10 +11,11 @@ import ( "path/filepath" "strings" - 
"github.com/juju/errors" - "github.com/pkg/sftp" "restic/backend" "restic/debug" + + "github.com/juju/errors" + "github.com/pkg/sftp" ) const ( @@ -304,11 +305,15 @@ func (r *SFTP) Load(h backend.Handle, p []byte, off int64) (n int, err error) { } }() - if off > 0 { + switch { + case off > 0: _, err = f.Seek(off, 0) - if err != nil { - return 0, err - } + case off < 0: + _, err = f.Seek(off, 2) + } + + if err != nil { + return 0, err } return io.ReadFull(f, p) diff --git a/src/restic/checker/checker.go b/src/restic/checker/checker.go index 2f796de69..bcfa56a04 100644 --- a/src/restic/checker/checker.go +++ b/src/restic/checker/checker.go @@ -1,7 +1,6 @@ package checker import ( - "bytes" "errors" "fmt" "sync" @@ -677,7 +676,7 @@ func checkPack(r *repository.Repository, id backend.ID) error { return fmt.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str()) } - unpacker, err := pack.NewUnpacker(r.Key(), bytes.NewReader(buf)) + unpacker, err := pack.NewUnpacker(r.Key(), pack.BufferLoader(buf)) if err != nil { return err } diff --git a/src/restic/pack/loader.go b/src/restic/pack/loader.go new file mode 100644 index 000000000..d9610a967 --- /dev/null +++ b/src/restic/pack/loader.go @@ -0,0 +1,43 @@ +package pack + +import ( + "errors" + "restic/backend" +) + +// Loader loads data from somewhere at a given offset. In contrast to +// io.ReaderAt, off may be negative, in which case it references a position +// relative to the end of the file (similar to Seek()). +type Loader interface { + Load(p []byte, off int64) (int, error) +} + +// BackendLoader creates a Loader from a Backend and a Handle. +type BackendLoader struct { + Backend backend.Backend + Handle backend.Handle +} + +// Load returns data at the given offset. +func (l BackendLoader) Load(p []byte, off int64) (int, error) { + return l.Backend.Load(l.Handle, p, off) +} + +// BufferLoader allows using a buffer as a Loader. +type BufferLoader []byte + +// Load returns data at the given offset. 
+func (b BufferLoader) Load(p []byte, off int64) (int, error) { + switch { + case off > int64(len(b)): + return 0, errors.New("offset is larger than data") + case off < -int64(len(b)): + return 0, errors.New("offset starts before the beginning of the data") + case off < 0: + off = int64(len(b)) + off + } + + b = b[off:] + + return copy(p, b), nil +} diff --git a/src/restic/pack/pack.go b/src/restic/pack/pack.go index 91f12a63b..930a15a72 100644 --- a/src/restic/pack/pack.go +++ b/src/restic/pack/pack.go @@ -245,61 +245,51 @@ const preloadHeaderSize = 2048 // NewUnpacker returns a pointer to Unpacker which can be used to read // individual Blobs from a pack. -func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) { +func NewUnpacker(k *crypto.Key, ldr Loader) (*Unpacker, error) { var err error // read the last 2048 byte, this will mostly be enough for the header, so // we do not need another round trip. buf := make([]byte, preloadHeaderSize) - _, err = rd.Seek(-int64(len(buf)), 2) + n, err := ldr.Load(buf, -int64(len(buf))) if err != nil { - return nil, fmt.Errorf("seek to -%d failed: %v", len(buf), err) + return nil, fmt.Errorf("Load at -%d failed: %v", len(buf), err) + } + buf = buf[:n] + + bs := binary.Size(uint32(0)) + p := len(buf) - bs + + // read the length from the end of the buffer + length := int(binary.LittleEndian.Uint32(buf[p : p+bs])) + buf = buf[:p] + + // if the header is longer than the preloaded buffer, call the loader again. 
+ if length > len(buf) { + buf = make([]byte, length) + n, err := ldr.Load(buf, -int64(len(buf)+bs)) + if err != nil { + return nil, fmt.Errorf("Load at -%d failed: %v", len(buf), err) + } + buf = buf[:n] } - _, err = io.ReadFull(rd, buf) - if err != nil { - return nil, fmt.Errorf("error reading last %d bytes: %v", len(buf), err) - } - - hdrRd := io.ReadSeeker(bytes.NewReader(buf)) - ls := binary.Size(uint32(0)) - - // reset to the end to read header length - _, err = hdrRd.Seek(-int64(ls), 2) - if err != nil { - return nil, fmt.Errorf("seeking to read header length failed: %v", err) - } - - var length uint32 - err = binary.Read(hdrRd, binary.LittleEndian, &length) - if err != nil { - return nil, fmt.Errorf("reading header length failed: %v", err) - } - - // if the header is longer than the preloaded buffer, use the original - // reader (and do another round trip) - if int(length) > preloadHeaderSize-ls { - hdrRd = rd - } - - // reset to the beginning of the header - _, err = hdrRd.Seek(-int64(ls)-int64(length), 2) - if err != nil { - return nil, fmt.Errorf("seeking to read header length failed: %v", err) - } + buf = buf[len(buf)-length:] // read header - hrd, err := crypto.DecryptFrom(k, io.LimitReader(hdrRd, int64(length))) + hdr, err := crypto.Decrypt(k, buf, buf) if err != nil { return nil, err } + rd := bytes.NewReader(hdr) + var entries []Blob pos := uint(0) for { e := headerEntry{} - err = binary.Read(hrd, binary.LittleEndian, &e) + err = binary.Read(rd, binary.LittleEndian, &e) if err == io.EOF { break } @@ -328,11 +318,11 @@ func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) { pos += uint(e.Length) } - p := &Unpacker{ + up := &Unpacker{ rd: rd, k: k, Entries: entries, } - return p, nil + return up, nil } diff --git a/src/restic/pack/pack_test.go b/src/restic/pack/pack_test.go index e987ced7c..16e2af1b9 100644 --- a/src/restic/pack/pack_test.go +++ b/src/restic/pack/pack_test.go @@ -7,7 +7,6 @@ import ( "encoding/binary" "encoding/json" 
"io" - "io/ioutil" "testing" "restic/backend" @@ -48,7 +47,7 @@ func newPack(t testing.TB, k *crypto.Key) ([]Buf, []byte, uint) { return bufs, packData, p.Size() } -func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, rd io.ReadSeeker, packSize uint) { +func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, ldr pack.Loader, packSize uint) { written := 0 for _, l := range lengths { written += l @@ -64,20 +63,24 @@ func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, rd io.ReadSeeker, pack Equals(t, uint(written), packSize) // read and parse it again - np, err := pack.NewUnpacker(k, rd) + np, err := pack.NewUnpacker(k, ldr) OK(t, err) Equals(t, len(np.Entries), len(bufs)) + var buf []byte for i, b := range bufs { e := np.Entries[i] Equals(t, b.id, e.ID) - brd, err := e.GetReader(rd) - OK(t, err) - data, err := ioutil.ReadAll(brd) + if len(buf) < int(e.Length) { + buf = make([]byte, int(e.Length)) + } + buf = buf[:int(e.Length)] + n, err := ldr.Load(buf, int64(e.Offset)) OK(t, err) + buf = buf[:n] - Assert(t, bytes.Equal(b.data, data), + Assert(t, bytes.Equal(b.data, buf), "data for blob %v doesn't match", i) } } @@ -88,7 +91,7 @@ func TestCreatePack(t *testing.T) { bufs, packData, packSize := newPack(t, k) Equals(t, uint(len(packData)), packSize) - verifyBlobs(t, bufs, k, bytes.NewReader(packData), packSize) + verifyBlobs(t, bufs, k, pack.BufferLoader(packData), packSize) } var blobTypeJSON = []struct { @@ -125,6 +128,6 @@ func TestUnpackReadSeeker(t *testing.T) { handle := backend.Handle{Type: backend.Data, Name: id.String()} OK(t, b.Save(handle, packData)) - rd := backend.NewReadSeeker(b, handle) - verifyBlobs(t, bufs, k, rd, packSize) + ldr := pack.BackendLoader{Backend: b, Handle: handle} + verifyBlobs(t, bufs, k, ldr, packSize) } diff --git a/src/restic/repository/repack.go b/src/restic/repository/repack.go index 9f99cdb63..0498164a4 100644 --- a/src/restic/repository/repack.go +++ b/src/restic/repository/repack.go @@ -1,7 +1,6 @@ package repository 
import ( - "bytes" "io" "restic/backend" "restic/crypto" @@ -33,7 +32,7 @@ func Repack(repo *Repository, packs backend.IDSet, keepBlobs pack.BlobSet) (err debug.Log("Repack", "pack %v loaded (%d bytes)", packID.Str(), len(buf)) - unpck, err := pack.NewUnpacker(repo.Key(), bytes.NewReader(buf)) + unpck, err := pack.NewUnpacker(repo.Key(), pack.BufferLoader(buf)) if err != nil { return err } diff --git a/src/restic/repository/repository.go b/src/restic/repository/repository.go index 1fe2d26d7..654994af1 100644 --- a/src/restic/repository/repository.go +++ b/src/restic/repository/repository.go @@ -547,9 +547,9 @@ func (r *Repository) List(t backend.Type, done <-chan struct{}) <-chan backend.I // ListPack returns the list of blobs saved in the pack id. func (r *Repository) ListPack(id backend.ID) ([]pack.Blob, error) { h := backend.Handle{Type: backend.Data, Name: id.String()} - rd := backend.NewReadSeeker(r.Backend(), h) + ldr := pack.BackendLoader{Backend: r.Backend(), Handle: h} - unpacker, err := pack.NewUnpacker(r.Key(), rd) + unpacker, err := pack.NewUnpacker(r.Key(), ldr) if err != nil { return nil, err }