mirror of
https://github.com/octoleo/restic.git
synced 2025-01-14 19:19:44 +00:00
Fix rare 'file already closed' during restore
Fixes #2183 Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
This commit is contained in:
parent
65b476ead9
commit
bf9a507148
@ -23,8 +23,8 @@ import (
|
|||||||
const (
|
const (
|
||||||
workerCount = 8
|
workerCount = 8
|
||||||
|
|
||||||
// max number of open output file handles
|
// max number of cached open output file handles
|
||||||
filesWriterCount = 32
|
filesWriterCacheCap = 32
|
||||||
|
|
||||||
// estimated average pack size used to calculate pack cache capacity
|
// estimated average pack size used to calculate pack cache capacity
|
||||||
averagePackSize = 5 * 1024 * 1024
|
averagePackSize = 5 * 1024 * 1024
|
||||||
@ -73,7 +73,7 @@ func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.H
|
|||||||
packLoader: packLoader,
|
packLoader: packLoader,
|
||||||
key: key,
|
key: key,
|
||||||
idx: idx,
|
idx: idx,
|
||||||
filesWriter: newFilesWriter(filesWriterCount),
|
filesWriter: newFilesWriter(filesWriterCacheCap),
|
||||||
packCache: newPackCache(packCacheCapacity),
|
packCache: newPackCache(packCacheCapacity),
|
||||||
dst: dst,
|
dst: dst,
|
||||||
}
|
}
|
||||||
|
@ -178,7 +178,8 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
rtest.Equals(t, false, r.filesWriter.writers.Contains(target))
|
_, contains := r.filesWriter.cache[target]
|
||||||
|
rtest.Equals(t, false, contains)
|
||||||
|
|
||||||
content := repo.fileContent(file)
|
content := repo.fileContent(file)
|
||||||
if !bytes.Equal(data, []byte(content)) {
|
if !bytes.Equal(data, []byte(content)) {
|
||||||
|
@ -1,36 +1,55 @@
|
|||||||
package restorer
|
package restorer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/golang-lru/simplelru"
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Writes blobs to output files. Each file is written sequentially,
|
||||||
|
// start to finish, but multiple files can be written to concurrently.
|
||||||
|
// Implementation allows virtually unlimited number of logically open
|
||||||
|
// files, but number of phisically open files will never exceed number
|
||||||
|
// of concurrent writeToFile invocations plus cacheCap.
|
||||||
type filesWriter struct {
|
type filesWriter struct {
|
||||||
lock sync.Mutex // guards concurrent access
|
lock sync.Mutex // guards concurrent access to open files cache
|
||||||
inprogress map[string]struct{} // (logically) opened file writers
|
inprogress map[string]struct{} // (logically) opened file writers
|
||||||
writers simplelru.LRUCache // key: string, value: *os.File
|
cache map[string]*os.File // cache of open files
|
||||||
|
cacheCap int // max number of cached open files
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFilesWriter(count int) *filesWriter {
|
func newFilesWriter(cacheCap int) *filesWriter {
|
||||||
writers, _ := simplelru.NewLRU(count, func(key interface{}, value interface{}) {
|
return &filesWriter{
|
||||||
value.(*os.File).Close()
|
inprogress: make(map[string]struct{}),
|
||||||
debug.Log("Closed and purged cached writer for %v", key)
|
cache: make(map[string]*os.File),
|
||||||
})
|
cacheCap: cacheCap,
|
||||||
return &filesWriter{inprogress: make(map[string]struct{}), writers: writers}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *filesWriter) writeToFile(path string, buf []byte) error {
|
func (w *filesWriter) writeToFile(path string, blob []byte) error {
|
||||||
acquireWriter := func() (io.Writer, error) {
|
// First writeToFile invocation for any given path will:
|
||||||
|
// - create and open the file
|
||||||
|
// - write the blob to the file
|
||||||
|
// - cache the open file if there is space, close the file otherwise
|
||||||
|
// Subsequent invocations will:
|
||||||
|
// - remove the open file from the cache _or_ open the file for append
|
||||||
|
// - write the blob to the file
|
||||||
|
// - cache the open file if there is space, close the file otherwise
|
||||||
|
// The idea is to cap maximum number of open files with minimal
|
||||||
|
// coordination among concurrent writeToFile invocations (note that
|
||||||
|
// writeToFile never touches somebody else's open file).
|
||||||
|
|
||||||
|
// TODO measure if caching is useful (likely depends on operating system
|
||||||
|
// and hardware configuration)
|
||||||
|
acquireWriter := func() (*os.File, error) {
|
||||||
w.lock.Lock()
|
w.lock.Lock()
|
||||||
defer w.lock.Unlock()
|
defer w.lock.Unlock()
|
||||||
if wr, ok := w.writers.Get(path); ok {
|
if wr, ok := w.cache[path]; ok {
|
||||||
debug.Log("Used cached writer for %s", path)
|
debug.Log("Used cached writer for %s", path)
|
||||||
return wr.(*os.File), nil
|
delete(w.cache, path)
|
||||||
|
return wr, nil
|
||||||
}
|
}
|
||||||
var flags int
|
var flags int
|
||||||
if _, append := w.inprogress[path]; append {
|
if _, append := w.inprogress[path]; append {
|
||||||
@ -43,21 +62,30 @@ func (w *filesWriter) writeToFile(path string, buf []byte) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
w.writers.Add(path, wr)
|
debug.Log("Opened writer for %s", path)
|
||||||
debug.Log("Opened and cached writer for %s", path)
|
|
||||||
return wr, nil
|
return wr, nil
|
||||||
}
|
}
|
||||||
|
cacheOrCloseWriter := func(wr *os.File) {
|
||||||
|
w.lock.Lock()
|
||||||
|
defer w.lock.Unlock()
|
||||||
|
if len(w.cache) < w.cacheCap {
|
||||||
|
w.cache[path] = wr
|
||||||
|
} else {
|
||||||
|
wr.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
wr, err := acquireWriter()
|
wr, err := acquireWriter()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
n, err := wr.Write(buf)
|
n, err := wr.Write(blob)
|
||||||
|
cacheOrCloseWriter(wr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if n != len(buf) {
|
if n != len(blob) {
|
||||||
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(buf), n)
|
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -65,6 +93,9 @@ func (w *filesWriter) writeToFile(path string, buf []byte) error {
|
|||||||
func (w *filesWriter) close(path string) {
|
func (w *filesWriter) close(path string) {
|
||||||
w.lock.Lock()
|
w.lock.Lock()
|
||||||
defer w.lock.Unlock()
|
defer w.lock.Unlock()
|
||||||
w.writers.Remove(path)
|
if wr, ok := w.cache[path]; ok {
|
||||||
|
wr.Close()
|
||||||
|
delete(w.cache, path)
|
||||||
|
}
|
||||||
delete(w.inprogress, path)
|
delete(w.inprogress, path)
|
||||||
}
|
}
|
||||||
|
@ -17,21 +17,21 @@ func TestFilesWriterBasic(t *testing.T) {
|
|||||||
f2 := dir + "/f2"
|
f2 := dir + "/f2"
|
||||||
|
|
||||||
rtest.OK(t, w.writeToFile(f1, []byte{1}))
|
rtest.OK(t, w.writeToFile(f1, []byte{1}))
|
||||||
rtest.Equals(t, 1, w.writers.Len())
|
rtest.Equals(t, 1, len(w.cache))
|
||||||
rtest.Equals(t, 1, len(w.inprogress))
|
rtest.Equals(t, 1, len(w.inprogress))
|
||||||
|
|
||||||
rtest.OK(t, w.writeToFile(f2, []byte{2}))
|
rtest.OK(t, w.writeToFile(f2, []byte{2}))
|
||||||
rtest.Equals(t, 1, w.writers.Len())
|
rtest.Equals(t, 1, len(w.cache))
|
||||||
rtest.Equals(t, 2, len(w.inprogress))
|
rtest.Equals(t, 2, len(w.inprogress))
|
||||||
|
|
||||||
rtest.OK(t, w.writeToFile(f1, []byte{1}))
|
rtest.OK(t, w.writeToFile(f1, []byte{1}))
|
||||||
w.close(f1)
|
w.close(f1)
|
||||||
rtest.Equals(t, 0, w.writers.Len())
|
rtest.Equals(t, 0, len(w.cache))
|
||||||
rtest.Equals(t, 1, len(w.inprogress))
|
rtest.Equals(t, 1, len(w.inprogress))
|
||||||
|
|
||||||
rtest.OK(t, w.writeToFile(f2, []byte{2}))
|
rtest.OK(t, w.writeToFile(f2, []byte{2}))
|
||||||
w.close(f2)
|
w.close(f2)
|
||||||
rtest.Equals(t, 0, w.writers.Len())
|
rtest.Equals(t, 0, len(w.cache))
|
||||||
rtest.Equals(t, 0, len(w.inprogress))
|
rtest.Equals(t, 0, len(w.inprogress))
|
||||||
|
|
||||||
buf, err := ioutil.ReadFile(f1)
|
buf, err := ioutil.ReadFile(f1)
|
||||||
|
Loading…
Reference in New Issue
Block a user