lib/connections: Always run a simple connection test (#7866)

This commit is contained in:
Jakob Borg 2022-04-10 22:54:42 +04:00 committed by GitHub
parent 22e12904c9
commit 9b09bcc5f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,10 +7,12 @@
package connections package connections
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io"
"math/rand" "math/rand"
"net" "net"
"net/url" "net/url"
@ -295,7 +297,7 @@ func TestNextDialRegistryCleanup(t *testing.T) {
} }
} }
func BenchmarkConnections(pb *testing.B) { func BenchmarkConnections(b *testing.B) {
addrs := []string{ addrs := []string{
"tcp://127.0.0.1:0", "tcp://127.0.0.1:0",
"quic://127.0.0.1:0", "quic://127.0.0.1:0",
@ -316,9 +318,13 @@ func BenchmarkConnections(pb *testing.B) {
} }
for _, addr := range addrs { for _, addr := range addrs {
for _, sz := range sizes { for _, sz := range sizes {
data := make([]byte, sz)
if _, err := rand.Read(data); err != nil {
b.Fatal(err)
}
for _, direction := range []string{"cs", "sc"} { for _, direction := range []string{"cs", "sc"} {
proto := strings.SplitN(addr, ":", 2)[0] proto := strings.SplitN(addr, ":", 2)[0]
pb.Run(fmt.Sprintf("%s_%d_%s", proto, sz, direction), func(b *testing.B) { b.Run(fmt.Sprintf("%s_%d_%s", proto, sz, direction), func(b *testing.B) {
if proto == "relay" && !haveRelay { if proto == "relay" && !haveRelay {
b.Skip("could not connect to relay") b.Skip("could not connect to relay")
} }
@ -326,61 +332,79 @@ func BenchmarkConnections(pb *testing.B) {
if direction == "sc" { if direction == "sc" {
server, client = client, server server, client = client, server
} }
data := make([]byte, sz)
if _, err := rand.Read(data); err != nil {
b.Fatal(err)
}
total := 0 total := 0
wg := sync.NewWaitGroup()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
wg := sync.NewWaitGroup()
wg.Add(2) wg.Add(2)
errC := make(chan error, 2)
go func() { go func() {
if err := sendMsg(client, data); err != nil { if _, err := client.Write(data); err != nil {
b.Fatal(err) errC <- err
return
} }
wg.Done() wg.Done()
}() }()
go func() { go func() {
if err := recvMsg(server, data); err != nil { if _, err := io.ReadFull(server, data); err != nil {
b.Fatal(err) errC <- err
return
} }
total += sz total += sz
wg.Done() wg.Done()
}() }()
wg.Wait() wg.Wait()
close(errC)
err := <-errC
if err != nil {
b.Fatal(err)
}
} }
b.ReportAllocs() b.ReportAllocs()
b.SetBytes(int64(total / b.N)) b.SetBytes(int64(total / b.N))
}) })
}) })
} }
} }
} }
} }
func sendMsg(c internalConn, buf []byte) error { func TestConnectionEstablishment(t *testing.T) {
n, err := c.Write(buf) addrs := []string{
if n != len(buf) || err != nil { "tcp://127.0.0.1:0",
return err "quic://127.0.0.1:0",
}
return nil
} }
func recvMsg(c internalConn, buf []byte) error { send := make([]byte, 128<<10)
for read := 0; read != len(buf); { if _, err := rand.Read(send); err != nil {
n, err := c.Read(buf) t.Fatal(err)
read += n
if err != nil {
return err
}
}
return nil
} }
func withConnectionPair(b *testing.B, connUri string, h func(client, server internalConn)) { for _, addr := range addrs {
proto := strings.SplitN(addr, ":", 2)[0]
t.Run(proto, func(t *testing.T) {
withConnectionPair(t, addr, func(client, server internalConn) {
if _, err := client.Write(send); err != nil {
t.Fatal(err)
}
recv := make([]byte, len(send))
if _, err := io.ReadFull(server, recv); err != nil {
t.Fatal(err)
}
if !bytes.Equal(recv, send) {
t.Fatal("data mismatch")
}
})
})
}
}
func withConnectionPair(b interface{ Fatal(...interface{}) }, connUri string, h func(client, server internalConn)) {
// Root of the service tree. // Root of the service tree.
supervisor := suture.New("main", suture.Spec{ supervisor := suture.New("main", suture.Spec{
PassThroughPanics: true, PassThroughPanics: true,
@ -449,19 +473,22 @@ func withConnectionPair(b *testing.B, connUri string, h func(client, server inte
} }
} }
data := []byte("hello")
// Quic does not start a stream until some data is sent through, so send something for the AcceptStream // Quic does not start a stream until some data is sent through, so send something for the AcceptStream
// to fire on the other side. // to fire on the other side.
if err := sendMsg(clientConn, data); err != nil { send := []byte("hello")
if _, err := clientConn.Write(send); err != nil {
b.Fatal(err) b.Fatal(err)
} }
serverConn := <-conns serverConn := <-conns
if err := recvMsg(serverConn, data); err != nil { recv := make([]byte, len(send))
if _, err := io.ReadFull(serverConn, recv); err != nil {
b.Fatal(err) b.Fatal(err)
} }
if !bytes.Equal(recv, send) {
b.Fatal("data mismatch")
}
h(clientConn, serverConn) h(clientConn, serverConn)
@ -469,7 +496,7 @@ func withConnectionPair(b *testing.B, connUri string, h func(client, server inte
_ = serverConn.Close() _ = serverConn.Close()
} }
func mustGetCert(b *testing.B) tls.Certificate { func mustGetCert(b interface{ Fatal(...interface{}) }) tls.Certificate {
cert, err := tlsutil.NewCertificateInMemory("bench", 10) cert, err := tlsutil.NewCertificateInMemory("bench", 10)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)