mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 14:48:30 +00:00
Merge pull request #2498 from AudriusButkevicius/relaystuff
Change the way relays are chosen, add RelayFull message.
This commit is contained in:
commit
6a98534d5d
@ -6,6 +6,7 @@ import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
@ -82,7 +83,7 @@ func (c *dynamicClient) Serve() {
|
||||
addrs = append(addrs, ruri.String())
|
||||
}
|
||||
|
||||
for _, addr := range relayAddressesSortedByLatency(addrs) {
|
||||
for _, addr := range relayAddressesOrder(addrs) {
|
||||
select {
|
||||
case <-c.stop:
|
||||
l.Debugln(c, "stopping")
|
||||
@ -176,42 +177,44 @@ type dynamicAnnouncement struct {
|
||||
}
|
||||
}
|
||||
|
||||
// relayAddressesSortedByLatency adds local latency to the relay, and sorts them
|
||||
// by sum latency, and returns the addresses.
|
||||
func relayAddressesSortedByLatency(input []string) []string {
|
||||
relays := make(relayList, len(input))
|
||||
for i, relay := range input {
|
||||
if latency, err := osutil.GetLatencyForURL(relay); err == nil {
|
||||
relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)}
|
||||
} else {
|
||||
relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)}
|
||||
// relayAddressesOrder checks the latency to each relay, rounds latency down to
|
||||
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
|
||||
// shuffles each bucket, and returns all addresses starting with the ones from
|
||||
// the lowest latency bucket, ending with the highest latency buceket.
|
||||
func relayAddressesOrder(input []string) []string {
|
||||
buckets := make(map[int][]string)
|
||||
|
||||
for _, relay := range input {
|
||||
latency, err := osutil.GetLatencyForURL(relay)
|
||||
if err != nil {
|
||||
latency = time.Hour
|
||||
}
|
||||
|
||||
id := int(latency/time.Millisecond) / 50
|
||||
|
||||
buckets[id] = append(buckets[id], relay)
|
||||
}
|
||||
|
||||
sort.Sort(relays)
|
||||
|
||||
addresses := make([]string, len(relays))
|
||||
for i, relay := range relays {
|
||||
addresses[i] = relay.relay
|
||||
var ids []int
|
||||
for id, bucket := range buckets {
|
||||
shuffle(bucket)
|
||||
ids = append(ids, id)
|
||||
}
|
||||
|
||||
sort.Ints(ids)
|
||||
|
||||
addresses := make([]string, len(input))
|
||||
|
||||
for _, id := range ids {
|
||||
addresses = append(addresses, buckets[id]...)
|
||||
}
|
||||
|
||||
return addresses
|
||||
}
|
||||
|
||||
type relayWithLatency struct {
|
||||
relay string
|
||||
latency int
|
||||
}
|
||||
|
||||
type relayList []relayWithLatency
|
||||
|
||||
func (l relayList) Len() int {
|
||||
return len(l)
|
||||
}
|
||||
|
||||
func (l relayList) Less(a, b int) bool {
|
||||
return l[a].latency < l[b].latency
|
||||
}
|
||||
|
||||
func (l relayList) Swap(a, b int) {
|
||||
l[a], l[b] = l[b], l[a]
|
||||
func shuffle(slice []string) {
|
||||
for i := len(slice) - 1; i > 0; i-- {
|
||||
j := rand.Intn(i + 1)
|
||||
slice[i], slice[j] = slice[j], slice[i]
|
||||
}
|
||||
}
|
||||
|
@ -119,6 +119,10 @@ func (c *staticClient) Serve() {
|
||||
}
|
||||
c.invitations <- msg
|
||||
|
||||
case protocol.RelayFull:
|
||||
l.Infoln("Disconnected from relay due to it becoming full.")
|
||||
return
|
||||
|
||||
default:
|
||||
l.Infoln("Relay: protocol error: unexpected message %v", msg)
|
||||
return
|
||||
@ -240,6 +244,9 @@ func (c *staticClient) join() error {
|
||||
return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
|
||||
}
|
||||
|
||||
case protocol.RelayFull:
|
||||
return fmt.Errorf("relay full")
|
||||
|
||||
default:
|
||||
return fmt.Errorf("protocol error: expecting response got %v", msg)
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ const (
|
||||
messageTypeResponse
|
||||
messageTypeConnectRequest
|
||||
messageTypeSessionInvitation
|
||||
messageTypeRelayFull
|
||||
)
|
||||
|
||||
type header struct {
|
||||
@ -31,6 +32,7 @@ type header struct {
|
||||
type Ping struct{}
|
||||
type Pong struct{}
|
||||
type JoinRelayRequest struct{}
|
||||
type RelayFull struct{}
|
||||
|
||||
type JoinSessionRequest struct {
|
||||
Key []byte // max:32
|
||||
|
@ -256,6 +256,63 @@ func (o *JoinRelayRequest) DecodeXDRFrom(xr *xdr.Reader) error {
|
||||
|
||||
/*
|
||||
|
||||
RelayFull Structure:
|
||||
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
|
||||
|
||||
struct RelayFull {
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
func (o RelayFull) EncodeXDR(w io.Writer) (int, error) {
|
||||
var xw = xdr.NewWriter(w)
|
||||
return o.EncodeXDRInto(xw)
|
||||
}
|
||||
|
||||
func (o RelayFull) MarshalXDR() ([]byte, error) {
|
||||
return o.AppendXDR(make([]byte, 0, 128))
|
||||
}
|
||||
|
||||
func (o RelayFull) MustMarshalXDR() []byte {
|
||||
bs, err := o.MarshalXDR()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
func (o RelayFull) AppendXDR(bs []byte) ([]byte, error) {
|
||||
var aw = xdr.AppendWriter(bs)
|
||||
var xw = xdr.NewWriter(&aw)
|
||||
_, err := o.EncodeXDRInto(xw)
|
||||
return []byte(aw), err
|
||||
}
|
||||
|
||||
func (o RelayFull) EncodeXDRInto(xw *xdr.Writer) (int, error) {
|
||||
return xw.Tot(), xw.Error()
|
||||
}
|
||||
|
||||
func (o *RelayFull) DecodeXDR(r io.Reader) error {
|
||||
xr := xdr.NewReader(r)
|
||||
return o.DecodeXDRFrom(xr)
|
||||
}
|
||||
|
||||
func (o *RelayFull) UnmarshalXDR(bs []byte) error {
|
||||
var br = bytes.NewReader(bs)
|
||||
var xr = xdr.NewReader(br)
|
||||
return o.DecodeXDRFrom(xr)
|
||||
}
|
||||
|
||||
func (o *RelayFull) DecodeXDRFrom(xr *xdr.Reader) error {
|
||||
return xr.Error()
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
JoinSessionRequest Structure:
|
||||
|
||||
0 1 2 3
|
||||
|
@ -50,6 +50,9 @@ func WriteMessage(w io.Writer, message interface{}) error {
|
||||
case SessionInvitation:
|
||||
payload, err = msg.MarshalXDR()
|
||||
header.messageType = messageTypeSessionInvitation
|
||||
case RelayFull:
|
||||
payload, err = msg.MarshalXDR()
|
||||
header.messageType = messageTypeRelayFull
|
||||
default:
|
||||
err = fmt.Errorf("Unknown message type")
|
||||
}
|
||||
@ -108,6 +111,10 @@ func ReadMessage(r io.Reader) (interface{}, error) {
|
||||
var msg SessionInvitation
|
||||
err := msg.DecodeXDR(r)
|
||||
return msg, err
|
||||
case messageTypeRelayFull:
|
||||
var msg RelayFull
|
||||
err := msg.DecodeXDR(r)
|
||||
return msg, err
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Unknown message type")
|
||||
|
Loading…
x
Reference in New Issue
Block a user