mirror of
https://github.com/octoleo/restic.git
synced 2024-11-10 23:31:09 +00:00
Merge pull request #3475 from MichaelEischer/local-sftp-conn-limit
Limit concurrent operations for local / sftp backend
This commit is contained in:
commit
04e054465a
@ -226,18 +226,6 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi
|
|||||||
return errors.Wrap(err, "PutBlockList")
|
return errors.Wrap(err, "PutBlockList")
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
@ -278,15 +266,7 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(rd, nil), err
|
||||||
ReadCloser: rd,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
@ -263,18 +263,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
@ -303,21 +291,16 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||||||
|
|
||||||
be.sem.GetToken()
|
be.sem.GetToken()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
|
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(r, cancel), err
|
||||||
ReadCloser: r,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
@ -11,6 +11,15 @@ import (
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
Path string
|
Path string
|
||||||
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
|
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
|
||||||
|
|
||||||
|
Connections uint `option:"connections" help:"set a limit for the number of concurrent operations (default: 2)"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConfig returns a new config with default options applied.
|
||||||
|
func NewConfig() Config {
|
||||||
|
return Config{
|
||||||
|
Connections: 2,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -18,10 +27,12 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ParseConfig parses a local backend config.
|
// ParseConfig parses a local backend config.
|
||||||
func ParseConfig(cfg string) (interface{}, error) {
|
func ParseConfig(s string) (interface{}, error) {
|
||||||
if !strings.HasPrefix(cfg, "local:") {
|
if !strings.HasPrefix(s, "local:") {
|
||||||
return nil, errors.New(`invalid format, prefix "local" not found`)
|
return nil, errors.New(`invalid format, prefix "local" not found`)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Config{Path: cfg[6:]}, nil
|
cfg := NewConfig()
|
||||||
|
cfg.Path = s[6:]
|
||||||
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
@ -37,8 +37,9 @@ func TestLayout(t *testing.T) {
|
|||||||
|
|
||||||
repo := filepath.Join(path, "repo")
|
repo := filepath.Join(path, "repo")
|
||||||
be, err := Open(context.TODO(), Config{
|
be, err := Open(context.TODO(), Config{
|
||||||
Path: repo,
|
Path: repo,
|
||||||
Layout: test.layout,
|
Layout: test.layout,
|
||||||
|
Connections: 2,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
// Local is a backend in a local directory.
|
// Local is a backend in a local directory.
|
||||||
type Local struct {
|
type Local struct {
|
||||||
Config
|
Config
|
||||||
|
sem *backend.Semaphore
|
||||||
backend.Layout
|
backend.Layout
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,15 +31,28 @@ var _ restic.Backend = &Local{}
|
|||||||
|
|
||||||
const defaultLayout = "default"
|
const defaultLayout = "default"
|
||||||
|
|
||||||
// Open opens the local backend as specified by config.
|
func open(ctx context.Context, cfg Config) (*Local, error) {
|
||||||
func Open(ctx context.Context, cfg Config) (*Local, error) {
|
|
||||||
debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout)
|
|
||||||
l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path)
|
l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Local{Config: cfg, Layout: l}, nil
|
sem, err := backend.NewSemaphore(cfg.Connections)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Local{
|
||||||
|
Config: cfg,
|
||||||
|
Layout: l,
|
||||||
|
sem: sem,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open opens the local backend as specified by config.
|
||||||
|
func Open(ctx context.Context, cfg Config) (*Local, error) {
|
||||||
|
debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout)
|
||||||
|
return open(ctx, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create creates all the necessary files and directories for a new local
|
// Create creates all the necessary files and directories for a new local
|
||||||
@ -46,16 +60,11 @@ func Open(ctx context.Context, cfg Config) (*Local, error) {
|
|||||||
func Create(ctx context.Context, cfg Config) (*Local, error) {
|
func Create(ctx context.Context, cfg Config) (*Local, error) {
|
||||||
debug.Log("create local backend at %v (layout %q)", cfg.Path, cfg.Layout)
|
debug.Log("create local backend at %v (layout %q)", cfg.Path, cfg.Layout)
|
||||||
|
|
||||||
l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path)
|
be, err := open(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
be := &Local{
|
|
||||||
Config: cfg,
|
|
||||||
Layout: l,
|
|
||||||
}
|
|
||||||
|
|
||||||
// test if config file already exists
|
// test if config file already exists
|
||||||
_, err = fs.Lstat(be.Filename(restic.Handle{Type: restic.ConfigFile}))
|
_, err = fs.Lstat(be.Filename(restic.Handle{Type: restic.ConfigFile}))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -73,6 +82,10 @@ func Create(ctx context.Context, cfg Config) (*Local, error) {
|
|||||||
return be, nil
|
return be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Local) Connections() uint {
|
||||||
|
return b.Config.Connections
|
||||||
|
}
|
||||||
|
|
||||||
// Location returns this backend's location (the directory name).
|
// Location returns this backend's location (the directory name).
|
||||||
func (b *Local) Location() string {
|
func (b *Local) Location() string {
|
||||||
return b.Path
|
return b.Path
|
||||||
@ -105,6 +118,9 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
b.sem.GetToken()
|
||||||
|
defer b.sem.ReleaseToken()
|
||||||
|
|
||||||
// Create new file with a temporary name.
|
// Create new file with a temporary name.
|
||||||
tmpname := filepath.Base(finalname) + "-tmp-"
|
tmpname := filepath.Base(finalname) + "-tmp-"
|
||||||
f, err := tempFile(dir, tmpname)
|
f, err := tempFile(dir, tmpname)
|
||||||
@ -199,24 +215,29 @@ func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, off
|
|||||||
return nil, errors.New("offset is negative")
|
return nil, errors.New("offset is negative")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.sem.GetToken()
|
||||||
f, err := fs.Open(b.Filename(h))
|
f, err := fs.Open(b.Filename(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
_, err = f.Seek(offset, 0)
|
_, err = f.Seek(offset, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.sem.ReleaseToken()
|
||||||
_ = f.Close()
|
_ = f.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r := b.sem.ReleaseTokenOnClose(f, nil)
|
||||||
|
|
||||||
if length > 0 {
|
if length > 0 {
|
||||||
return backend.LimitReadCloser(f, int64(length)), nil
|
return backend.LimitReadCloser(r, int64(length)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return f, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
@ -226,6 +247,9 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
|
|||||||
return restic.FileInfo{}, backoff.Permanent(err)
|
return restic.FileInfo{}, backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.sem.GetToken()
|
||||||
|
defer b.sem.ReleaseToken()
|
||||||
|
|
||||||
fi, err := fs.Stat(b.Filename(h))
|
fi, err := fs.Stat(b.Filename(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return restic.FileInfo{}, errors.WithStack(err)
|
return restic.FileInfo{}, errors.WithStack(err)
|
||||||
@ -237,6 +261,10 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
|
|||||||
// Test returns true if a blob of the given type and name exists in the backend.
|
// Test returns true if a blob of the given type and name exists in the backend.
|
||||||
func (b *Local) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
func (b *Local) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
||||||
debug.Log("Test %v", h)
|
debug.Log("Test %v", h)
|
||||||
|
|
||||||
|
b.sem.GetToken()
|
||||||
|
defer b.sem.ReleaseToken()
|
||||||
|
|
||||||
_, err := fs.Stat(b.Filename(h))
|
_, err := fs.Stat(b.Filename(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if b.IsNotExist(err) {
|
if b.IsNotExist(err) {
|
||||||
@ -253,6 +281,9 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error {
|
|||||||
debug.Log("Remove %v", h)
|
debug.Log("Remove %v", h)
|
||||||
fn := b.Filename(h)
|
fn := b.Filename(h)
|
||||||
|
|
||||||
|
b.sem.GetToken()
|
||||||
|
defer b.sem.ReleaseToken()
|
||||||
|
|
||||||
// reset read-only flag
|
// reset read-only flag
|
||||||
err := fs.Chmod(fn, 0666)
|
err := fs.Chmod(fn, 0666)
|
||||||
if err != nil && !os.IsPermission(err) {
|
if err != nil && !os.IsPermission(err) {
|
||||||
|
@ -27,7 +27,7 @@ func TestNoSpacePermanent(t *testing.T) {
|
|||||||
dir, cleanup := rtest.TempDir(t)
|
dir, cleanup := rtest.TempDir(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
be, err := Open(context.Background(), Config{Path: dir})
|
be, err := Open(context.Background(), Config{Path: dir, Connections: 2})
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
rtest.OK(t, be.Close())
|
rtest.OK(t, be.Close())
|
||||||
|
@ -25,7 +25,8 @@ func newTestSuite(t testing.TB) *test.Suite {
|
|||||||
t.Logf("create new backend at %v", dir)
|
t.Logf("create new backend at %v", dir)
|
||||||
|
|
||||||
cfg := local.Config{
|
cfg := local.Config{
|
||||||
Path: dir,
|
Path: dir,
|
||||||
|
Connections: 2,
|
||||||
}
|
}
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
},
|
},
|
||||||
|
@ -30,7 +30,8 @@ var parseTests = []struct {
|
|||||||
"local:/srv/repo",
|
"local:/srv/repo",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "/srv/repo",
|
Path: "/srv/repo",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -38,7 +39,8 @@ var parseTests = []struct {
|
|||||||
"local:dir1/dir2",
|
"local:dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "dir1/dir2",
|
Path: "dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -46,7 +48,8 @@ var parseTests = []struct {
|
|||||||
"local:dir1/dir2",
|
"local:dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "dir1/dir2",
|
Path: "dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -54,7 +57,8 @@ var parseTests = []struct {
|
|||||||
"dir1/dir2",
|
"dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "dir1/dir2",
|
Path: "dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -62,7 +66,8 @@ var parseTests = []struct {
|
|||||||
"/dir1/dir2",
|
"/dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "/dir1/dir2",
|
Path: "/dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -70,7 +75,8 @@ var parseTests = []struct {
|
|||||||
"local:../dir1/dir2",
|
"local:../dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "../dir1/dir2",
|
Path: "../dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -78,7 +84,8 @@ var parseTests = []struct {
|
|||||||
"/dir1/dir2",
|
"/dir1/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "/dir1/dir2",
|
Path: "/dir1/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -86,7 +93,8 @@ var parseTests = []struct {
|
|||||||
"/dir1:foobar/dir2",
|
"/dir1:foobar/dir2",
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: "/dir1:foobar/dir2",
|
Path: "/dir1:foobar/dir2",
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -94,7 +102,8 @@ var parseTests = []struct {
|
|||||||
`\dir1\foobar\dir2`,
|
`\dir1\foobar\dir2`,
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: `\dir1\foobar\dir2`,
|
Path: `\dir1\foobar\dir2`,
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -102,7 +111,8 @@ var parseTests = []struct {
|
|||||||
`c:\dir1\foobar\dir2`,
|
`c:\dir1\foobar\dir2`,
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: `c:\dir1\foobar\dir2`,
|
Path: `c:\dir1\foobar\dir2`,
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -110,7 +120,8 @@ var parseTests = []struct {
|
|||||||
`C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
|
`C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
|
Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -118,7 +129,8 @@ var parseTests = []struct {
|
|||||||
`c:/dir1/foobar/dir2`,
|
`c:/dir1/foobar/dir2`,
|
||||||
Location{Scheme: "local",
|
Location{Scheme: "local",
|
||||||
Config: local.Config{
|
Config: local.Config{
|
||||||
Path: `c:/dir1/foobar/dir2`,
|
Path: `c:/dir1/foobar/dir2`,
|
||||||
|
Connections: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -126,9 +138,10 @@ var parseTests = []struct {
|
|||||||
"sftp:user@host:/srv/repo",
|
"sftp:user@host:/srv/repo",
|
||||||
Location{Scheme: "sftp",
|
Location{Scheme: "sftp",
|
||||||
Config: sftp.Config{
|
Config: sftp.Config{
|
||||||
User: "user",
|
User: "user",
|
||||||
Host: "host",
|
Host: "host",
|
||||||
Path: "/srv/repo",
|
Path: "/srv/repo",
|
||||||
|
Connections: 5,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -136,9 +149,10 @@ var parseTests = []struct {
|
|||||||
"sftp:host:/srv/repo",
|
"sftp:host:/srv/repo",
|
||||||
Location{Scheme: "sftp",
|
Location{Scheme: "sftp",
|
||||||
Config: sftp.Config{
|
Config: sftp.Config{
|
||||||
User: "",
|
User: "",
|
||||||
Host: "host",
|
Host: "host",
|
||||||
Path: "/srv/repo",
|
Path: "/srv/repo",
|
||||||
|
Connections: 5,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -146,9 +160,10 @@ var parseTests = []struct {
|
|||||||
"sftp://user@host/srv/repo",
|
"sftp://user@host/srv/repo",
|
||||||
Location{Scheme: "sftp",
|
Location{Scheme: "sftp",
|
||||||
Config: sftp.Config{
|
Config: sftp.Config{
|
||||||
User: "user",
|
User: "user",
|
||||||
Host: "host",
|
Host: "host",
|
||||||
Path: "srv/repo",
|
Path: "srv/repo",
|
||||||
|
Connections: 5,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -156,9 +171,10 @@ var parseTests = []struct {
|
|||||||
"sftp://user@host//srv/repo",
|
"sftp://user@host//srv/repo",
|
||||||
Location{Scheme: "sftp",
|
Location{Scheme: "sftp",
|
||||||
Config: sftp.Config{
|
Config: sftp.Config{
|
||||||
User: "user",
|
User: "user",
|
||||||
Host: "host",
|
Host: "host",
|
||||||
Path: "/srv/repo",
|
Path: "/srv/repo",
|
||||||
|
Connections: 5,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -301,18 +301,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||||||
return errors.Wrap(err, "client.PutObject")
|
return errors.Wrap(err, "client.PutObject")
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
@ -350,22 +338,17 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
be.sem.GetToken()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
coreClient := minio.Core{Client: be.client}
|
coreClient := minio.Core{Client: be.client}
|
||||||
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(rd, cancel), err
|
||||||
ReadCloser: rd,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
@ -15,6 +15,15 @@ type Config struct {
|
|||||||
|
|
||||||
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
|
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
|
||||||
Command string `option:"command" help:"specify command to create sftp connection"`
|
Command string `option:"command" help:"specify command to create sftp connection"`
|
||||||
|
|
||||||
|
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConfig returns a new config with default options applied.
|
||||||
|
func NewConfig() Config {
|
||||||
|
return Config{
|
||||||
|
Connections: 5,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -75,10 +84,11 @@ func ParseConfig(s string) (interface{}, error) {
|
|||||||
return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.")
|
return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.")
|
||||||
}
|
}
|
||||||
|
|
||||||
return Config{
|
cfg := NewConfig()
|
||||||
User: user,
|
cfg.User = user
|
||||||
Host: host,
|
cfg.Host = host
|
||||||
Port: port,
|
cfg.Port = port
|
||||||
Path: p,
|
cfg.Path = p
|
||||||
}, nil
|
|
||||||
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
@ -11,68 +11,68 @@ var configTests = []struct {
|
|||||||
// first form, user specified sftp://user@host/dir
|
// first form, user specified sftp://user@host/dir
|
||||||
{
|
{
|
||||||
"sftp://user@host/dir/subdir",
|
"sftp://user@host/dir/subdir",
|
||||||
Config{User: "user", Host: "host", Path: "dir/subdir"},
|
Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://host/dir/subdir",
|
"sftp://host/dir/subdir",
|
||||||
Config{Host: "host", Path: "dir/subdir"},
|
Config{Host: "host", Path: "dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://host//dir/subdir",
|
"sftp://host//dir/subdir",
|
||||||
Config{Host: "host", Path: "/dir/subdir"},
|
Config{Host: "host", Path: "/dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://host:10022//dir/subdir",
|
"sftp://host:10022//dir/subdir",
|
||||||
Config{Host: "host", Port: "10022", Path: "/dir/subdir"},
|
Config{Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://user@host:10022//dir/subdir",
|
"sftp://user@host:10022//dir/subdir",
|
||||||
Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir"},
|
Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://user@host/dir/subdir/../other",
|
"sftp://user@host/dir/subdir/../other",
|
||||||
Config{User: "user", Host: "host", Path: "dir/other"},
|
Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp://user@host/dir///subdir",
|
"sftp://user@host/dir///subdir",
|
||||||
Config{User: "user", Host: "host", Path: "dir/subdir"},
|
Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
|
|
||||||
// IPv6 address.
|
// IPv6 address.
|
||||||
{
|
{
|
||||||
"sftp://user@[::1]/dir",
|
"sftp://user@[::1]/dir",
|
||||||
Config{User: "user", Host: "::1", Path: "dir"},
|
Config{User: "user", Host: "::1", Path: "dir", Connections: 5},
|
||||||
},
|
},
|
||||||
// IPv6 address with port.
|
// IPv6 address with port.
|
||||||
{
|
{
|
||||||
"sftp://user@[::1]:22/dir",
|
"sftp://user@[::1]:22/dir",
|
||||||
Config{User: "user", Host: "::1", Port: "22", Path: "dir"},
|
Config{User: "user", Host: "::1", Port: "22", Path: "dir", Connections: 5},
|
||||||
},
|
},
|
||||||
|
|
||||||
// second form, user specified sftp:user@host:/dir
|
// second form, user specified sftp:user@host:/dir
|
||||||
{
|
{
|
||||||
"sftp:user@host:/dir/subdir",
|
"sftp:user@host:/dir/subdir",
|
||||||
Config{User: "user", Host: "host", Path: "/dir/subdir"},
|
Config{User: "user", Host: "host", Path: "/dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp:user@domain@host:/dir/subdir",
|
"sftp:user@domain@host:/dir/subdir",
|
||||||
Config{User: "user@domain", Host: "host", Path: "/dir/subdir"},
|
Config{User: "user@domain", Host: "host", Path: "/dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp:host:../dir/subdir",
|
"sftp:host:../dir/subdir",
|
||||||
Config{Host: "host", Path: "../dir/subdir"},
|
Config{Host: "host", Path: "../dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp:user@host:dir/subdir:suffix",
|
"sftp:user@host:dir/subdir:suffix",
|
||||||
Config{User: "user", Host: "host", Path: "dir/subdir:suffix"},
|
Config{User: "user", Host: "host", Path: "dir/subdir:suffix", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp:user@host:dir/subdir/../other",
|
"sftp:user@host:dir/subdir/../other",
|
||||||
Config{User: "user", Host: "host", Path: "dir/other"},
|
Config{User: "user", Host: "host", Path: "dir/other", Connections: 5},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"sftp:user@host:dir///subdir",
|
"sftp:user@host:dir///subdir",
|
||||||
Config{User: "user", Host: "host", Path: "dir/subdir"},
|
Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,9 +43,10 @@ func TestLayout(t *testing.T) {
|
|||||||
|
|
||||||
repo := filepath.Join(path, "repo")
|
repo := filepath.Join(path, "repo")
|
||||||
be, err := sftp.Open(context.TODO(), sftp.Config{
|
be, err := sftp.Open(context.TODO(), sftp.Config{
|
||||||
Command: fmt.Sprintf("%q -e", sftpServer),
|
Command: fmt.Sprintf("%q -e", sftpServer),
|
||||||
Path: repo,
|
Path: repo,
|
||||||
Layout: test.layout,
|
Layout: test.layout,
|
||||||
|
Connections: 5,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -31,6 +31,7 @@ type SFTP struct {
|
|||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
result <-chan error
|
result <-chan error
|
||||||
|
|
||||||
|
sem *backend.Semaphore
|
||||||
backend.Layout
|
backend.Layout
|
||||||
Config
|
Config
|
||||||
}
|
}
|
||||||
@ -116,6 +117,11 @@ func (r *SFTP) clientError() error {
|
|||||||
func Open(ctx context.Context, cfg Config) (*SFTP, error) {
|
func Open(ctx context.Context, cfg Config) (*SFTP, error) {
|
||||||
debug.Log("open backend with config %#v", cfg)
|
debug.Log("open backend with config %#v", cfg)
|
||||||
|
|
||||||
|
sem, err := backend.NewSemaphore(cfg.Connections)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
cmd, args, err := buildSSHCommand(cfg)
|
cmd, args, err := buildSSHCommand(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -136,6 +142,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) {
|
|||||||
|
|
||||||
sftp.Config = cfg
|
sftp.Config = cfg
|
||||||
sftp.p = cfg.Path
|
sftp.p = cfg.Path
|
||||||
|
sftp.sem = sem
|
||||||
return sftp, nil
|
return sftp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,6 +245,10 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
|
|||||||
return Open(ctx, cfg)
|
return Open(ctx, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *SFTP) Connections() uint {
|
||||||
|
return r.Config.Connections
|
||||||
|
}
|
||||||
|
|
||||||
// Location returns this backend's location (the directory name).
|
// Location returns this backend's location (the directory name).
|
||||||
func (r *SFTP) Location() string {
|
func (r *SFTP) Location() string {
|
||||||
return r.p
|
return r.p
|
||||||
@ -280,6 +291,9 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader
|
|||||||
tmpFilename := filename + "-restic-temp-" + tempSuffix()
|
tmpFilename := filename + "-restic-temp-" + tempSuffix()
|
||||||
dirname := r.Dirname(h)
|
dirname := r.Dirname(h)
|
||||||
|
|
||||||
|
r.sem.GetToken()
|
||||||
|
defer r.sem.ReleaseToken()
|
||||||
|
|
||||||
// create new file
|
// create new file
|
||||||
f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
|
f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
|
||||||
|
|
||||||
@ -371,6 +385,19 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int
|
|||||||
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
|
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
||||||
|
type wrapReader struct {
|
||||||
|
io.ReadCloser
|
||||||
|
io.WriterTo
|
||||||
|
f func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wr *wrapReader) Close() error {
|
||||||
|
err := wr.ReadCloser.Close()
|
||||||
|
wr.f()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||||
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
||||||
if err := h.Valid(); err != nil {
|
if err := h.Valid(); err != nil {
|
||||||
@ -381,26 +408,38 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs
|
|||||||
return nil, errors.New("offset is negative")
|
return nil, errors.New("offset is negative")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sem.GetToken()
|
||||||
f, err := r.c.Open(r.Filename(h))
|
f, err := r.c.Open(r.Filename(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
r.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
_, err = f.Seek(offset, 0)
|
_, err = f.Seek(offset, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
r.sem.ReleaseToken()
|
||||||
_ = f.Close()
|
_ = f.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// use custom close wrapper to also provide WriteTo() on the wrapper
|
||||||
|
rd := &wrapReader{
|
||||||
|
ReadCloser: f,
|
||||||
|
WriterTo: f,
|
||||||
|
f: func() {
|
||||||
|
r.sem.ReleaseToken()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
if length > 0 {
|
if length > 0 {
|
||||||
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
|
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
|
||||||
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
|
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
|
||||||
return backend.LimitReadCloser(f, int64(length)), nil
|
return backend.LimitReadCloser(rd, int64(length)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return f, nil
|
return rd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
@ -414,6 +453,9 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro
|
|||||||
return restic.FileInfo{}, backoff.Permanent(err)
|
return restic.FileInfo{}, backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sem.GetToken()
|
||||||
|
defer r.sem.ReleaseToken()
|
||||||
|
|
||||||
fi, err := r.c.Lstat(r.Filename(h))
|
fi, err := r.c.Lstat(r.Filename(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return restic.FileInfo{}, errors.Wrap(err, "Lstat")
|
return restic.FileInfo{}, errors.Wrap(err, "Lstat")
|
||||||
@ -429,6 +471,9 @@ func (r *SFTP) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sem.GetToken()
|
||||||
|
defer r.sem.ReleaseToken()
|
||||||
|
|
||||||
_, err := r.c.Lstat(r.Filename(h))
|
_, err := r.c.Lstat(r.Filename(h))
|
||||||
if os.IsNotExist(errors.Cause(err)) {
|
if os.IsNotExist(errors.Cause(err)) {
|
||||||
return false, nil
|
return false, nil
|
||||||
@ -448,6 +493,9 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sem.GetToken()
|
||||||
|
defer r.sem.ReleaseToken()
|
||||||
|
|
||||||
return r.c.Remove(r.Filename(h))
|
return r.c.Remove(r.Filename(h))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -458,7 +506,14 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI
|
|||||||
|
|
||||||
basedir, subdirs := r.Basedir(t)
|
basedir, subdirs := r.Basedir(t)
|
||||||
walker := r.c.Walk(basedir)
|
walker := r.c.Walk(basedir)
|
||||||
for walker.Step() {
|
for {
|
||||||
|
r.sem.GetToken()
|
||||||
|
ok := walker.Step()
|
||||||
|
r.sem.ReleaseToken()
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if walker.Err() != nil {
|
if walker.Err() != nil {
|
||||||
if r.IsNotExist(walker.Err()) {
|
if r.IsNotExist(walker.Err()) {
|
||||||
debug.Log("ignoring non-existing directory")
|
debug.Log("ignoring non-existing directory")
|
||||||
|
@ -42,8 +42,9 @@ func newTestSuite(t testing.TB) *test.Suite {
|
|||||||
t.Logf("create new backend at %v", dir)
|
t.Logf("create new backend at %v", dir)
|
||||||
|
|
||||||
cfg := sftp.Config{
|
cfg := sftp.Config{
|
||||||
Path: dir,
|
Path: dir,
|
||||||
Command: fmt.Sprintf("%q -e", sftpServer),
|
Command: fmt.Sprintf("%q -e", sftpServer),
|
||||||
|
Connections: 5,
|
||||||
}
|
}
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
},
|
},
|
||||||
|
@ -93,7 +93,7 @@ func TestRepository(t testing.TB) (r restic.Repository, cleanup func()) {
|
|||||||
|
|
||||||
// TestOpenLocal opens a local repository.
|
// TestOpenLocal opens a local repository.
|
||||||
func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
|
func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
|
||||||
be, err := local.Open(context.TODO(), local.Config{Path: dir})
|
be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user