mirror of
https://github.com/octoleo/restic.git
synced 2024-11-18 19:15:14 +00:00
Merge pull request #2790 from greatroar/fix-quadratic-read
Fix quadratic file reading in restic mount
This commit is contained in:
commit
b84f5177cb
6
changelog/unreleased/pull-2790
Normal file
6
changelog/unreleased/pull-2790
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
Enhancement: Optimized file access in restic mount
|
||||||
|
|
||||||
|
Reading large (> 100GiB) files from restic mountpoints is now faster,
|
||||||
|
and the speedup is greater for larger files.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/pull/2790
|
@ -139,10 +139,7 @@ func mount(opts MountOptions, gopts GlobalOptions, mountpoint string) error {
|
|||||||
Paths: opts.Paths,
|
Paths: opts.Paths,
|
||||||
SnapshotTemplate: opts.SnapshotTemplate,
|
SnapshotTemplate: opts.SnapshotTemplate,
|
||||||
}
|
}
|
||||||
root, err := fuse.NewRoot(gopts.ctx, repo, cfg)
|
root := fuse.NewRoot(gopts.ctx, repo, cfg)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
Printf("Now serving the repository at %s\n", mountpoint)
|
Printf("Now serving the repository at %s\n", mountpoint)
|
||||||
Printf("When finished, quit with Ctrl-c or umount the mountpoint.\n")
|
Printf("When finished, quit with Ctrl-c or umount the mountpoint.\n")
|
||||||
|
2
go.mod
2
go.mod
@ -13,7 +13,7 @@ require (
|
|||||||
github.com/golang/protobuf v1.3.1 // indirect
|
github.com/golang/protobuf v1.3.1 // indirect
|
||||||
github.com/google/go-cmp v0.2.0
|
github.com/google/go-cmp v0.2.0
|
||||||
github.com/gopherjs/gopherjs v0.0.0-20190411002643-bd77b112433e // indirect
|
github.com/gopherjs/gopherjs v0.0.0-20190411002643-bd77b112433e // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.1 // indirect
|
github.com/hashicorp/golang-lru v0.5.1
|
||||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||||
github.com/juju/ratelimit v1.0.1
|
github.com/juju/ratelimit v1.0.1
|
||||||
github.com/kr/fs v0.1.0 // indirect
|
github.com/kr/fs v0.1.0 // indirect
|
||||||
|
87
internal/fuse/blobcache.go
Normal file
87
internal/fuse/blobcache.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package fuse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/debug"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
|
||||||
|
"github.com/hashicorp/golang-lru/simplelru"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Crude estimate of the overhead per blob: a SHA-256, a linked list node
|
||||||
|
// and some pointers. See comment in blobCache.add.
|
||||||
|
const cacheOverhead = len(restic.ID{}) + 64
|
||||||
|
|
||||||
|
// A blobCache is a fixed-size cache of blob contents.
|
||||||
|
// It is safe for concurrent access.
|
||||||
|
type blobCache struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
c *simplelru.LRU
|
||||||
|
|
||||||
|
free, size int // Current and max capacity, in bytes.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct a blob cache that stores at most size bytes worth of blobs.
|
||||||
|
func newBlobCache(size int) *blobCache {
|
||||||
|
c := &blobCache{
|
||||||
|
free: size,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLRU wants us to specify some max. number of entries, else it errors.
|
||||||
|
// The actual maximum will be smaller than size/cacheOverhead, because we
|
||||||
|
// evict entries (RemoveOldest in add) to maintain our size bound.
|
||||||
|
maxEntries := size / cacheOverhead
|
||||||
|
lru, err := simplelru.NewLRU(maxEntries, c.evict)
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // Can only be maxEntries <= 0.
|
||||||
|
}
|
||||||
|
c.c = lru
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *blobCache) add(id restic.ID, blob []byte) {
|
||||||
|
debug.Log("blobCache: add %v", id)
|
||||||
|
|
||||||
|
size := len(blob) + cacheOverhead
|
||||||
|
if size > c.size {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
var key interface{} = id
|
||||||
|
|
||||||
|
if c.c.Contains(key) { // Doesn't update the recency list.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// This loop takes at most min(maxEntries, maxchunksize/cacheOverhead)
|
||||||
|
// iterations.
|
||||||
|
for size > c.free {
|
||||||
|
c.c.RemoveOldest()
|
||||||
|
}
|
||||||
|
|
||||||
|
c.c.Add(key, blob)
|
||||||
|
c.free -= size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *blobCache) get(id restic.ID) ([]byte, bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
value, ok := c.c.Get(id)
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
debug.Log("blobCache: get %v, hit %v", id, ok)
|
||||||
|
|
||||||
|
blob, ok := value.([]byte)
|
||||||
|
return blob, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *blobCache) evict(key, value interface{}) {
|
||||||
|
blob := value.([]byte)
|
||||||
|
debug.Log("blobCache: evict %v, %d bytes", key, len(blob))
|
||||||
|
c.free += len(blob) + cacheOverhead
|
||||||
|
}
|
@ -3,6 +3,8 @@
|
|||||||
package fuse
|
package fuse
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sort"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
|
||||||
@ -18,21 +20,20 @@ const blockSize = 512
|
|||||||
|
|
||||||
// Statically ensure that *file implements the given interface
|
// Statically ensure that *file implements the given interface
|
||||||
var _ = fs.HandleReader(&file{})
|
var _ = fs.HandleReader(&file{})
|
||||||
var _ = fs.HandleReleaser(&file{})
|
|
||||||
|
|
||||||
type file struct {
|
type file struct {
|
||||||
root *Root
|
root *Root
|
||||||
node *restic.Node
|
node *restic.Node
|
||||||
inode uint64
|
inode uint64
|
||||||
|
|
||||||
sizes []int
|
// cumsize[i] holds the cumulative size of blobs[:i].
|
||||||
blobs [][]byte
|
cumsize []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFile(ctx context.Context, root *Root, inode uint64, node *restic.Node) (fusefile *file, err error) {
|
func newFile(ctx context.Context, root *Root, inode uint64, node *restic.Node) (fusefile *file, err error) {
|
||||||
debug.Log("create new file for %v with %d blobs", node.Name, len(node.Content))
|
debug.Log("create new file for %v with %d blobs", node.Name, len(node.Content))
|
||||||
var bytes uint64
|
var bytes uint64
|
||||||
sizes := make([]int, len(node.Content))
|
cumsize := make([]uint64, 1+len(node.Content))
|
||||||
for i, id := range node.Content {
|
for i, id := range node.Content {
|
||||||
size, ok := root.blobSizeCache.Lookup(id)
|
size, ok := root.blobSizeCache.Lookup(id)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -43,8 +44,8 @@ func newFile(ctx context.Context, root *Root, inode uint64, node *restic.Node) (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sizes[i] = int(size)
|
|
||||||
bytes += uint64(size)
|
bytes += uint64(size)
|
||||||
|
cumsize[i+1] = bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
if bytes != node.Size {
|
if bytes != node.Size {
|
||||||
@ -56,8 +57,8 @@ func newFile(ctx context.Context, root *Root, inode uint64, node *restic.Node) (
|
|||||||
inode: inode,
|
inode: inode,
|
||||||
root: root,
|
root: root,
|
||||||
node: node,
|
node: node,
|
||||||
sizes: sizes,
|
|
||||||
blobs: make([][]byte, len(node.Content)),
|
cumsize: cumsize,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,13 +85,10 @@ func (f *file) Attr(ctx context.Context, a *fuse.Attr) error {
|
|||||||
|
|
||||||
func (f *file) getBlobAt(ctx context.Context, i int) (blob []byte, err error) {
|
func (f *file) getBlobAt(ctx context.Context, i int) (blob []byte, err error) {
|
||||||
debug.Log("getBlobAt(%v, %v)", f.node.Name, i)
|
debug.Log("getBlobAt(%v, %v)", f.node.Name, i)
|
||||||
if f.blobs[i] != nil {
|
|
||||||
return f.blobs[i], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// release earlier blobs
|
blob, ok := f.root.blobCache.get(f.node.Content[i])
|
||||||
for j := 0; j < i; j++ {
|
if ok {
|
||||||
f.blobs[j] = nil
|
return blob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
blob, err = f.root.repo.LoadBlob(ctx, restic.DataBlob, f.node.Content[i], nil)
|
blob, err = f.root.repo.LoadBlob(ctx, restic.DataBlob, f.node.Content[i], nil)
|
||||||
@ -98,16 +96,17 @@ func (f *file) getBlobAt(ctx context.Context, i int) (blob []byte, err error) {
|
|||||||
debug.Log("LoadBlob(%v, %v) failed: %v", f.node.Name, f.node.Content[i], err)
|
debug.Log("LoadBlob(%v, %v) failed: %v", f.node.Name, f.node.Content[i], err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f.blobs[i] = blob
|
|
||||||
|
f.root.blobCache.add(f.node.Content[i], blob)
|
||||||
|
|
||||||
return blob, nil
|
return blob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *file) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
func (f *file) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||||
debug.Log("Read(%v, %v, %v), file size %v", f.node.Name, req.Size, req.Offset, f.node.Size)
|
debug.Log("Read(%v, %v, %v), file size %v", f.node.Name, req.Size, req.Offset, f.node.Size)
|
||||||
offset := req.Offset
|
offset := uint64(req.Offset)
|
||||||
|
|
||||||
if uint64(offset) > f.node.Size {
|
if offset > f.node.Size {
|
||||||
debug.Log("Read(%v): offset is greater than file size: %v > %v",
|
debug.Log("Read(%v): offset is greater than file size: %v > %v",
|
||||||
f.node.Name, req.Offset, f.node.Size)
|
f.node.Name, req.Offset, f.node.Size)
|
||||||
|
|
||||||
@ -123,16 +122,15 @@ func (f *file) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Skip blobs before the offset
|
// Skip blobs before the offset
|
||||||
startContent := 0
|
startContent := -1 + sort.Search(len(f.cumsize), func(i int) bool {
|
||||||
for offset > int64(f.sizes[startContent]) {
|
return f.cumsize[i] > offset
|
||||||
offset -= int64(f.sizes[startContent])
|
})
|
||||||
startContent++
|
offset -= f.cumsize[startContent]
|
||||||
}
|
|
||||||
|
|
||||||
dst := resp.Data[0:req.Size]
|
dst := resp.Data[0:req.Size]
|
||||||
readBytes := 0
|
readBytes := 0
|
||||||
remainingBytes := req.Size
|
remainingBytes := req.Size
|
||||||
for i := startContent; remainingBytes > 0 && i < len(f.sizes); i++ {
|
for i := startContent; remainingBytes > 0 && i < len(f.cumsize)-1; i++ {
|
||||||
blob, err := f.getBlobAt(ctx, i)
|
blob, err := f.getBlobAt(ctx, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -154,13 +152,6 @@ func (f *file) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *file) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
|
|
||||||
for i := range f.blobs {
|
|
||||||
f.blobs[i] = nil
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *file) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
|
func (f *file) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
|
||||||
debug.Log("Listxattr(%v, %v)", f.node.Name, req.Size)
|
debug.Log("Listxattr(%v, %v)", f.node.Name, req.Size)
|
||||||
for _, attr := range f.node.ExtendedAttributes {
|
for _, attr := range f.node.ExtendedAttributes {
|
||||||
|
@ -20,6 +20,48 @@ import (
|
|||||||
rtest "github.com/restic/restic/internal/test"
|
rtest "github.com/restic/restic/internal/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestCache(t *testing.T) {
|
||||||
|
var id1, id2, id3 restic.ID
|
||||||
|
id1[0] = 1
|
||||||
|
id2[0] = 2
|
||||||
|
id3[0] = 3
|
||||||
|
|
||||||
|
const (
|
||||||
|
kiB = 1 << 10
|
||||||
|
cacheSize = 64*kiB + 3*cacheOverhead
|
||||||
|
)
|
||||||
|
|
||||||
|
c := newBlobCache(cacheSize)
|
||||||
|
|
||||||
|
addAndCheck := func(id restic.ID, exp []byte) {
|
||||||
|
c.add(id, exp)
|
||||||
|
blob, ok := c.get(id)
|
||||||
|
rtest.Assert(t, ok, "blob %v added but not found in cache", id)
|
||||||
|
rtest.Equals(t, &exp[0], &blob[0])
|
||||||
|
rtest.Equals(t, exp, blob)
|
||||||
|
}
|
||||||
|
|
||||||
|
addAndCheck(id1, make([]byte, 32*kiB))
|
||||||
|
addAndCheck(id2, make([]byte, 30*kiB))
|
||||||
|
addAndCheck(id3, make([]byte, 10*kiB))
|
||||||
|
|
||||||
|
_, ok := c.get(id2)
|
||||||
|
rtest.Assert(t, ok, "blob %v not present", id2)
|
||||||
|
_, ok = c.get(id1)
|
||||||
|
rtest.Assert(t, !ok, "blob %v present, but should have been evicted", id1)
|
||||||
|
|
||||||
|
c.add(id1, make([]byte, 1+c.size))
|
||||||
|
_, ok = c.get(id1)
|
||||||
|
rtest.Assert(t, !ok, "blob %v too large but still added to cache")
|
||||||
|
|
||||||
|
c.c.Remove(id1)
|
||||||
|
c.c.Remove(id3)
|
||||||
|
c.c.Remove(id2)
|
||||||
|
|
||||||
|
rtest.Equals(t, cacheSize, c.size)
|
||||||
|
rtest.Equals(t, cacheSize, c.free)
|
||||||
|
}
|
||||||
|
|
||||||
func testRead(t testing.TB, f *file, offset, length int, data []byte) {
|
func testRead(t testing.TB, f *file, offset, length int, data []byte) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -114,10 +156,7 @@ func TestFuseFile(t *testing.T) {
|
|||||||
Size: filesize,
|
Size: filesize,
|
||||||
Content: content,
|
Content: content,
|
||||||
}
|
}
|
||||||
root := &Root{
|
root := NewRoot(context.TODO(), repo, Config{})
|
||||||
blobSizeCache: NewBlobSizeCache(context.TODO(), repo.Index()),
|
|
||||||
repo: repo,
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("blob cache has %d entries", len(root.blobSizeCache.m))
|
t.Logf("blob cache has %d entries", len(root.blobSizeCache.m))
|
||||||
|
|
||||||
@ -146,8 +185,6 @@ func TestFuseFile(t *testing.T) {
|
|||||||
t.Errorf("test %d failed, wrong data returned (offset %v, length %v)", i, offset, length)
|
t.Errorf("test %d failed, wrong data returned (offset %v, length %v)", i, offset, length)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rtest.OK(t, f.Release(ctx, nil))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test top-level directories for their UID and GID.
|
// Test top-level directories for their UID and GID.
|
||||||
@ -165,11 +202,10 @@ func testTopUidGid(t *testing.T, cfg Config, repo restic.Repository, uid, gid ui
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
root, err := NewRoot(ctx, repo, cfg)
|
root := NewRoot(ctx, repo, cfg)
|
||||||
rtest.OK(t, err)
|
|
||||||
|
|
||||||
var attr fuse.Attr
|
var attr fuse.Attr
|
||||||
err = root.Attr(ctx, &attr)
|
err := root.Attr(ctx, &attr)
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
rtest.Equals(t, uid, attr.Uid)
|
rtest.Equals(t, uid, attr.Uid)
|
||||||
rtest.Equals(t, gid, attr.Gid)
|
rtest.Equals(t, gid, attr.Gid)
|
||||||
|
@ -29,6 +29,7 @@ type Root struct {
|
|||||||
cfg Config
|
cfg Config
|
||||||
inode uint64
|
inode uint64
|
||||||
snapshots restic.Snapshots
|
snapshots restic.Snapshots
|
||||||
|
blobCache *blobCache
|
||||||
blobSizeCache *BlobSizeCache
|
blobSizeCache *BlobSizeCache
|
||||||
|
|
||||||
snCount int
|
snCount int
|
||||||
@ -45,14 +46,18 @@ var _ = fs.NodeStringLookuper(&Root{})
|
|||||||
|
|
||||||
const rootInode = 1
|
const rootInode = 1
|
||||||
|
|
||||||
|
// Size of the blob cache. TODO: make this configurable.
|
||||||
|
const blobCacheSize = 64 << 20
|
||||||
|
|
||||||
// NewRoot initializes a new root node from a repository.
|
// NewRoot initializes a new root node from a repository.
|
||||||
func NewRoot(ctx context.Context, repo restic.Repository, cfg Config) (*Root, error) {
|
func NewRoot(ctx context.Context, repo restic.Repository, cfg Config) *Root {
|
||||||
debug.Log("NewRoot(), config %v", cfg)
|
debug.Log("NewRoot(), config %v", cfg)
|
||||||
|
|
||||||
root := &Root{
|
root := &Root{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
inode: rootInode,
|
inode: rootInode,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
blobCache: newBlobCache(blobCacheSize),
|
||||||
blobSizeCache: NewBlobSizeCache(ctx, repo.Index()),
|
blobSizeCache: NewBlobSizeCache(ctx, repo.Index()),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +75,7 @@ func NewRoot(ctx context.Context, repo restic.Repository, cfg Config) (*Root, er
|
|||||||
|
|
||||||
root.MetaDir = NewMetaDir(root, rootInode, entries)
|
root.MetaDir = NewMetaDir(root, rootInode, entries)
|
||||||
|
|
||||||
return root, nil
|
return root
|
||||||
}
|
}
|
||||||
|
|
||||||
// Root is just there to satisfy fs.Root, it returns itself.
|
// Root is just there to satisfy fs.Root, it returns itself.
|
||||||
|
Loading…
Reference in New Issue
Block a user