2
2
mirror of https://github.com/octoleo/restic.git synced 2024-12-24 11:55:28 +00:00
restic/internal/backend/rclone/backend.go
Michael Eischer c81b122374 rclone: Don't panic after unexpected subprocess exit
As the connection to the rclone child process is now closed after an
unexpected subprocess exit, later requests will cause the http2
transport to try to reestablish a new connection. As previously this never
should have happened, the connection called panic in that case. This
panic is now replaced with a simple error message, as it no longer
indicates an internal problem.
2020-08-01 12:17:40 +02:00

328 lines
6.6 KiB
Go

package rclone
import (
"bufio"
"context"
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/limiter"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/net/http2"
)
// Backend is used to access data stored somewhere via rclone.
type Backend struct {
*rest.Backend
tr *http2.Transport
cmd *exec.Cmd
waitCh <-chan struct{}
waitResult error
wg *sync.WaitGroup
conn *StdioConn
}
// run starts command with args and initializes the StdioConn.
func run(command string, args ...string) (*StdioConn, *exec.Cmd, *sync.WaitGroup, func() error, error) {
cmd := exec.Command(command, args...)
p, err := cmd.StderrPipe()
if err != nil {
return nil, nil, nil, nil, err
}
var wg sync.WaitGroup
// start goroutine to add a prefix to all messages printed by to stderr by rclone
wg.Add(1)
go func() {
defer wg.Done()
sc := bufio.NewScanner(p)
for sc.Scan() {
fmt.Fprintf(os.Stderr, "rclone: %v\n", sc.Text())
}
}()
r, stdin, err := os.Pipe()
if err != nil {
return nil, nil, nil, nil, err
}
stdout, w, err := os.Pipe()
if err != nil {
// close first pipe
r.Close()
stdin.Close()
return nil, nil, nil, nil, err
}
cmd.Stdin = r
cmd.Stdout = w
bg, err := backend.StartForeground(cmd)
// close rclone side of pipes
errR := r.Close()
errW := w.Close()
// return first error
if err == nil {
err = errR
}
if err == nil {
err = errW
}
if err != nil {
return nil, nil, nil, nil, err
}
c := &StdioConn{
receive: stdout,
send: stdin,
cmd: cmd,
}
return c, cmd, &wg, bg, nil
}
// wrappedConn adds bandwidth limiting capabilities to the StdioConn by
// wrapping the Read/Write methods.
type wrappedConn struct {
*StdioConn
io.Reader
io.Writer
}
func (c wrappedConn) Read(p []byte) (int, error) {
return c.Reader.Read(p)
}
func (c wrappedConn) Write(p []byte) (int, error) {
return c.Writer.Write(p)
}
func wrapConn(c *StdioConn, lim limiter.Limiter) wrappedConn {
wc := wrappedConn{
StdioConn: c,
Reader: c,
Writer: c,
}
if lim != nil {
wc.Reader = lim.Downstream(c)
wc.Writer = lim.UpstreamWriter(c)
}
return wc
}
// New initializes a Backend and starts the process.
func newBackend(cfg Config, lim limiter.Limiter) (*Backend, error) {
var (
args []string
err error
)
// build program args, start with the program
if cfg.Program != "" {
a, err := backend.SplitShellStrings(cfg.Program)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// then add the arguments
if cfg.Args != "" {
a, err := backend.SplitShellStrings(cfg.Args)
if err != nil {
return nil, err
}
args = append(args, a...)
}
// finally, add the remote
args = append(args, cfg.Remote)
arg0, args := args[0], args[1:]
debug.Log("running command: %v %v", arg0, args)
stdioConn, cmd, wg, bg, err := run(arg0, args...)
if err != nil {
return nil, err
}
var conn net.Conn = stdioConn
if lim != nil {
conn = wrapConn(stdioConn, lim)
}
dialCount := 0
tr := &http2.Transport{
AllowHTTP: true, // this is not really HTTP, just stdin/stdout
DialTLS: func(network, address string, cfg *tls.Config) (net.Conn, error) {
debug.Log("new connection requested, %v %v", network, address)
if dialCount > 0 {
// the connection to the child process is already closed
return nil, errors.New("rclone stdio connection already closed")
}
dialCount++
return conn, nil
},
}
waitCh := make(chan struct{})
be := &Backend{
tr: tr,
cmd: cmd,
waitCh: waitCh,
conn: stdioConn,
wg: wg,
}
wg.Add(1)
go func() {
defer wg.Done()
debug.Log("waiting for error result")
err := cmd.Wait()
debug.Log("Wait returned %v", err)
be.waitResult = err
// close our side of the pipes to rclone
stdioConn.CloseAll()
close(waitCh)
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
debug.Log("monitoring command to cancel first HTTP request context")
select {
case <-ctx.Done():
debug.Log("context has been cancelled, returning")
case <-be.waitCh:
debug.Log("command has exited, cancelling context")
cancel()
}
}()
// send an HTTP request to the base URL, see if the server is there
client := &http.Client{
Transport: debug.RoundTripper(tr),
Timeout: 60 * time.Second,
}
// request a random file which does not exist. we just want to test when
// rclone is able to accept HTTP requests.
url := fmt.Sprintf("http://localhost/file-%d", rand.Uint64())
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", rest.ContentTypeV2)
req.Cancel = ctx.Done()
res, err := ctxhttp.Do(ctx, client, req)
if err != nil {
bg()
_ = cmd.Process.Kill()
return nil, errors.Errorf("error talking HTTP to rclone: %v", err)
}
debug.Log("HTTP status %q returned, moving instance to background", res.Status)
bg()
return be, nil
}
// Open starts an rclone process with the given config.
func Open(cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := newBackend(cfg, lim)
if err != nil {
return nil, err
}
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: cfg.Connections,
URL: url,
}
restBackend, err := rest.Open(restConfig, debug.RoundTripper(be.tr))
if err != nil {
return nil, err
}
be.Backend = restBackend
return be, nil
}
// Create initializes a new restic repo with clone.
func Create(cfg Config) (*Backend, error) {
be, err := newBackend(cfg, nil)
if err != nil {
return nil, err
}
debug.Log("new backend created")
url, err := url.Parse("http://localhost/")
if err != nil {
return nil, err
}
restConfig := rest.Config{
Connections: 20,
URL: url,
}
restBackend, err := rest.Create(restConfig, debug.RoundTripper(be.tr))
if err != nil {
_ = be.Close()
return nil, err
}
be.Backend = restBackend
return be, nil
}
const waitForExit = 5 * time.Second
// Close terminates the backend.
func (be *Backend) Close() error {
debug.Log("exiting rclone")
be.tr.CloseIdleConnections()
select {
case <-be.waitCh:
debug.Log("rclone exited")
case <-time.After(waitForExit):
debug.Log("timeout, closing file descriptors")
err := be.conn.CloseAll()
if err != nil {
return err
}
}
be.wg.Wait()
debug.Log("wait for rclone returned: %v", be.waitResult)
return be.waitResult
}