2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-16 18:15:20 +00:00
restic/internal/backend/rclone/backend.go

342 lines
7.5 KiB
Go
Raw Normal View History

2018-03-13 21:30:51 +00:00
package rclone
import (
"bufio"
2018-03-13 21:30:51 +00:00
"context"
"crypto/tls"
"fmt"
2018-05-22 18:48:17 +00:00
"io"
"math/rand"
2018-03-13 21:30:51 +00:00
"net"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"syscall"
2018-03-13 21:30:51 +00:00
"time"
"github.com/cenkalti/backoff/v4"
2018-03-13 21:30:51 +00:00
"github.com/restic/restic/internal/backend"
2022-06-12 12:38:19 +00:00
"github.com/restic/restic/internal/backend/limiter"
"github.com/restic/restic/internal/backend/location"
2018-03-13 21:30:51 +00:00
"github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/backend/util"
2018-03-13 21:30:51 +00:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"golang.org/x/net/http2"
)
// Backend is used to access data stored somewhere via rclone.
type Backend struct {
*rest.Backend
tr *http2.Transport
cmd *exec.Cmd
waitCh <-chan struct{}
waitResult error
wg *sync.WaitGroup
conn *StdioConn
2018-03-13 21:30:51 +00:00
}
// NewFactory returns the location.Factory for the "rclone" backend, built from
// ParseConfig, Create and Open.
func NewFactory() location.Factory {
	return location.NewLimitedBackendFactory(
		"rclone",
		ParseConfig,
		location.NoPassword,
		Create,
		Open,
	)
}
2018-03-13 21:30:51 +00:00
// run starts command with args and initializes the StdioConn.
func run(command string, args ...string) (*StdioConn, *sync.WaitGroup, chan struct{}, func() error, error) {
2018-03-13 21:30:51 +00:00
cmd := exec.Command(command, args...)
p, err := cmd.StderrPipe()
if err != nil {
return nil, nil, nil, nil, err
}
var wg sync.WaitGroup
waitCh := make(chan struct{})
// start goroutine to add a prefix to all messages printed by to stderr by rclone
wg.Add(1)
go func() {
defer wg.Done()
defer close(waitCh)
sc := bufio.NewScanner(p)
for sc.Scan() {
fmt.Fprintf(os.Stderr, "rclone: %v\n", sc.Text())
}
debug.Log("command has exited, closing waitCh")
}()
2018-03-13 21:30:51 +00:00
r, stdin, err := os.Pipe()
if err != nil {
return nil, nil, nil, nil, err
2018-03-13 21:30:51 +00:00
}
stdout, w, err := os.Pipe()
if err != nil {
2021-01-30 15:46:34 +00:00
// close first pipe and ignore subsequent errors
_ = r.Close()
_ = stdin.Close()
return nil, nil, nil, nil, err
2018-03-13 21:30:51 +00:00
}
cmd.Stdin = r
cmd.Stdout = w
bg, err := util.StartForeground(cmd)
// close rclone side of pipes
errR := r.Close()
errW := w.Close()
// return first error
if err == nil {
err = errR
}
if err == nil {
err = errW
}
2018-03-13 21:30:51 +00:00
if err != nil {
if util.IsErrDot(err) {
return nil, nil, nil, nil, errors.Errorf("cannot implicitly run relative executable %v found in current directory, use -o rclone.program=./<program> to override", cmd.Path)
}
return nil, nil, nil, nil, err
2018-03-13 21:30:51 +00:00
}
c := &StdioConn{
receive: stdout,
send: stdin,
cmd: cmd,
2018-03-13 21:30:51 +00:00
}
return c, &wg, waitCh, bg, nil
2018-03-13 21:30:51 +00:00
}
2018-05-22 18:48:17 +00:00
// wrappedConn is a StdioConn with bandwidth limiting: its Read and Write
// methods go through the Reader and Writer fields, everything else is
// provided by the embedded StdioConn.
type wrappedConn struct {
	*StdioConn

	io.Reader
	io.Writer
}
func (c *wrappedConn) Read(p []byte) (int, error) {
2018-05-22 18:48:17 +00:00
return c.Reader.Read(p)
}
func (c *wrappedConn) Write(p []byte) (int, error) {
2018-05-22 18:48:17 +00:00
return c.Writer.Write(p)
}
func wrapConn(c *StdioConn, lim limiter.Limiter) *wrappedConn {
wc := &wrappedConn{
2018-05-22 18:48:17 +00:00
StdioConn: c,
Reader: c,
Writer: c,
}
if lim != nil {
wc.Reader = lim.Downstream(c)
wc.Writer = lim.UpstreamWriter(c)
}
return wc
}
2018-03-13 21:30:51 +00:00
// New initializes a Backend and starts the process.
func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
2018-03-13 21:30:51 +00:00
var (
args []string
err error
)
// build program args, start with the program
if cfg.Program != "" {
a, err := backend.SplitShellStrings(cfg.Program)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// then add the arguments
if cfg.Args != "" {
a, err := backend.SplitShellStrings(cfg.Args)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// finally, add the remote
args = append(args, cfg.Remote)
arg0, args := args[0], args[1:]
debug.Log("running command: %v %v", arg0, args)
stdioConn, wg, waitCh, bg, err := run(arg0, args...)
2018-03-13 21:30:51 +00:00
if err != nil {
return nil, err
}
2018-05-22 18:48:17 +00:00
var conn net.Conn = stdioConn
if lim != nil {
conn = wrapConn(stdioConn, lim)
}
dialCount := 0
2018-03-13 21:30:51 +00:00
tr := &http2.Transport{
AllowHTTP: true, // this is not really HTTP, just stdin/stdout
DialTLS: func(network, address string, cfg *tls.Config) (net.Conn, error) {
debug.Log("new connection requested, %v %v", network, address)
if dialCount > 0 {
// the connection to the child process is already closed
return nil, backoff.Permanent(errors.New("rclone stdio connection already closed"))
}
dialCount++
2018-03-13 21:30:51 +00:00
return conn, nil
},
}
cmd := stdioConn.cmd
2018-03-13 21:30:51 +00:00
be := &Backend{
tr: tr,
cmd: cmd,
waitCh: waitCh,
2018-05-22 18:48:17 +00:00
conn: stdioConn,
wg: wg,
2018-03-13 21:30:51 +00:00
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg.Add(1)
2018-03-13 21:30:51 +00:00
go func() {
defer wg.Done()
<-waitCh
cancel()
// according to the documentation of StdErrPipe, Wait() must only be called after the former has completed
2018-03-13 21:30:51 +00:00
err := cmd.Wait()
debug.Log("Wait returned %v", err)
be.waitResult = err
2021-01-30 15:46:34 +00:00
// close our side of the pipes to rclone, ignore errors
_ = stdioConn.CloseAll()
2018-03-13 21:30:51 +00:00
}()
// send an HTTP request to the base URL, see if the server is there
client := http.Client{
2018-10-21 17:58:40 +00:00
Transport: debug.RoundTripper(tr),
2021-11-07 16:47:18 +00:00
Timeout: cfg.Timeout,
2018-03-13 21:30:51 +00:00
}
// request a random file which does not exist. we just want to test when
// rclone is able to accept HTTP requests.
url := fmt.Sprintf("http://localhost/file-%d", rand.Uint64())
2021-01-30 19:43:53 +00:00
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
2018-03-13 21:30:51 +00:00
if err != nil {
return nil, err
}
req.Header.Set("Accept", rest.ContentTypeV2)
res, err := client.Do(req)
2018-03-13 21:30:51 +00:00
if err != nil {
2021-01-30 15:46:34 +00:00
// ignore subsequent errors
_ = bg()
2018-03-13 21:30:51 +00:00
_ = cmd.Process.Kill()
// wait for rclone to exit
wg.Wait()
// try to return the program exit code if communication with rclone has failed
if be.waitResult != nil && (errors.Is(err, context.Canceled) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrClosed)) {
err = be.waitResult
}
return nil, fmt.Errorf("error talking HTTP to rclone: %w", err)
2018-03-13 21:30:51 +00:00
}
debug.Log("HTTP status %q returned, moving instance to background", res.Status)
2021-01-30 15:46:34 +00:00
err = bg()
if err != nil {
return nil, fmt.Errorf("error moving process to background: %w", err)
}
2018-03-13 21:30:51 +00:00
return be, nil
}
// Open starts an rclone process with the given config.
func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := newBackend(ctx, cfg, lim)
2018-03-13 21:30:51 +00:00
if err != nil {
return nil, err
}
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: cfg.Connections,
2018-03-13 21:30:51 +00:00
URL: url,
}
restBackend, err := rest.Open(ctx, restConfig, debug.RoundTripper(be.tr))
2018-03-13 21:30:51 +00:00
if err != nil {
_ = be.Close()
2018-03-13 21:30:51 +00:00
return nil, err
}
be.Backend = restBackend
return be, nil
}
// Create initializes a new restic repo with rclone.
func Create(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := newBackend(ctx, cfg, lim)
2018-03-13 21:30:51 +00:00
if err != nil {
return nil, err
}
debug.Log("new backend created")
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: cfg.Connections,
2018-03-13 21:30:51 +00:00
URL: url,
}
restBackend, err := rest.Create(ctx, restConfig, debug.RoundTripper(be.tr))
2018-03-13 21:30:51 +00:00
if err != nil {
_ = be.Close()
2018-03-13 21:30:51 +00:00
return nil, err
}
be.Backend = restBackend
return be, nil
}
// waitForExit is how long Close waits for rclone to exit on its own before it
// closes the file descriptors connected to the process.
const waitForExit = 5 * time.Second
2018-03-13 21:30:51 +00:00
// Close terminates the backend.
func (be *Backend) Close() error {
2018-03-15 18:00:25 +00:00
debug.Log("exiting rclone")
2018-03-13 21:30:51 +00:00
be.tr.CloseIdleConnections()
select {
case <-be.waitCh:
debug.Log("rclone exited")
case <-time.After(waitForExit):
debug.Log("timeout, closing file descriptors")
err := be.conn.CloseAll()
if err != nil {
return err
}
}
be.wg.Wait()
2018-03-13 21:30:51 +00:00
debug.Log("wait for rclone returned: %v", be.waitResult)
return be.waitResult
}