From bf9a50714881cfa421ec3aff05df39cf20126ca9 Mon Sep 17 00:00:00 2001
From: Igor Fedorenko
Date: Sun, 24 Feb 2019 21:50:40 -0800
Subject: [PATCH] Fix rare 'file already closed' during restore

Fixes #2183

Signed-off-by: Igor Fedorenko
---
 internal/restorer/filerestorer.go      |  6 +--
 internal/restorer/filerestorer_test.go |  3 +-
 internal/restorer/fileswriter.go       | 71 ++++++++++++++++++--------
 internal/restorer/fileswriter_test.go  |  8 +--
 4 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go
index 4baf9b567..c59b061b0 100644
--- a/internal/restorer/filerestorer.go
+++ b/internal/restorer/filerestorer.go
@@ -23,8 +23,8 @@ import (
 const (
 	workerCount = 8
 
-	// max number of open output file handles
-	filesWriterCount = 32
+	// max number of cached open output file handles
+	filesWriterCacheCap = 32
 
 	// estimated average pack size used to calculate pack cache capacity
 	averagePackSize = 5 * 1024 * 1024
@@ -73,7 +73,7 @@ func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.H
 		packLoader:  packLoader,
 		key:         key,
 		idx:         idx,
-		filesWriter: newFilesWriter(filesWriterCount),
+		filesWriter: newFilesWriter(filesWriterCacheCap),
 		packCache:   newPackCache(packCacheCapacity),
 		dst:         dst,
 	}
diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go
index dd022e9d4..db683b10e 100644
--- a/internal/restorer/filerestorer_test.go
+++ b/internal/restorer/filerestorer_test.go
@@ -178,7 +178,8 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
 			continue
 		}
 
-		rtest.Equals(t, false, r.filesWriter.writers.Contains(target))
+		_, contains := r.filesWriter.cache[target]
+		rtest.Equals(t, false, contains)
 
 		content := repo.fileContent(file)
 		if !bytes.Equal(data, []byte(content)) {
diff --git a/internal/restorer/fileswriter.go b/internal/restorer/fileswriter.go
index af7ea8428..b3beb8161 100644
--- a/internal/restorer/fileswriter.go
+++ b/internal/restorer/fileswriter.go
@@ -1,36 +1,55 @@
 package restorer
 
 import (
-	"io"
 	"os"
 	"sync"
 
-	"github.com/hashicorp/golang-lru/simplelru"
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/errors"
 )
 
+// Writes blobs to output files. Each file is written sequentially,
+// start to finish, but multiple files can be written to concurrently.
+// The implementation allows a virtually unlimited number of logically
+// open files, but the number of physically open files never exceeds
+// the number of concurrent writeToFile invocations plus cacheCap.
 type filesWriter struct {
-	lock       sync.Mutex          // guards concurrent access
+	lock       sync.Mutex          // guards concurrent access to open files cache
 	inprogress map[string]struct{} // (logically) opened file writers
-	writers    simplelru.LRUCache  // key: string, value: *os.File
+	cache      map[string]*os.File // cache of open files
+	cacheCap   int                 // max number of cached open files
 }
 
-func newFilesWriter(count int) *filesWriter {
-	writers, _ := simplelru.NewLRU(count, func(key interface{}, value interface{}) {
-		value.(*os.File).Close()
-		debug.Log("Closed and purged cached writer for %v", key)
-	})
-	return &filesWriter{inprogress: make(map[string]struct{}), writers: writers}
+func newFilesWriter(cacheCap int) *filesWriter {
+	return &filesWriter{
+		inprogress: make(map[string]struct{}),
+		cache:      make(map[string]*os.File),
+		cacheCap:   cacheCap,
+	}
 }
 
-func (w *filesWriter) writeToFile(path string, buf []byte) error {
-	acquireWriter := func() (io.Writer, error) {
+func (w *filesWriter) writeToFile(path string, blob []byte) error {
+	// First writeToFile invocation for any given path will:
+	// - create and open the file
+	// - write the blob to the file
+	// - cache the open file if there is space, close the file otherwise
+	// Subsequent invocations will:
+	// - remove the open file from the cache _or_ open the file for append
+	// - write the blob to the file
+	// - cache the open file if there is space, close the file otherwise
+	// The idea is to cap maximum number of open files with minimal
+	// coordination among concurrent writeToFile invocations (note that
+	// writeToFile never touches somebody else's open file).
+
+	// TODO measure if caching is useful (likely depends on operating system
+	// and hardware configuration)
+	acquireWriter := func() (*os.File, error) {
 		w.lock.Lock()
 		defer w.lock.Unlock()
-		if wr, ok := w.writers.Get(path); ok {
+		if wr, ok := w.cache[path]; ok {
 			debug.Log("Used cached writer for %s", path)
-			return wr.(*os.File), nil
+			delete(w.cache, path)
+			return wr, nil
 		}
 		var flags int
 		if _, append := w.inprogress[path]; append {
@@ -43,21 +62,30 @@ func (w *filesWriter) writeToFile(path string, buf []byte) error {
 		if err != nil {
 			return nil, err
 		}
-		w.writers.Add(path, wr)
-		debug.Log("Opened and cached writer for %s", path)
+		debug.Log("Opened writer for %s", path)
 		return wr, nil
 	}
 
+	cacheOrCloseWriter := func(wr *os.File) {
+		w.lock.Lock()
+		defer w.lock.Unlock()
+		if len(w.cache) < w.cacheCap {
+			w.cache[path] = wr
+		} else {
+			wr.Close()
+		}
+	}
 	wr, err := acquireWriter()
 	if err != nil {
 		return err
 	}
-	n, err := wr.Write(buf)
+	n, err := wr.Write(blob)
+	cacheOrCloseWriter(wr)
 	if err != nil {
 		return err
 	}
-	if n != len(buf) {
-		return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(buf), n)
+	if n != len(blob) {
+		return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n)
 	}
 	return nil
 }
@@ -65,6 +93,9 @@ func (w *filesWriter) writeToFile(path string, buf []byte) error {
 func (w *filesWriter) close(path string) {
 	w.lock.Lock()
 	defer w.lock.Unlock()
-	w.writers.Remove(path)
+	if wr, ok := w.cache[path]; ok {
+		wr.Close()
+		delete(w.cache, path)
+	}
 	delete(w.inprogress, path)
 }
diff --git a/internal/restorer/fileswriter_test.go b/internal/restorer/fileswriter_test.go
index 45c2a88fb..ada7f2107 100644
--- a/internal/restorer/fileswriter_test.go
+++ b/internal/restorer/fileswriter_test.go
@@ -17,21 +17,21 @@ func TestFilesWriterBasic(t *testing.T) {
 	f2 := dir + "/f2"
 
 	rtest.OK(t, w.writeToFile(f1, []byte{1}))
-	rtest.Equals(t, 1, w.writers.Len())
+	rtest.Equals(t, 1, len(w.cache))
 	rtest.Equals(t, 1, len(w.inprogress))
 
 	rtest.OK(t, w.writeToFile(f2, []byte{2}))
-	rtest.Equals(t, 1, w.writers.Len())
+	rtest.Equals(t, 1, len(w.cache))
 	rtest.Equals(t, 2, len(w.inprogress))
 
 	rtest.OK(t, w.writeToFile(f1, []byte{1}))
 	w.close(f1)
-	rtest.Equals(t, 0, w.writers.Len())
+	rtest.Equals(t, 0, len(w.cache))
 	rtest.Equals(t, 1, len(w.inprogress))
 
 	rtest.OK(t, w.writeToFile(f2, []byte{2}))
 	w.close(f2)
-	rtest.Equals(t, 0, w.writers.Len())
+	rtest.Equals(t, 0, len(w.cache))
 	rtest.Equals(t, 0, len(w.inprogress))
 
 	buf, err := ioutil.ReadFile(f1)
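
Reviewer note, not part of the patch: the sketch below is a self-contained, simplified rendering of the discipline the new fileswriter.go comment describes: take an idle handle out of the cache or open the file, write the blob, then park the handle back in the cache if there is room, or close it. A counter is added to show that the number of simultaneously open handles stays within workers + cacheCap. The boundedWriter type, the worker sharding in main, and the open/peak instrumentation are assumptions made for this illustration only; restic's real scheduling is pack-driven and lives in filerestorer.go.

// Illustrative sketch only, NOT restic code: boundedWriter, the worker
// sharding in main, and the open/peak counters are assumptions made for
// this demo of the "check out, write, park or close" discipline.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

type boundedWriter struct {
	mu         sync.Mutex
	inprogress map[string]bool     // files that have been opened at least once
	cache      map[string]*os.File // idle open handles, at most cacheCap of them
	cacheCap   int
	open, peak int // instrumentation: current and peak number of open handles
}

func (w *boundedWriter) writeToFile(path string, blob []byte) error {
	// Acquire: take the idle handle out of the cache, or open the file anew.
	w.mu.Lock()
	f, cached := w.cache[path]
	if cached {
		delete(w.cache, path) // this call now owns the handle exclusively
	}
	flags := os.O_CREATE | os.O_TRUNC | os.O_WRONLY
	if w.inprogress[path] {
		flags = os.O_APPEND | os.O_WRONLY // file already holds earlier blobs
	}
	w.inprogress[path] = true
	w.mu.Unlock()

	if !cached {
		var err error
		if f, err = os.OpenFile(path, flags, 0600); err != nil {
			return err
		}
		w.mu.Lock()
		if w.open++; w.open > w.peak {
			w.peak = w.open
		}
		w.mu.Unlock()
	}

	_, err := f.Write(blob)

	// Release: park the handle if the cache has room, otherwise close it.
	w.mu.Lock()
	if len(w.cache) < w.cacheCap {
		w.cache[path] = f
		w.mu.Unlock()
		return err
	}
	w.open--
	w.mu.Unlock()
	f.Close()
	return err
}

func main() {
	dir, err := os.MkdirTemp("", "fileswriter-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	const workers, files, cacheCap = 8, 100, 4
	w := &boundedWriter{inprogress: map[string]bool{}, cache: map[string]*os.File{}, cacheCap: cacheCap}

	var wg sync.WaitGroup
	for wid := 0; wid < workers; wid++ {
		wg.Add(1)
		go func(wid int) {
			defer wg.Done()
			// Each worker owns a disjoint set of files, so every file is
			// written sequentially while many files are written concurrently.
			for round := 0; round < 5; round++ {
				for i := wid; i < files; i += workers {
					path := filepath.Join(dir, fmt.Sprintf("f%03d", i))
					if err := w.writeToFile(path, []byte("blob ")); err != nil {
						panic(err)
					}
				}
			}
		}(wid)
	}
	wg.Wait()

	for _, f := range w.cache { // no writers left; close whatever is still cached
		f.Close()
	}
	fmt.Printf("peak open handles: %d, bound (workers+cacheCap): %d\n", w.peak, workers+cacheCap)
}

The bound is what makes dropping the LRU safe: with the old simplelru-based cache, eviction could close a handle that another goroutine had just fetched from the cache and was still writing to, which is how the rare 'file already closed' error could appear. After the patch a handle is always owned either by the cache or by exactly one writeToFile call, never both.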