From 0987c731ec7abf2f65fd42ea5e362c14edd96bf6 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Wed, 1 May 2024 20:03:31 +0200
Subject: [PATCH 1/4] backend: configure protocol-level connection health checks

This should detect a connection that is stuck for more than 2 minutes.
---
 internal/backend/http_transport.go | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/internal/backend/http_transport.go b/internal/backend/http_transport.go
index 19b20dc6a..354611e07 100644
--- a/internal/backend/http_transport.go
+++ b/internal/backend/http_transport.go
@@ -13,6 +13,7 @@ import (
 	"github.com/peterbourgon/unixtransport"
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/errors"
+	"golang.org/x/net/http2"
 )
 
 // TransportOptions collects various options which can be set for an HTTP based
@@ -74,7 +75,6 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
 			KeepAlive: 30 * time.Second,
 			DualStack: true,
 		}).DialContext,
-		ForceAttemptHTTP2:   true,
 		MaxIdleConns:        100,
 		MaxIdleConnsPerHost: 100,
 		IdleConnTimeout:     90 * time.Second,
@@ -83,6 +83,17 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
 		TLSClientConfig:     &tls.Config{},
 	}
 
+	// ensure that http2 connections are closed if they are broken
+	h2, err := http2.ConfigureTransports(tr)
+	if err != nil {
+		panic(err)
+	}
+	if feature.Flag.Enabled(feature.HTTPTimeouts) {
+		h2.WriteByteTimeout = 120 * time.Second
+		h2.ReadIdleTimeout = 60 * time.Second
+		h2.PingTimeout = 60 * time.Second
+	}
+
 	unixtransport.Register(tr)
 
 	if opts.InsecureTLS {

From 877867023271c933948bf3fa8cfe1410c2d39cc8 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Wed, 1 May 2024 20:06:54 +0200
Subject: [PATCH 2/4] backend: cancel stuck http requests

Requests that make no upload or download progress within a timeout are
canceled.
---
 internal/backend/http_transport.go       |   8 +-
 internal/backend/watchdog_roundtriper.go | 104 +++++++++++++++++++++++
 internal/feature/registry.go             |   2 +
 3 files changed, 113 insertions(+), 1 deletion(-)
 create mode 100644 internal/backend/watchdog_roundtriper.go

diff --git a/internal/backend/http_transport.go b/internal/backend/http_transport.go
index 354611e07..09eb3cf16 100644
--- a/internal/backend/http_transport.go
+++ b/internal/backend/http_transport.go
@@ -13,6 +13,7 @@ import (
 	"github.com/peterbourgon/unixtransport"
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/feature"
 	"golang.org/x/net/http2"
 )
 
@@ -130,6 +131,11 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
 		tr.TLSClientConfig.RootCAs = pool
 	}
 
+	rt := http.RoundTripper(tr)
+	if feature.Flag.Enabled(feature.HTTPTimeouts) {
+		rt = newWatchdogRoundtripper(rt, 120*time.Second, 128*1024)
+	}
+
 	// wrap in the debug round tripper (if active)
-	return debug.RoundTripper(tr), nil
+	return debug.RoundTripper(rt), nil
 }
