syncthing/lib/netutil/counting.go
Jakob Borg 020cfe395a all: Use multiple QUIC streams (fixes #8879)
Work in progress, to be described more fully in time, but in principle:

- support multiple streams on a single connection at the protocol level
- use multiple streams for concurrent requests
- hope for improved greatness
2023-04-28 09:01:54 +02:00

167 lines
3.1 KiB
Go

// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package netutil
import (
"context"
"io"
"sync/atomic"
"time"
)
// Counted exposes traffic statistics: how many bytes have been read and
// written, and when the most recent read and write took place.
type Counted interface {
	// BytesRead reports the number of bytes read so far.
	BytesRead() int64
	// BytesWritten reports the number of bytes written so far.
	BytesWritten() int64

	// LastRead reports when the most recent read was recorded.
	LastRead() time.Time
	// LastWrite reports when the most recent write was recorded.
	LastWrite() time.Time
}
// rootCounter is the package-wide counter. Counters created with NewCounter
// use it as their parent, so it receives every update they record.
var rootCounter Counter

// RootCounter returns the package-wide root counter, exposed through the
// Counted interface. Every call returns the same underlying counter.
func RootCounter() Counted {
	var root Counted = &rootCounter
	return root
}
type CountedStream interface {
Counted
Stream
}
// Counter accumulates byte totals and last-activity timestamps for reads
// and writes. All values are held in atomics. Every update recorded on a
// Counter is also recorded on its parent, and so on up the chain.
type Counter struct {
	readBytes  atomic.Int64 // total bytes read
	lastRead   atomic.Int64 // time of the latest read, in Unix nanoseconds
	writeBytes atomic.Int64 // total bytes written
	lastWrite  atomic.Int64 // time of the latest write, in Unix nanoseconds
	parent     *Counter     // also receives every update; nil when there is no parent
}

// NewCounter returns a fresh Counter whose updates are also recorded on
// the package-wide root counter.
func NewCounter() *Counter {
	return &Counter{parent: &rootCounter}
}

// newCounterWithParent returns a fresh Counter that forwards its updates
// to parent. A nil parent yields a standalone counter.
func newCounterWithParent(parent *Counter) *Counter {
	c := new(Counter)
	c.parent = parent
	return c
}

// BytesRead returns the total number of bytes recorded as read.
func (c *Counter) BytesRead() int64 {
	total := c.readBytes.Load()
	return total
}

// BytesWritten returns the total number of bytes recorded as written.
func (c *Counter) BytesWritten() int64 {
	total := c.writeBytes.Load()
	return total
}

// LastRead returns the time at which a read was last recorded. If no read
// has been recorded yet, the result is the Unix epoch (time.Unix(0, 0)),
// not the zero time.Time.
func (c *Counter) LastRead() time.Time {
	nanos := c.lastRead.Load()
	return time.Unix(0, nanos)
}

// LastWrite returns the time at which a write was last recorded. If no
// write has been recorded yet, the result is the Unix epoch
// (time.Unix(0, 0)), not the zero time.Time.
func (c *Counter) LastWrite() time.Time {
	nanos := c.lastWrite.Load()
	return time.Unix(0, nanos)
}

// addRead records a read of n bytes on c and then on each of its
// ancestors in turn. The timestamp is refreshed on every call, including
// when n is zero.
func (c *Counter) addRead(n int) {
	delta := int64(n)
	for cur := c; ; cur = cur.parent {
		cur.readBytes.Add(delta)
		cur.lastRead.Store(time.Now().UnixNano())
		if cur.parent == nil {
			return
		}
	}
}

// addWrite records a write of n bytes on c and then on each of its
// ancestors in turn. The timestamp is refreshed on every call, including
// when n is zero.
func (c *Counter) addWrite(n int) {
	delta := int64(n)
	for cur := c; ; cur = cur.parent {
		cur.writeBytes.Add(delta)
		cur.lastWrite.Store(time.Now().UnixNano())
		if cur.parent == nil {
			return
		}
	}
}
// CountingStream wraps a Stream and records the bytes read from and
// written to it, as well as to any substream obtained through it, on the
// embedded Counter.
type CountingStream struct {
	Stream
	*Counter
}

// NewCountingStream returns a CountingStream that wraps s and records its
// traffic on c.
func NewCountingStream(s Stream, c *Counter) *CountingStream {
	cs := new(CountingStream)
	cs.Stream = s
	cs.Counter = c
	return cs
}

// Read reads from the wrapped Stream and records the number of bytes
// returned, whether or not the read also returned an error.
func (c *CountingStream) Read(bs []byte) (int, error) {
	got, err := c.Stream.Read(bs)
	c.Counter.addRead(got)
	return got, err
}

// Write writes to the wrapped Stream and records the number of bytes
// accepted, whether or not the write also returned an error.
func (c *CountingStream) Write(bs []byte) (int, error) {
	sent, err := c.Stream.Write(bs)
	c.Counter.addWrite(sent)
	return sent, err
}

// CreateSubstream creates a substream on the wrapped Stream and returns it
// with its reads and writes counted on this stream's Counter. The
// substream itself is used as the Closer. If the wrapped Stream fails, the
// error is returned unchanged together with a nil result.
func (c *CountingStream) CreateSubstream(ctx context.Context) (io.ReadWriteCloser, error) {
	sub, err := c.Stream.CreateSubstream(ctx)
	if err != nil {
		return nil, err
	}

	counted := &readWriteCloser{Closer: sub}
	counted.Reader = NewCountingReader(sub, c.Counter)
	counted.Writer = NewCountingWriter(sub, c.Counter)
	return counted, nil
}

// AcceptSubstream accepts a substream on the wrapped Stream and returns it
// with its reads and writes counted on this stream's Counter. The
// substream itself is used as the Closer. If the wrapped Stream fails, the
// error is returned unchanged together with a nil result.
func (c *CountingStream) AcceptSubstream(ctx context.Context) (io.ReadWriteCloser, error) {
	sub, err := c.Stream.AcceptSubstream(ctx)
	if err != nil {
		return nil, err
	}

	counted := &readWriteCloser{Closer: sub}
	counted.Reader = NewCountingReader(sub, c.Counter)
	counted.Writer = NewCountingWriter(sub, c.Counter)
	return counted, nil
}
// countingReader is an io.Reader that reports the size of every read to
// its Counter.
type countingReader struct {
	io.Reader
	*Counter
}

// NewCountingReader returns an io.Reader that reads from r and records the
// bytes read on c.
func NewCountingReader(r io.Reader, c *Counter) io.Reader {
	cr := new(countingReader)
	cr.Reader = r
	cr.Counter = c
	return cr
}

// Read reads from the wrapped reader and records the number of bytes
// returned, whether or not the read also returned an error.
func (c *countingReader) Read(bs []byte) (int, error) {
	got, err := c.Reader.Read(bs)
	c.addRead(got)
	return got, err
}
// countingWriter is an io.Writer that reports the size of every write to
// its Counter.
type countingWriter struct {
	io.Writer
	*Counter
}

// NewCountingWriter returns an io.Writer that writes to w and records the
// bytes written on c.
func NewCountingWriter(w io.Writer, c *Counter) io.Writer {
	cw := new(countingWriter)
	cw.Writer = w
	cw.Counter = c
	return cw
}

// Write writes to the wrapped writer and records the number of bytes
// accepted, whether or not the write also returned an error.
func (c *countingWriter) Write(bs []byte) (int, error) {
	sent, err := c.Writer.Write(bs)
	c.addWrite(sent)
	return sent, err
}