Introduce interface pack.Loader

This commit is contained in:
Alexander Neumann 2016-08-07 14:50:24 +02:00
parent f72f3dbc6a
commit 94d157d97a
13 changed files with 152 additions and 74 deletions

View File

@ -126,9 +126,9 @@ func printPacks(repo *repository.Repository, wr io.Writer) error {
name := job.Data.(string)
h := backend.Handle{Type: backend.Data, Name: name}
rd := backend.NewReadSeeker(repo.Backend(), h)
ldr := pack.BackendLoader{Backend: repo.Backend(), Handle: h}
unpacker, err := pack.NewUnpacker(repo.Key(), rd)
unpacker, err := pack.NewUnpacker(repo.Key(), ldr)
if err != nil {
return nil, err
}

View File

@ -31,7 +31,9 @@ type Backend interface {
Lister
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
// and saves it in p. Load has the same semantics as io.ReaderAt, except
// that a negative offset is also allowed. In this case it references a
// position relative to the end of the file (similar to Seek()).
Load(h Handle, p []byte, off int64) (int, error)
// Save stores the data in the backend under the given handle.

View File

@ -98,9 +98,12 @@ func dirname(base string, t backend.Type, name string) string {
return filepath.Join(base, n)
}
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
// Load returns the data stored in the backend for h at the given offset and
// saves it in p. Load has the same semantics as io.ReaderAt, with one
// exception: when off is lower than zero, it is treated as an offset relative
// to the end of the file.
func (b *Local) Load(h backend.Handle, p []byte, off int64) (n int, err error) {
debug.Log("backend.local.Load", "Load %v, length %v at %v", h, len(p), off)
if err := h.Valid(); err != nil {
return 0, err
}
@ -117,11 +120,15 @@ func (b *Local) Load(h backend.Handle, p []byte, off int64) (n int, err error) {
}
}()
if off > 0 {
switch {
case off > 0:
_, err = f.Seek(off, 0)
if err != nil {
return 0, err
}
case off < 0:
_, err = f.Seek(off, 2)
}
if err != nil {
return 0, err
}
return io.ReadFull(f, p)
@ -162,6 +169,7 @@ func writeToTempfile(tempdir string, p []byte) (filename string, err error) {
// Save stores data in the backend at the handle.
func (b *Local) Save(h backend.Handle, p []byte) (err error) {
debug.Log("backend.local.Save", "Save %v, length %v", h, len(p))
if err := h.Valid(); err != nil {
return err
}
@ -203,6 +211,7 @@ func (b *Local) Save(h backend.Handle, p []byte) (err error) {
// Stat returns information about a blob.
func (b *Local) Stat(h backend.Handle) (backend.BlobInfo, error) {
debug.Log("backend.local.Stat", "Stat %v", h)
if err := h.Valid(); err != nil {
return backend.BlobInfo{}, err
}
@ -217,6 +226,7 @@ func (b *Local) Stat(h backend.Handle) (backend.BlobInfo, error) {
// Test returns true if a blob of the given type and name exists in the backend.
func (b *Local) Test(t backend.Type, name string) (bool, error) {
debug.Log("backend.local.Test", "Test %v %v", t, name)
_, err := fs.Stat(filename(b.p, t, name))
if err != nil {
if os.IsNotExist(err) {
@ -230,6 +240,7 @@ func (b *Local) Test(t backend.Type, name string) (bool, error) {
// Remove removes the blob with the given name and type.
func (b *Local) Remove(t backend.Type, name string) error {
debug.Log("backend.local.Remove", "Remove %v %v", t, name)
fn := filename(b.p, t, name)
// reset read-only flag
@ -304,6 +315,7 @@ func listDirs(dir string) (filenames []string, err error) {
// goroutine is started for this. If the channel done is closed, sending
// stops.
func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string {
debug.Log("backend.local.List", "List %v", t)
lister := listDir
if t == backend.Data {
lister = listDirs
@ -336,11 +348,13 @@ func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string {
// Delete removes the repository and all files.
func (b *Local) Delete() error {
debug.Log("backend.local.Delete", "Delete()")
return fs.RemoveAll(b.p)
}
// Close closes all open files.
func (b *Local) Close() error {
debug.Log("backend.local.Close", "Close()")
// this does not need to do anything, all open files are closed within the
// same function.
return nil

View File

@ -116,8 +116,13 @@ func memLoad(be *MemoryBackend, h backend.Handle, p []byte, off int64) (int, err
}
buf := be.data[entry{h.Type, h.Name}]
if off > int64(len(buf)) {
switch {
case off > int64(len(buf)):
return 0, errors.New("offset beyond end of file")
case off < -int64(len(buf)):
return 0, errors.New("offset beyond beginning of file")
case off < 0:
off = int64(len(buf)) + off
}
buf = buf[off:]

View File

@ -75,6 +75,20 @@ func (b *restBackend) Load(h backend.Handle, p []byte, off int64) (n int, err er
return 0, err
}
// invert offset
if off < 0 {
info, err := b.Stat(h)
if err != nil {
return 0, err
}
if off > -info.Size {
return 0, errors.New("offset before beginning of file")
}
off = info.Size + off
}
req, err := http.NewRequest("GET", restPath(b.url, h), nil)
if err != nil {
return 0, err

View File

@ -86,11 +86,15 @@ func (be s3) Load(h backend.Handle, p []byte, off int64) (int, error) {
return 0, err
}
if off > 0 {
switch {
case off > 0:
_, err = obj.Seek(off, 0)
if err != nil {
return 0, err
}
case off < 0:
_, err = obj.Seek(off, 2)
}
if err != nil {
return 0, err
}
<-be.connChan

View File

@ -11,10 +11,11 @@ import (
"path/filepath"
"strings"
"github.com/juju/errors"
"github.com/pkg/sftp"
"restic/backend"
"restic/debug"
"github.com/juju/errors"
"github.com/pkg/sftp"
)
const (
@ -304,11 +305,15 @@ func (r *SFTP) Load(h backend.Handle, p []byte, off int64) (n int, err error) {
}
}()
if off > 0 {
switch {
case off > 0:
_, err = f.Seek(off, 0)
if err != nil {
return 0, err
}
case off < 0:
_, err = f.Seek(off, 2)
}
if err != nil {
return 0, err
}
return io.ReadFull(f, p)

View File

@ -1,7 +1,6 @@
package checker
import (
"bytes"
"errors"
"fmt"
"sync"
@ -677,7 +676,7 @@ func checkPack(r *repository.Repository, id backend.ID) error {
return fmt.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
}
unpacker, err := pack.NewUnpacker(r.Key(), bytes.NewReader(buf))
unpacker, err := pack.NewUnpacker(r.Key(), pack.BufferLoader(buf))
if err != nil {
return err
}

43
src/restic/pack/loader.go Normal file
View File

@ -0,0 +1,43 @@
package pack
import (
"errors"
"restic/backend"
)
// Loader loads data from somewhere at a given offset. In contrast to
// io.ReaderAt, off may be negative, in which case it references a position
// relative to the end of the file (similar to Seek()).
type Loader interface {
Load(p []byte, off int64) (int, error)
}
// BackendLoader creates a Loader from a Backend and a Handle.
type BackendLoader struct {
Backend backend.Backend
Handle backend.Handle
}
// Load returns data at the given offset.
func (l BackendLoader) Load(p []byte, off int64) (int, error) {
return l.Backend.Load(l.Handle, p, off)
}
// BufferLoader allows using a buffer as a Loader.
type BufferLoader []byte
// Load returns data at the given offset.
func (b BufferLoader) Load(p []byte, off int64) (int, error) {
switch {
case off > int64(len(b)):
return 0, errors.New("offset is larger than data")
case off < -int64(len(b)):
return 0, errors.New("offset starts before the beginning of the data")
case off < 0:
off = int64(len(b)) + off
}
b = b[off:]
return copy(p, b), nil
}

View File

@ -245,61 +245,51 @@ const preloadHeaderSize = 2048
// NewUnpacker returns a pointer to Unpacker which can be used to read
// individual Blobs from a pack.
func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) {
func NewUnpacker(k *crypto.Key, ldr Loader) (*Unpacker, error) {
var err error
// read the last 2048 byte, this will mostly be enough for the header, so
// we do not need another round trip.
buf := make([]byte, preloadHeaderSize)
_, err = rd.Seek(-int64(len(buf)), 2)
n, err := ldr.Load(buf, -int64(len(buf)))
if err != nil {
return nil, fmt.Errorf("seek to -%d failed: %v", len(buf), err)
return nil, fmt.Errorf("Load at -%d failed: %v", len(buf), err)
}
buf = buf[:n]
bs := binary.Size(uint32(0))
p := len(buf) - bs
// read the length from the end of the buffer
length := int(binary.LittleEndian.Uint32(buf[p : p+bs]))
buf = buf[:p]
// if the header is longer than the preloaded buffer, call the loader again.
if length > len(buf) {
buf = make([]byte, length)
n, err := ldr.Load(buf, -int64(len(buf)+bs))
if err != nil {
return nil, fmt.Errorf("Load at -%d failed: %v", len(buf), err)
}
buf = buf[:n]
}
_, err = io.ReadFull(rd, buf)
if err != nil {
return nil, fmt.Errorf("error reading last %d bytes: %v", len(buf), err)
}
hdrRd := io.ReadSeeker(bytes.NewReader(buf))
ls := binary.Size(uint32(0))
// reset to the end to read header length
_, err = hdrRd.Seek(-int64(ls), 2)
if err != nil {
return nil, fmt.Errorf("seeking to read header length failed: %v", err)
}
var length uint32
err = binary.Read(hdrRd, binary.LittleEndian, &length)
if err != nil {
return nil, fmt.Errorf("reading header length failed: %v", err)
}
// if the header is longer than the preloaded buffer, use the original
// reader (and do another round trip)
if int(length) > preloadHeaderSize-ls {
hdrRd = rd
}
// reset to the beginning of the header
_, err = hdrRd.Seek(-int64(ls)-int64(length), 2)
if err != nil {
return nil, fmt.Errorf("seeking to read header length failed: %v", err)
}
buf = buf[len(buf)-length:]
// read header
hrd, err := crypto.DecryptFrom(k, io.LimitReader(hdrRd, int64(length)))
hdr, err := crypto.Decrypt(k, buf, buf)
if err != nil {
return nil, err
}
rd := bytes.NewReader(hdr)
var entries []Blob
pos := uint(0)
for {
e := headerEntry{}
err = binary.Read(hrd, binary.LittleEndian, &e)
err = binary.Read(rd, binary.LittleEndian, &e)
if err == io.EOF {
break
}
@ -328,11 +318,11 @@ func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) {
pos += uint(e.Length)
}
p := &Unpacker{
up := &Unpacker{
rd: rd,
k: k,
Entries: entries,
}
return p, nil
return up, nil
}

View File

@ -7,7 +7,6 @@ import (
"encoding/binary"
"encoding/json"
"io"
"io/ioutil"
"testing"
"restic/backend"
@ -48,7 +47,7 @@ func newPack(t testing.TB, k *crypto.Key) ([]Buf, []byte, uint) {
return bufs, packData, p.Size()
}
func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, rd io.ReadSeeker, packSize uint) {
func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, ldr pack.Loader, packSize uint) {
written := 0
for _, l := range lengths {
written += l
@ -64,20 +63,24 @@ func verifyBlobs(t testing.TB, bufs []Buf, k *crypto.Key, rd io.ReadSeeker, pack
Equals(t, uint(written), packSize)
// read and parse it again
np, err := pack.NewUnpacker(k, rd)
np, err := pack.NewUnpacker(k, ldr)
OK(t, err)
Equals(t, len(np.Entries), len(bufs))
var buf []byte
for i, b := range bufs {
e := np.Entries[i]
Equals(t, b.id, e.ID)
brd, err := e.GetReader(rd)
OK(t, err)
data, err := ioutil.ReadAll(brd)
if len(buf) < int(e.Length) {
buf = make([]byte, int(e.Length))
}
buf = buf[:int(e.Length)]
n, err := ldr.Load(buf, int64(e.Offset))
OK(t, err)
buf = buf[:n]
Assert(t, bytes.Equal(b.data, data),
Assert(t, bytes.Equal(b.data, buf),
"data for blob %v doesn't match", i)
}
}
@ -88,7 +91,7 @@ func TestCreatePack(t *testing.T) {
bufs, packData, packSize := newPack(t, k)
Equals(t, uint(len(packData)), packSize)
verifyBlobs(t, bufs, k, bytes.NewReader(packData), packSize)
verifyBlobs(t, bufs, k, pack.BufferLoader(packData), packSize)
}
var blobTypeJSON = []struct {
@ -125,6 +128,6 @@ func TestUnpackReadSeeker(t *testing.T) {
handle := backend.Handle{Type: backend.Data, Name: id.String()}
OK(t, b.Save(handle, packData))
rd := backend.NewReadSeeker(b, handle)
verifyBlobs(t, bufs, k, rd, packSize)
ldr := pack.BackendLoader{Backend: b, Handle: handle}
verifyBlobs(t, bufs, k, ldr, packSize)
}

View File

@ -1,7 +1,6 @@
package repository
import (
"bytes"
"io"
"restic/backend"
"restic/crypto"
@ -33,7 +32,7 @@ func Repack(repo *Repository, packs backend.IDSet, keepBlobs pack.BlobSet) (err
debug.Log("Repack", "pack %v loaded (%d bytes)", packID.Str(), len(buf))
unpck, err := pack.NewUnpacker(repo.Key(), bytes.NewReader(buf))
unpck, err := pack.NewUnpacker(repo.Key(), pack.BufferLoader(buf))
if err != nil {
return err
}

View File

@ -547,9 +547,9 @@ func (r *Repository) List(t backend.Type, done <-chan struct{}) <-chan backend.I
// ListPack returns the list of blobs saved in the pack id.
func (r *Repository) ListPack(id backend.ID) ([]pack.Blob, error) {
h := backend.Handle{Type: backend.Data, Name: id.String()}
rd := backend.NewReadSeeker(r.Backend(), h)
ldr := pack.BackendLoader{Backend: r.Backend(), Handle: h}
unpacker, err := pack.NewUnpacker(r.Key(), rd)
unpacker, err := pack.NewUnpacker(r.Key(), ldr)
if err != nil {
return nil, err
}