// Mirror of https://github.com/octoleo/restic.git, synced 2025-01-17 20:07:18 +00:00.
// Source file: restic/internal/backend/cache/backend.go (Go, 261 lines, 6.2 KiB).

package cache
import (
"context"
"fmt"
"io"
"os"
"sync"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
// Backend wraps a restic.Backend and adds a cache.
type Backend struct {
backend.Backend
2017-09-24 20:24:11 +00:00
*Cache
// inProgress contains the handle for all files that are currently
// downloaded. The channel in the value is closed as soon as the download
// is finished.
inProgressMutex sync.Mutex
inProgress map[backend.Handle]chan struct{}
}
// compile-time check that *Backend satisfies the backend.Backend interface
var _ backend.Backend = (*Backend)(nil)
// newBackend returns a Backend that wraps be and uses c as its cache. The map
// tracking in-progress downloads starts out empty.
func newBackend(be backend.Backend, c *Cache) *Backend {
	wrapped := &Backend{Backend: be, Cache: c}
	wrapped.inProgress = make(map[backend.Handle]chan struct{})
	return wrapped
}
// Remove deletes the file from the wrapped backend first and, only if that
// succeeded, from the cache as well. An error from either step is returned.
func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
	debug.Log("cache Remove(%v)", h)

	if err := b.Backend.Remove(ctx, h); err != nil {
		return err
	}

	_, cacheErr := b.Cache.remove(h)
	return cacheErr
}
func autoCacheTypes(h backend.Handle) bool {
2020-07-28 08:13:11 +00:00
switch h.Type {
case backend.IndexFile, backend.SnapshotFile:
2020-07-28 08:13:11 +00:00
return true
case backend.PackFile:
2023-10-01 08:52:57 +00:00
return h.IsMetadata
2020-07-28 08:13:11 +00:00
}
return false
}
// Save stores a new file in the backend and the cache.
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
2020-07-28 08:13:11 +00:00
if !autoCacheTypes(h) {
return b.Backend.Save(ctx, h, rd)
}
2017-09-24 20:24:01 +00:00
debug.Log("Save(%v): auto-store in the cache", h)
// make sure the reader is at the start
err := rd.Rewind()
if err != nil {
return err
}
// first, save in the backend
err = b.Backend.Save(ctx, h, rd)
if err != nil {
return err
}
// next, save in the cache
err = rd.Rewind()
if err != nil {
return err
}
2024-05-09 11:07:21 +00:00
err = b.Cache.save(h, rd)
if err != nil {
debug.Log("unable to save %v to cache: %v", h, err)
return err
}
return nil
}
func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error {
finish := make(chan struct{})
b.inProgressMutex.Lock()
other, alreadyDownloading := b.inProgress[h]
if !alreadyDownloading {
b.inProgress[h] = finish
}
b.inProgressMutex.Unlock()
if alreadyDownloading {
debug.Log("readahead %v is already performed by somebody else, delegating...", h)
<-other
debug.Log("download %v finished", h)
return nil
}
defer func() {
// signal other waiting goroutines that the file may now be cached
close(finish)
// remove the finish channel from the map
b.inProgressMutex.Lock()
delete(b.inProgress, h)
b.inProgressMutex.Unlock()
}()
2018-10-04 12:30:07 +00:00
// test again, maybe the file was cached in the meantime
if !b.Cache.Has(h) {
// nope, it's still not in the cache, pull it from the repo and save it
err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
2024-05-09 11:07:21 +00:00
return b.Cache.save(h, rd)
})
if err != nil {
// try to remove from the cache, ignore errors
_, _ = b.Cache.remove(h)
}
return err
}
2018-10-04 12:30:07 +00:00
return nil
}
// loadFromCache hands the cached copy of h (restricted to length and offset)
// to consumer. The boolean result tells whether the file was served from the
// cache; when opening the cached file fails, the flag reported by the cache
// is passed on together with the error. The reader is always closed after the
// consumer ran, and a consumer error takes precedence over a close error.
func (b *Backend) loadFromCache(h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (bool, error) {
	file, cached, loadErr := b.Cache.load(h, length, offset)
	if loadErr != nil {
		return cached, loadErr
	}

	consumeErr := consumer(file)
	closeErr := file.Close()
	if consumeErr != nil {
		// the close error is secondary and deliberately dropped
		return true, consumeErr
	}
	return true, closeErr
}
// Load loads a file from the cache or the backend.
func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
b.inProgressMutex.Lock()
waitForFinish, inProgress := b.inProgress[h]
b.inProgressMutex.Unlock()
if inProgress {
debug.Log("downloading %v is already in progress, waiting for finish", h)
<-waitForFinish
debug.Log("downloading %v finished", h)
}
// try loading from cache without checking that the handle is actually cached
inCache, err := b.loadFromCache(h, length, offset, consumer)
if inCache {
2024-08-03 16:51:38 +00:00
if err != nil {
debug.Log("error loading %v from cache: %v", h, err)
}
// the caller must explicitly use cache.Forget() to remove the cache entry
return err
}
// if we don't automatically cache this file type, fall back to the backend
2020-07-28 08:13:11 +00:00
if !autoCacheTypes(h) {
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
debug.Log("auto-store %v in the cache", h)
err = b.cacheFile(ctx, h)
if err != nil {
return err
}
inCache, err = b.loadFromCache(h, length, offset, consumer)
if inCache {
2024-05-09 19:21:08 +00:00
if err != nil {
debug.Log("error loading %v from cache: %v", h, err)
}
return err
}
debug.Log("error caching %v: %v, falling back to backend", h, err)
return b.Backend.Load(ctx, h, length, offset, consumer)
}
// Stat tests whether the backend has a file. If it does not exist but still
// exists in the cache, it is removed from the cache.
func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) {
debug.Log("cache Stat(%v)", h)
fi, err := b.Backend.Stat(ctx, h)
2024-05-09 11:07:21 +00:00
if err != nil && b.Backend.IsNotExist(err) {
// try to remove from the cache, ignore errors
_, _ = b.Cache.remove(h)
}
return fi, err
}
// IsNotExist reports whether err signals a non-existing file. The decision is
// left entirely to the wrapped backend.
func (b *Backend) IsNotExist(err error) bool {
	inner := b.Backend
	return inner.IsNotExist(err)
}
2023-04-08 10:53:43 +00:00
func (b *Backend) Unwrap() backend.Backend {
2023-04-08 10:53:43 +00:00
return b.Backend
}
// List calls fn for every file of type t reported by the wrapped backend.
//
// For file types the cache cannot hold, the call is forwarded unchanged. For
// all other types, entries whose name is not a valid ID are skipped (fn is
// not called for them), and once listing completed without error and ctx is
// still alive, cached files of type t that were not listed are cleared from
// the cache. A failure to clear the cache is only printed to stderr.
func (b *Backend) List(ctx context.Context, t backend.FileType, fn func(f backend.FileInfo) error) error {
	if !b.Cache.canBeCached(t) {
		return b.Backend.List(ctx, t, fn)
	}

	// IDs of all files the repository reported during this listing
	seen := restic.NewIDSet()

	// record remembers the ID of each entry before handing it to fn
	record := func(fi backend.FileInfo) error {
		id, parseErr := restic.ParseID(fi.Name)
		if parseErr != nil {
			// ignore files with invalid name
			return nil
		}

		seen.Insert(id)
		return fn(fi)
	}

	if err := b.Backend.List(ctx, t, record); err != nil {
		return err
	}

	if err := ctx.Err(); err != nil {
		return err
	}

	// clear the cache for files that are not in the repo anymore, ignore errors
	if err := b.Cache.Clear(t, seen); err != nil {
		fmt.Fprintf(os.Stderr, "error clearing %s files in cache: %v\n", t.String(), err)
	}

	return nil
}