mirror of
https://github.com/octoleo/restic.git
synced 2024-11-30 00:33:57 +00:00
limiter: support WriteTo in LimitBackend for read rate limiting
This commit is contained in:
parent
bcb852a8d0
commit
f35f2c48cd
@ -20,6 +20,10 @@ type Limiter interface {
|
|||||||
// for downloads.
|
// for downloads.
|
||||||
Downstream(r io.Reader) io.Reader
|
Downstream(r io.Reader) io.Reader
|
||||||
|
|
||||||
|
// Downstream returns a rate limited reader that is intended to be used
|
||||||
|
// for downloads.
|
||||||
|
DownstreamWriter(r io.Writer) io.Writer
|
||||||
|
|
||||||
// Transport returns an http.RoundTripper limited with the limiter.
|
// Transport returns an http.RoundTripper limited with the limiter.
|
||||||
Transport(http.RoundTripper) http.RoundTripper
|
Transport(http.RoundTripper) http.RoundTripper
|
||||||
}
|
}
|
||||||
|
@ -42,20 +42,34 @@ func (l limitedRewindReader) Read(b []byte) (int, error) {
|
|||||||
|
|
||||||
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
||||||
return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
|
return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
|
||||||
lrd := limitedReadCloser{
|
return consumer(newDownstreamLimitedReadCloser(rd, r.limiter, nil))
|
||||||
limited: r.limiter.Downstream(rd),
|
|
||||||
}
|
|
||||||
return consumer(lrd)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type limitedReadCloser struct {
|
type limitedReadCloser struct {
|
||||||
|
io.Reader
|
||||||
original io.ReadCloser
|
original io.ReadCloser
|
||||||
limited io.Reader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l limitedReadCloser) Read(b []byte) (n int, err error) {
|
type limitedReadWriteToCloser struct {
|
||||||
return l.limited.Read(b)
|
limitedReadCloser
|
||||||
|
writerTo io.WriterTo
|
||||||
|
limiter Limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDownstreamLimitedReadCloser(rd io.Reader, limiter Limiter, original io.ReadCloser) io.ReadCloser {
|
||||||
|
lrd := limitedReadCloser{
|
||||||
|
Reader: limiter.Downstream(rd),
|
||||||
|
original: original,
|
||||||
|
}
|
||||||
|
if _, ok := rd.(io.WriterTo); ok {
|
||||||
|
return &limitedReadWriteToCloser{
|
||||||
|
limitedReadCloser: lrd,
|
||||||
|
writerTo: rd.(io.WriterTo),
|
||||||
|
limiter: limiter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &lrd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l limitedReadCloser) Close() error {
|
func (l limitedReadCloser) Close() error {
|
||||||
@ -65,4 +79,8 @@ func (l limitedReadCloser) Close() error {
|
|||||||
return l.original.Close()
|
return l.original.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l limitedReadWriteToCloser) WriteTo(w io.Writer) (int64, error) {
|
||||||
|
return l.writerTo.WriteTo(l.limiter.DownstreamWriter(w))
|
||||||
|
}
|
||||||
|
|
||||||
var _ restic.Backend = (*rateLimitedBackend)(nil)
|
var _ restic.Backend = (*rateLimitedBackend)(nil)
|
||||||
|
@ -46,6 +46,10 @@ func (l staticLimiter) Downstream(r io.Reader) io.Reader {
|
|||||||
return l.limitReader(r, l.downstream)
|
return l.limitReader(r, l.downstream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l staticLimiter) DownstreamWriter(w io.Writer) io.Writer {
|
||||||
|
return l.limitWriter(w, l.downstream)
|
||||||
|
}
|
||||||
|
|
||||||
type roundTripper func(*http.Request) (*http.Response, error)
|
type roundTripper func(*http.Request) (*http.Response, error)
|
||||||
|
|
||||||
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
@ -55,7 +59,7 @@ func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||||||
func (l staticLimiter) roundTripper(rt http.RoundTripper, req *http.Request) (*http.Response, error) {
|
func (l staticLimiter) roundTripper(rt http.RoundTripper, req *http.Request) (*http.Response, error) {
|
||||||
if req.Body != nil {
|
if req.Body != nil {
|
||||||
req.Body = limitedReadCloser{
|
req.Body = limitedReadCloser{
|
||||||
limited: l.Upstream(req.Body),
|
Reader: l.Upstream(req.Body),
|
||||||
original: req.Body,
|
original: req.Body,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -64,7 +68,7 @@ func (l staticLimiter) roundTripper(rt http.RoundTripper, req *http.Request) (*h
|
|||||||
|
|
||||||
if res != nil && res.Body != nil {
|
if res != nil && res.Body != nil {
|
||||||
res.Body = limitedReadCloser{
|
res.Body = limitedReadCloser{
|
||||||
limited: l.Downstream(res.Body),
|
Reader: l.Downstream(res.Body),
|
||||||
original: res.Body,
|
original: res.Body,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user