diff --git a/internal/backend/watchdog_roundtriper.go b/internal/backend/watchdog_roundtriper.go
new file mode 100644
index 000000000..fb7863002
--- /dev/null
+++ b/internal/backend/watchdog_roundtriper.go
@@ -0,0 +1,104 @@
+package backend
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"time"
+)
+
+// watchdogRoundtripper cancels an http request if an upload or download did not make progress
+// within timeout. The time between fully sending the request and receiving a response is also
+// limited by this timeout. This ensures that stuck requests are cancelled after some time.
+//
+// The roundtripper makes the assumption that the upload and download happen continuously. In particular,
+// the caller must not make long pauses between individual read requests from the response body.
+type watchdogRoundtripper struct {
+	rt        http.RoundTripper
+	timeout   time.Duration
+	chunkSize int
+}
+
+var _ http.RoundTripper = &watchdogRoundtripper{}
+
+func newWatchdogRoundtripper(rt http.RoundTripper, timeout time.Duration, chunkSize int) *watchdogRoundtripper {
+	return &watchdogRoundtripper{
+		rt:        rt,
+		timeout:   timeout,
+		chunkSize: chunkSize,
+	}
+}
+
+func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	timer := time.NewTimer(w.timeout)
+	ctx, cancel := context.WithCancel(req.Context())
+
+	// cancel context if timer expires
+	go func() {
+		defer timer.Stop()
+		select {
+		case <-timer.C:
+			cancel()
+		case <-ctx.Done():
+		}
+	}()
+
+	kick := func() {
+		timer.Reset(w.timeout)
+	}
+
+	req = req.Clone(ctx)
+	if req.Body != nil {
+		// kick watchdog timer as long as uploading makes progress
+		req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil)
+	}
+
+	resp, err := w.rt.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+
+	// kick watchdog timer as long as downloading makes progress
+	// cancel context to stop goroutine once response body is closed
+	resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel)
+	return resp, nil
+}
+
+func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), close func()) *watchdogReadCloser {
+	return &watchdogReadCloser{
+		rc:        rc,
+		chunkSize: chunkSize,
+		kick:      kick,
+		close:     close,
+	}
+}
+
+type watchdogReadCloser struct {
+	rc        io.ReadCloser
+	chunkSize int
+	kick      func()
+	close     func()
+}
+
+var _ io.ReadCloser = &watchdogReadCloser{}
+
+func (w *watchdogReadCloser) Read(p []byte) (n int, err error) {
+	w.kick()
+
+	// Read is not required to fill the whole passed in byte slice
+	// Thus, keep things simple and just stay within our chunkSize.
+	if len(p) > w.chunkSize {
+		p = p[:w.chunkSize]
+	}
+	n, err = w.rc.Read(p)
+	w.kick()
+
+	return n, err
+}
+
+func (w *watchdogReadCloser) Close() error {
+	if w.close != nil {
+		w.close()
+	}
+	return w.rc.Close()
+}
diff --git a/internal/feature/registry.go b/internal/feature/registry.go
index 2d2e45edf..b0e4d2ed7 100644
--- a/internal/feature/registry.go
+++ b/internal/feature/registry.go
@@ -8,6 +8,7 @@ const (
 	DeprecateLegacyIndex    FlagName = "deprecate-legacy-index"
 	DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
 	DeviceIDForHardlinks    FlagName = "device-id-for-hardlinks"
+	HTTPTimeouts            FlagName = "http-timeouts"
 )
 
 func init() {
@@ -15,5 +16,6 @@ func init() {
 		DeprecateLegacyIndex:    {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
 		DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
 		DeviceIDForHardlinks:    {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
+		HTTPTimeouts:            {Type: Beta, Description: "enforce timeouts for stuck HTTP requests."},
 	})
 }

From ebd01a467599b469e486cb08a170ab04e787bc8a Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Wed, 1 May 2024 21:54:21 +0200
Subject: [PATCH 3/4] backend: add tests for watchdogRoundTripper

---
 internal/backend/watchdog_roundtriper_test.go | 201 ++++++++++++++++++
 1 file changed, 201 insertions(+)
 create mode 100644 internal/backend/watchdog_roundtriper_test.go

diff --git a/internal/backend/watchdog_roundtriper_test.go b/internal/backend/watchdog_roundtriper_test.go
new file mode 100644
index 000000000..a13d670e0
--- /dev/null
+++ b/internal/backend/watchdog_roundtriper_test.go
@@ -0,0 +1,201 @@
+package backend
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	rtest "github.com/restic/restic/internal/test"
+)
+
+func TestRead(t *testing.T) {
+	data := []byte("abcdef")
+	var ctr int
+	kick := func() {
+		ctr++
+	}
+	var closed bool
+	onClose := func() {
+		closed = true
+	}
+
+	wd := newWatchdogReadCloser(io.NopCloser(bytes.NewReader(data)), 1, kick, onClose)
+
+	out, err := io.ReadAll(wd)
+	rtest.OK(t, err)
+	rtest.Equals(t, data, out, "data mismatch")
+	// the EOF read also triggers the kick function
+	rtest.Equals(t, len(data)*2+2, ctr, "unexpected number of kick calls")
+
+	rtest.Equals(t, false, closed, "close function called too early")
+	rtest.OK(t, wd.Close())
+	rtest.Equals(t, true, closed, "close function not called")
+}
+
+func TestRoundtrip(t *testing.T) {
+	t.Parallel()
+
+	// at the higher delay values, it takes longer to transmit the request/response body
+	// than the roundTripper timeout
+	for _, delay := range []int{0, 1, 10, 20} {
+		t.Run(fmt.Sprintf("%v", delay), func(t *testing.T) {
+			msg := []byte("ping-pong-data")
+			srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				data, err := io.ReadAll(r.Body)
+				if err != nil {
+					w.WriteHeader(500)
+					return
+				}
+				w.WriteHeader(200)
+
+				// slowly send the reply
+				for len(data) >= 2 {
+					_, _ = w.Write(data[:2])
+					w.(http.Flusher).Flush()
+					data = data[2:]
+					time.Sleep(time.Duration(delay) * time.Millisecond)
+				}
+				_, _ = w.Write(data)
+			}))
+			defer srv.Close()
+
+			rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2)
+			req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond)))
+			rtest.OK(t, err)
+
+			resp, err := rt.RoundTrip(req)
+			rtest.OK(t, err)
+			rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
+
+			response, err := io.ReadAll(resp.Body)
+			rtest.OK(t, err)
+			rtest.Equals(t, msg, response, "unexpected response")
+
+			rtest.OK(t, resp.Body.Close())
+		})
+	}
+}
+
+func TestCanceledRoundtrip(t *testing.T) {
+	rt := newWatchdogRoundtripper(http.DefaultTransport, time.Second, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	req, err := http.NewRequestWithContext(ctx, "GET", "http://some.random.url.dfdgsfg", nil)
+	rtest.OK(t, err)
+
+	resp, err := rt.RoundTrip(req)
+	rtest.Equals(t, context.Canceled, err)
+	// make linter happy
+	if resp != nil {
+		rtest.OK(t, resp.Body.Close())
+	}
+}
+
+type slowReader struct {
+	data  io.Reader
+	delay time.Duration
+}
+
+func newSlowReader(data io.Reader, delay time.Duration) *slowReader {
+	return &slowReader{
+		data:  data,
+		delay: delay,
+	}
+}
+
+func (s *slowReader) Read(p []byte) (n int, err error) {
+	time.Sleep(s.delay)
+	return s.data.Read(p)
+}
+
+func TestUploadTimeout(t *testing.T) {
+	t.Parallel()
+
+	msg := []byte("ping")
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := io.ReadAll(r.Body)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+		t.Error("upload should have been canceled")
+	}))
+	defer srv.Close()
+
+	rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
+	req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), 100*time.Millisecond)))
+	rtest.OK(t, err)
+
+	resp, err := rt.RoundTrip(req)
+	rtest.Equals(t, context.Canceled, err)
+	// make linter happy
+	if resp != nil {
+		rtest.OK(t, resp.Body.Close())
+	}
+}
+
+func TestProcessingTimeout(t *testing.T) {
+	t.Parallel()
+
+	msg := []byte("ping")
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := io.ReadAll(r.Body)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+		w.WriteHeader(200)
+	}))
+	defer srv.Close()
+
+	rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
+	req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
+	rtest.OK(t, err)
+
+	resp, err := rt.RoundTrip(req)
+	rtest.Equals(t, context.Canceled, err)
+	// make linter happy
+	if resp != nil {
+		rtest.OK(t, resp.Body.Close())
+	}
+}
+
+func TestDownloadTimeout(t *testing.T) {
+	t.Parallel()
+
+	msg := []byte("ping")
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		data, err := io.ReadAll(r.Body)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+		w.WriteHeader(200)
+		_, _ = w.Write(data[:2])
+		w.(http.Flusher).Flush()
+		data = data[2:]
+
+		time.Sleep(100 * time.Millisecond)
+		_, _ = w.Write(data)
+
+	}))
+	defer srv.Close()
+
+	rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
+	req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
+	rtest.OK(t, err)
+
+	resp, err := rt.RoundTrip(req)
+	rtest.OK(t, err)
+	rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
+
+	_, err = io.ReadAll(resp.Body)
+	rtest.Equals(t, context.Canceled, err, "response download not canceled")
+	rtest.OK(t, resp.Body.Close())
+}

From 3740700ddca7eaf61524462a0d56cc27d632dc4a Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Wed, 1 May 2024 22:03:20 +0200
Subject: [PATCH 4/4] add http timeouts to changelog

---
 changelog/unreleased/issue-4627 | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/changelog/unreleased/issue-4627 b/changelog/unreleased/issue-4627
index 626b341ea..bbc861b8e 100644
--- a/changelog/unreleased/issue-4627
+++ b/changelog/unreleased/issue-4627
@@ -4,5 +4,14 @@ Restic now downloads pack files in large chunks instead of using a streaming
 download. This prevents failures due to interrupted streams. The `restore`
 command now also retries downloading individual blobs that cannot be retrieved.
 
+HTTP requests that are stuck for more than two minutes while uploading or
+downloading are now forcibly interrupted. This ensures that stuck requests are
+retried after a short timeout. These new request timeouts can temporarily be
+disabled by setting the environment variable
+`RESTIC_FEATURES=http-timeouts=false`. Note that this feature flag will be
+removed in the next minor restic version.
+
 https://github.com/restic/restic/issues/4627
+https://github.com/restic/restic/issues/4193
 https://github.com/restic/restic/pull/4605
+https://github.com/restic/restic/pull/4792