mirror of
https://github.com/octoleo/restic.git
synced 2024-11-26 14:56:29 +00:00
bf9a507148
Fixes #2183 Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
102 lines
2.9 KiB
Go
102 lines
2.9 KiB
Go
package restorer
|
|
|
|
import (
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
)
|
|
|
|
// Writes blobs to output files. Each file is written sequentially,
|
|
// start to finish, but multiple files can be written to concurrently.
|
|
// Implementation allows virtually unlimited number of logically open
|
|
// files, but number of phisically open files will never exceed number
|
|
// of concurrent writeToFile invocations plus cacheCap.
|
|
type filesWriter struct {
|
|
lock sync.Mutex // guards concurrent access to open files cache
|
|
inprogress map[string]struct{} // (logically) opened file writers
|
|
cache map[string]*os.File // cache of open files
|
|
cacheCap int // max number of cached open files
|
|
}
|
|
|
|
func newFilesWriter(cacheCap int) *filesWriter {
|
|
return &filesWriter{
|
|
inprogress: make(map[string]struct{}),
|
|
cache: make(map[string]*os.File),
|
|
cacheCap: cacheCap,
|
|
}
|
|
}
|
|
|
|
func (w *filesWriter) writeToFile(path string, blob []byte) error {
|
|
// First writeToFile invocation for any given path will:
|
|
// - create and open the file
|
|
// - write the blob to the file
|
|
// - cache the open file if there is space, close the file otherwise
|
|
// Subsequent invocations will:
|
|
// - remove the open file from the cache _or_ open the file for append
|
|
// - write the blob to the file
|
|
// - cache the open file if there is space, close the file otherwise
|
|
// The idea is to cap maximum number of open files with minimal
|
|
// coordination among concurrent writeToFile invocations (note that
|
|
// writeToFile never touches somebody else's open file).
|
|
|
|
// TODO measure if caching is useful (likely depends on operating system
|
|
// and hardware configuration)
|
|
acquireWriter := func() (*os.File, error) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
if wr, ok := w.cache[path]; ok {
|
|
debug.Log("Used cached writer for %s", path)
|
|
delete(w.cache, path)
|
|
return wr, nil
|
|
}
|
|
var flags int
|
|
if _, append := w.inprogress[path]; append {
|
|
flags = os.O_APPEND | os.O_WRONLY
|
|
} else {
|
|
w.inprogress[path] = struct{}{}
|
|
flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY
|
|
}
|
|
wr, err := os.OpenFile(path, flags, 0600)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
debug.Log("Opened writer for %s", path)
|
|
return wr, nil
|
|
}
|
|
cacheOrCloseWriter := func(wr *os.File) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
if len(w.cache) < w.cacheCap {
|
|
w.cache[path] = wr
|
|
} else {
|
|
wr.Close()
|
|
}
|
|
}
|
|
|
|
wr, err := acquireWriter()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := wr.Write(blob)
|
|
cacheOrCloseWriter(wr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n != len(blob) {
|
|
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(blob), n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *filesWriter) close(path string) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
if wr, ok := w.cache[path]; ok {
|
|
wr.Close()
|
|
delete(w.cache, path)
|
|
}
|
|
delete(w.inprogress, path)
|
|
}
|