Update godeps, reduce amount of time spent testing a relay. Goddamit godeps.

This commit is contained in:
Audrius Butkevicius 2015-11-23 21:33:22 +00:00
parent 845f31b98f
commit f39f816a98
18 changed files with 346 additions and 440 deletions

View File

@ -29,27 +29,27 @@
{
"ImportPath": "github.com/syncthing/syncthing/lib/protocol",
"Comment": "v0.12.0-beta1-119-g24c499d",
"Rev": "24c499d2822ae891c95406066456872e8d6c8164"
"Rev": "431d51f5c49ed7860e289ddab3c1fcf30004ff00"
},
{
"ImportPath": "github.com/syncthing/syncthing/lib/relay/client",
"Comment": "v0.12.0-beta1-119-g24c499d",
"Rev": "24c499d2822ae891c95406066456872e8d6c8164"
"Rev": "431d51f5c49ed7860e289ddab3c1fcf30004ff00"
},
{
"ImportPath": "github.com/syncthing/syncthing/lib/relay/protocol",
"Comment": "v0.12.0-beta1-119-g24c499d",
"Rev": "24c499d2822ae891c95406066456872e8d6c8164"
"Rev": "431d51f5c49ed7860e289ddab3c1fcf30004ff00"
},
{
"ImportPath": "github.com/syncthing/syncthing/lib/sync",
"Comment": "v0.12.0-beta1-119-g24c499d",
"Rev": "24c499d2822ae891c95406066456872e8d6c8164"
"Rev": "431d51f5c49ed7860e289ddab3c1fcf30004ff00"
},
{
"ImportPath": "github.com/syncthing/syncthing/lib/tlsutil",
"Comment": "v0.12.0-beta1-119-g24c499d",
"Rev": "24c499d2822ae891c95406066456872e8d6c8164"
"Rev": "431d51f5c49ed7860e289ddab3c1fcf30004ff00"
},
{
"ImportPath": "golang.org/x/text/transform",

View File

@ -6,10 +6,17 @@ import (
"os"
"strings"
"github.com/calmh/logger"
"github.com/syncthing/syncthing/lib/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "protocol") || os.Getenv("STTRACE") == "all"
l = logger.DefaultLogger
l = logger.DefaultLogger.NewFacility("protocol", "The BEP protocol")
)
func init() {
l.SetDebug("protocol", strings.Contains(os.Getenv("STTRACE"), "protocol") || os.Getenv("STTRACE") == "all")
}
func shouldDebug() bool {
return l.ShouldDebug("protocol")
}

View File

@ -14,7 +14,7 @@ const (
)
var (
ErrNoError error = nil
ErrNoError error
ErrGeneric = errors.New("generic error")
ErrNoSuchFile = errors.New("no such file")
ErrInvalid = errors.New("file is invalid")

View File

@ -1,14 +1,20 @@
// Copyright (C) 2014 The Protocol Authors.
//go:generate -command genxdr go run ../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate -command genxdr go run ../../Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate genxdr -o message_xdr.go message.go
package protocol
import "fmt"
import (
"bytes"
"crypto/sha256"
"fmt"
)
var sha256OfEmptyBlock = sha256.Sum256(make([]byte, BlockSize))
type IndexMessage struct {
Folder string
Folder string // max:256
Files []FileInfo // max:1000000
Flags uint32
Options []Option // max:64
@ -33,9 +39,13 @@ func (f FileInfo) Size() (bytes int64) {
if f.IsDeleted() || f.IsDirectory() {
return 128
}
if f.CachedSize > 0 {
return f.CachedSize
}
for _, b := range f.Blocks {
bytes += int64(b.Size)
}
f.CachedSize = bytes
return
}
@ -94,8 +104,13 @@ func (b BlockInfo) String() string {
return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash)
}
// IsEmpty returns true if the block is a full block of zeroes.
func (b BlockInfo) IsEmpty() bool {
return b.Size == BlockSize && bytes.Equal(b.Hash, sha256OfEmptyBlock[:])
}
type RequestMessage struct {
Folder string // max:64
Folder string // max:256
Name string // max:8192
Offset int64
Size int32
@ -110,6 +125,7 @@ type ResponseMessage struct {
}
type ClusterConfigMessage struct {
DeviceName string // max:64
ClientName string // max:64
ClientVersion string // max:64
Folders []Folder // max:1000000
@ -126,7 +142,7 @@ func (o *ClusterConfigMessage) GetOption(key string) string {
}
type Folder struct {
ID string // max:64
ID string // max:256
Devices []Device // max:1000000
Flags uint32
Options []Option // max:64
@ -134,6 +150,10 @@ type Folder struct {
type Device struct {
ID []byte // max:32
Name string // max:64
Addresses []string // max:64,2083
Compression uint32
CertName string // max:64
MaxLocalVersion int64
Flags uint32
Options []Option // max:64

View File

@ -41,7 +41,7 @@ IndexMessage Structure:
struct IndexMessage {
string Folder<>;
string Folder<256>;
FileInfo Files<1000000>;
unsigned int Flags;
Option Options<64>;
@ -74,6 +74,9 @@ func (o IndexMessage) AppendXDR(bs []byte) ([]byte, error) {
}
func (o IndexMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.Folder); l > 256 {
return xw.Tot(), xdr.ElementSizeExceeded("Folder", l, 256)
}
xw.WriteString(o.Folder)
if l := len(o.Files); l > 1000000 {
return xw.Tot(), xdr.ElementSizeExceeded("Files", l, 1000000)
@ -111,7 +114,7 @@ func (o *IndexMessage) UnmarshalXDR(bs []byte) error {
}
func (o *IndexMessage) DecodeXDRFrom(xr *xdr.Reader) error {
o.Folder = xr.ReadString()
o.Folder = xr.ReadStringMax(256)
_FilesSize := int(xr.ReadUint32())
if _FilesSize < 0 {
return xdr.ElementSizeExceeded("Files", _FilesSize, 1000000)
@ -380,7 +383,7 @@ RequestMessage Structure:
struct RequestMessage {
string Folder<64>;
string Folder<256>;
string Name<8192>;
hyper Offset;
int Size;
@ -416,8 +419,8 @@ func (o RequestMessage) AppendXDR(bs []byte) ([]byte, error) {
}
func (o RequestMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.Folder); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Folder", l, 64)
if l := len(o.Folder); l > 256 {
return xw.Tot(), xdr.ElementSizeExceeded("Folder", l, 256)
}
xw.WriteString(o.Folder)
if l := len(o.Name); l > 8192 {
@ -456,7 +459,7 @@ func (o *RequestMessage) UnmarshalXDR(bs []byte) error {
}
func (o *RequestMessage) DecodeXDRFrom(xr *xdr.Reader) error {
o.Folder = xr.ReadStringMax(64)
o.Folder = xr.ReadStringMax(256)
o.Name = xr.ReadStringMax(8192)
o.Offset = int64(xr.ReadUint64())
o.Size = int32(xr.ReadUint32())
@ -554,6 +557,12 @@ ClusterConfigMessage Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Device Name |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Device Name (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Client Name |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
@ -581,6 +590,7 @@ ClusterConfigMessage Structure:
struct ClusterConfigMessage {
string DeviceName<64>;
string ClientName<64>;
string ClientVersion<64>;
Folder Folders<1000000>;
@ -614,6 +624,10 @@ func (o ClusterConfigMessage) AppendXDR(bs []byte) ([]byte, error) {
}
func (o ClusterConfigMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.DeviceName); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("DeviceName", l, 64)
}
xw.WriteString(o.DeviceName)
if l := len(o.ClientName); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("ClientName", l, 64)
}
@ -657,6 +671,7 @@ func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error {
}
func (o *ClusterConfigMessage) DecodeXDRFrom(xr *xdr.Reader) error {
o.DeviceName = xr.ReadStringMax(64)
o.ClientName = xr.ReadStringMax(64)
o.ClientVersion = xr.ReadStringMax(64)
_FoldersSize := int(xr.ReadUint32())
@ -714,7 +729,7 @@ Folder Structure:
struct Folder {
string ID<64>;
string ID<256>;
Device Devices<1000000>;
unsigned int Flags;
Option Options<64>;
@ -747,8 +762,8 @@ func (o Folder) AppendXDR(bs []byte) ([]byte, error) {
}
func (o Folder) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.ID); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 64)
if l := len(o.ID); l > 256 {
return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 256)
}
xw.WriteString(o.ID)
if l := len(o.Devices); l > 1000000 {
@ -787,7 +802,7 @@ func (o *Folder) UnmarshalXDR(bs []byte) error {
}
func (o *Folder) DecodeXDRFrom(xr *xdr.Reader) error {
o.ID = xr.ReadStringMax(64)
o.ID = xr.ReadStringMax(256)
_DevicesSize := int(xr.ReadUint32())
if _DevicesSize < 0 {
return xdr.ElementSizeExceeded("Devices", _DevicesSize, 1000000)
@ -827,6 +842,28 @@ Device Structure:
\ ID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Name |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Name (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Addresses |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Addresses |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Addresses (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Compression |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Cert Name |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Cert Name (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Max Local Version (64 bits) +
| |
@ -843,6 +880,10 @@ Device Structure:
struct Device {
opaque ID<32>;
string Name<64>;
string Addresses<64>;
unsigned int Compression;
string CertName<64>;
hyper MaxLocalVersion;
unsigned int Flags;
Option Options<64>;
@ -879,6 +920,22 @@ func (o Device) EncodeXDRInto(xw *xdr.Writer) (int, error) {
return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 32)
}
xw.WriteBytes(o.ID)
if l := len(o.Name); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Name", l, 64)
}
xw.WriteString(o.Name)
if l := len(o.Addresses); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Addresses", l, 64)
}
xw.WriteUint32(uint32(len(o.Addresses)))
for i := range o.Addresses {
xw.WriteString(o.Addresses[i])
}
xw.WriteUint32(o.Compression)
if l := len(o.CertName); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("CertName", l, 64)
}
xw.WriteString(o.CertName)
xw.WriteUint64(uint64(o.MaxLocalVersion))
xw.WriteUint32(o.Flags)
if l := len(o.Options); l > 64 {
@ -907,6 +964,20 @@ func (o *Device) UnmarshalXDR(bs []byte) error {
func (o *Device) DecodeXDRFrom(xr *xdr.Reader) error {
o.ID = xr.ReadBytesMax(32)
o.Name = xr.ReadStringMax(64)
_AddressesSize := int(xr.ReadUint32())
if _AddressesSize < 0 {
return xdr.ElementSizeExceeded("Addresses", _AddressesSize, 64)
}
if _AddressesSize > 64 {
return xdr.ElementSizeExceeded("Addresses", _AddressesSize, 64)
}
o.Addresses = make([]string, _AddressesSize)
for i := range o.Addresses {
o.Addresses[i] = xr.ReadStringMax(2083)
}
o.Compression = xr.ReadUint32()
o.CertName = xr.ReadStringMax(64)
o.MaxLocalVersion = int64(xr.ReadUint64())
o.Flags = xr.ReadUint32()
_OptionsSize := int(xr.ReadUint32())

View File

@ -15,10 +15,10 @@ import (
)
const (
// Data block size (128 KiB)
// BlockSize is the standard ata block size (128 KiB)
BlockSize = 128 << 10
// We reject messages larger than this when encountered on the wire. (64 MiB)
// MaxMessageLen is the largest message size allowed on the wire. (64 MiB)
MaxMessageLen = 64 << 20
)
@ -61,6 +61,13 @@ const (
FlagRequestTemporary uint32 = 1 << iota
)
// ClusterConfigMessage.Folders flags
const (
FlagFolderReadOnly uint32 = 1 << 0
FlagFolderIgnorePerms = 1 << 1
FlagFolderIgnoreDelete = 1 << 2
)
// ClusterConfigMessage.Folders.Devices flags
const (
FlagShareTrusted uint32 = 1 << 0
@ -145,9 +152,11 @@ type isEofer interface {
}
const (
// We make sure to send a message at least this often, by triggering pings.
// PingSendInterval is how often we make sure to send a message, by
// triggering pings if necessary.
PingSendInterval = 90 * time.Second
// If we haven't received a message from the other side for this long, close the connection.
// ReceiveTimeout is the longest we'll wait for a message from the other
// side before closing the connection.
ReceiveTimeout = 300 * time.Second
)
@ -367,9 +376,7 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
if debug {
l.Debugf("read header %v (msglen=%d)", hdr, msglen)
}
if msglen > MaxMessageLen {
err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
@ -391,9 +398,7 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
return
}
if debug {
l.Debugf("read %d bytes", len(c.rdbuf0))
}
msgBuf := c.rdbuf0
if hdr.compression && msglen > 0 {
@ -403,12 +408,10 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
return
}
msgBuf = c.rdbuf1
if debug {
l.Debugf("decompressed to %d bytes", len(msgBuf))
}
}
if debug {
if shouldDebug() {
if len(msgBuf) > 1024 {
l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
} else {
@ -475,16 +478,12 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
}
func (c *rawConnection) handleIndex(im IndexMessage) {
if debug {
l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
}
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
if debug {
l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
}
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
@ -634,9 +633,7 @@ func (c *rawConnection) writerLoop() {
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
msgBuf = msgBuf[0 : len(tempBuf)+8]
if debug {
l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
}
} else {
// No point in compressing very short messages
hm.hdr.compression = false
@ -650,14 +647,10 @@ func (c *rawConnection) writerLoop() {
msgBuf = msgBuf[0 : len(uncBuf)+8]
copy(msgBuf[8:], uncBuf)
if debug {
l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
}
}
} else {
if debug {
l.Debugf("write empty message; %v", hm.hdr)
}
binary.BigEndian.PutUint32(msgBuf[4:8], 0)
msgBuf = msgBuf[:8]
}
@ -667,10 +660,8 @@ func (c *rawConnection) writerLoop() {
if err == nil {
var n int
n, err = c.cw.Write(msgBuf)
if debug {
l.Debugf("wrote %d bytes on the wire", n)
}
}
if err != nil {
c.close(err)
return
@ -723,15 +714,11 @@ func (c *rawConnection) pingSender() {
case <-ticker:
d := time.Since(c.cw.Last())
if d < PingSendInterval/2 {
if debug {
l.Debugln(c.id, "ping skipped after wr", d)
}
continue
}
if debug {
l.Debugln(c.id, "ping -> after", d)
}
c.ping()
case <-c.closed:
@ -751,15 +738,11 @@ func (c *rawConnection) pingReceiver() {
case <-ticker:
d := time.Since(c.cr.Last())
if d > ReceiveTimeout {
if debug {
l.Debugln(c.id, "ping timeout", d)
}
c.close(ErrTimeout)
}
if debug {
l.Debugln(c.id, "last read within", d)
}
case <-c.closed:
return

View File

@ -6,6 +6,7 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
@ -21,9 +22,19 @@ import (
var (
c0ID = NewDeviceID([]byte{1})
c1ID = NewDeviceID([]byte{2})
quickCfg = &quick.Config{}
)
func TestMain(m *testing.M) {
flag.Parse()
if flag.Lookup("test.short").Value.String() != "false" {
quickCfg.MaxCount = 10
}
os.Exit(m.Run())
}
func TestHeaderFunctions(t *testing.T) {
t.Parallel()
f := func(ver, id, typ int) bool {
ver = int(uint(ver) % 16)
id = int(uint(id) % 4096)
@ -38,6 +49,7 @@ func TestHeaderFunctions(t *testing.T) {
}
func TestHeaderLayout(t *testing.T) {
t.Parallel()
var e, a uint32
// Version are the first four bits
@ -63,6 +75,7 @@ func TestHeaderLayout(t *testing.T) {
}
func TestPing(t *testing.T) {
t.Parallel()
ar, aw := io.Pipe()
br, bw := io.Pipe()
@ -82,6 +95,7 @@ func TestPing(t *testing.T) {
}
func TestVersionErr(t *testing.T) {
t.Parallel()
m0 := newTestModel()
m1 := newTestModel()
@ -109,6 +123,7 @@ func TestVersionErr(t *testing.T) {
}
func TestTypeErr(t *testing.T) {
t.Parallel()
m0 := newTestModel()
m1 := newTestModel()
@ -136,6 +151,7 @@ func TestTypeErr(t *testing.T) {
}
func TestClose(t *testing.T) {
t.Parallel()
m0 := newTestModel()
m1 := newTestModel()
@ -171,10 +187,9 @@ func TestClose(t *testing.T) {
}
func TestElementSizeExceededNested(t *testing.T) {
t.Parallel()
m := ClusterConfigMessage{
Folders: []Folder{
{ID: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon"},
},
ClientName: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon",
}
_, err := m.EncodeXDR(ioutil.Discard)
if err == nil {
@ -183,11 +198,7 @@ func TestElementSizeExceededNested(t *testing.T) {
}
func TestMarshalIndexMessage(t *testing.T) {
var quickCfg = &quick.Config{MaxCountScale: 10}
if testing.Short() {
quickCfg = nil
}
t.Parallel()
f := func(m1 IndexMessage) bool {
for i, f := range m1.Files {
m1.Files[i].CachedSize = 0
@ -208,11 +219,7 @@ func TestMarshalIndexMessage(t *testing.T) {
}
func TestMarshalRequestMessage(t *testing.T) {
var quickCfg = &quick.Config{MaxCountScale: 10}
if testing.Short() {
quickCfg = nil
}
t.Parallel()
f := func(m1 RequestMessage) bool {
return testMarshal(t, "request", &m1, &RequestMessage{})
}
@ -223,11 +230,7 @@ func TestMarshalRequestMessage(t *testing.T) {
}
func TestMarshalResponseMessage(t *testing.T) {
var quickCfg = &quick.Config{MaxCountScale: 10}
if testing.Short() {
quickCfg = nil
}
t.Parallel()
f := func(m1 ResponseMessage) bool {
if len(m1.Data) == 0 {
m1.Data = nil
@ -241,11 +244,7 @@ func TestMarshalResponseMessage(t *testing.T) {
}
func TestMarshalClusterConfigMessage(t *testing.T) {
var quickCfg = &quick.Config{MaxCountScale: 10}
if testing.Short() {
quickCfg = nil
}
t.Parallel()
f := func(m1 ClusterConfigMessage) bool {
return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{})
}
@ -256,11 +255,7 @@ func TestMarshalClusterConfigMessage(t *testing.T) {
}
func TestMarshalCloseMessage(t *testing.T) {
var quickCfg = &quick.Config{MaxCountScale: 10}
if testing.Short() {
quickCfg = nil
}
t.Parallel()
f := func(m1 CloseMessage) bool {
return testMarshal(t, "close", &m1, &CloseMessage{})
}

View File

@ -4,7 +4,7 @@ package protocol
// The Vector type represents a version vector. The zero value is a usable
// version vector. The vector has slice semantics and some operations on it
// are "append-like" in that they may return the same vector modified, or a
// are "append-like" in that they may return the same vector modified, or v
// new allocated Vector with the modified contents.
type Vector []Counter
@ -33,43 +33,43 @@ func (v Vector) Update(ID uint64) Vector {
return nv
}
}
// Append a new new index
// Append a new index
return append(v, Counter{ID, 1})
}
// Merge returns the vector containing the maximum indexes from a and b. If it
// is possible, the vector a is updated and returned. If it is not, a copy
// Merge returns the vector containing the maximum indexes from v and b. If it
// is possible, the vector v is updated and returned. If it is not, a copy
// will be created, updated and returned.
func (a Vector) Merge(b Vector) Vector {
var ai, bi int
func (v Vector) Merge(b Vector) Vector {
var vi, bi int
for bi < len(b) {
if ai == len(a) {
// We've reach the end of a, all that remains are appends
return append(a, b[bi:]...)
if vi == len(v) {
// We've reach the end of v, all that remains are appends
return append(v, b[bi:]...)
}
if a[ai].ID > b[bi].ID {
if v[vi].ID > b[bi].ID {
// The index from b should be inserted here
n := make(Vector, len(a)+1)
copy(n, a[:ai])
n[ai] = b[bi]
copy(n[ai+1:], a[ai:])
a = n
n := make(Vector, len(v)+1)
copy(n, v[:vi])
n[vi] = b[bi]
copy(n[vi+1:], v[vi:])
v = n
}
if a[ai].ID == b[bi].ID {
if v := b[bi].Value; v > a[ai].Value {
a[ai].Value = v
if v[vi].ID == b[bi].ID {
if val := b[bi].Value; val > v[vi].Value {
v[vi].Value = val
}
}
if bi < len(b) && a[ai].ID == b[bi].ID {
if bi < len(b) && v[vi].ID == b[bi].ID {
bi++
}
ai++
vi++
}
return a
return v
}
// Copy returns an identical vector that is not shared with v.
@ -80,27 +80,27 @@ func (v Vector) Copy() Vector {
}
// Equal returns true when the two vectors are equivalent.
func (a Vector) Equal(b Vector) bool {
return a.Compare(b) == Equal
func (v Vector) Equal(b Vector) bool {
return v.Compare(b) == Equal
}
// LesserEqual returns true when the two vectors are equivalent or a is Lesser
// LesserEqual returns true when the two vectors are equivalent or v is Lesser
// than b.
func (a Vector) LesserEqual(b Vector) bool {
comp := a.Compare(b)
func (v Vector) LesserEqual(b Vector) bool {
comp := v.Compare(b)
return comp == Lesser || comp == Equal
}
// LesserEqual returns true when the two vectors are equivalent or a is Greater
// GreaterEqual returns true when the two vectors are equivalent or v is Greater
// than b.
func (a Vector) GreaterEqual(b Vector) bool {
comp := a.Compare(b)
func (v Vector) GreaterEqual(b Vector) bool {
comp := v.Compare(b)
return comp == Greater || comp == Equal
}
// Concurrent returns true when the two vectors are concrurrent.
func (a Vector) Concurrent(b Vector) bool {
comp := a.Compare(b)
func (v Vector) Concurrent(b Vector) bool {
comp := v.Compare(b)
return comp == ConcurrentGreater || comp == ConcurrentLesser
}

View File

@ -123,12 +123,12 @@ func TestMerge(t *testing.T) {
func TestCounterValue(t *testing.T) {
v0 := Vector{Counter{42, 1}, Counter{64, 5}}
if v0.Counter(42) != 1 {
t.Error("Counter error, %d != %d", v0.Counter(42), 1)
t.Errorf("Counter error, %d != %d", v0.Counter(42), 1)
}
if v0.Counter(64) != 5 {
t.Error("Counter error, %d != %d", v0.Counter(64), 5)
t.Errorf("Counter error, %d != %d", v0.Counter(64), 5)
}
if v0.Counter(72) != 0 {
t.Error("Counter error, %d != %d", v0.Counter(72), 0)
t.Errorf("Counter error, %d != %d", v0.Counter(72), 0)
}
}

View File

@ -5,294 +5,37 @@ package client
import (
"crypto/tls"
"fmt"
"log"
"net"
"net/url"
"time"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type ProtocolClient struct {
URI *url.URL
Invitations chan protocol.SessionInvitation
type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient
closeInvitationsOnFinish bool
var (
supportedSchemes = map[string]relayClientFactory{
"relay": newStaticClient,
"dynamic+http": newDynamicClient,
"dynamic+https": newDynamicClient,
}
)
config *tls.Config
timeout time.Duration
stop chan struct{}
stopped chan struct{}
conn *tls.Conn
mut sync.RWMutex
connected bool
latency time.Duration
type RelayClient interface {
Serve()
Stop()
StatusOK() bool
Latency() time.Duration
String() string
Invitations() chan protocol.SessionInvitation
URI() *url.URL
}
func NewProtocolClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) *ProtocolClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) (RelayClient, error) {
factory, ok := supportedSchemes[uri.Scheme]
if !ok {
return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
}
return &ProtocolClient{
URI: uri,
Invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
config: configForCerts(certs),
timeout: time.Minute * 2,
stop: make(chan struct{}),
stopped: make(chan struct{}),
mut: sync.NewRWMutex(),
connected: false,
}
}
func (c *ProtocolClient) Serve() {
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
defer close(c.stopped)
if err := c.connect(); err != nil {
if debug {
l.Debugln("Relay connect:", err)
}
return
}
if debug {
l.Debugln(c, "connected", c.conn.RemoteAddr())
}
if err := c.join(); err != nil {
c.conn.Close()
l.Infoln("Relay join:", err)
return
}
if err := c.conn.SetDeadline(time.Time{}); err != nil {
l.Infoln("Relay set deadline:", err)
return
}
if debug {
l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr())
}
defer c.cleanup()
c.mut.Lock()
c.connected = true
c.mut.Unlock()
messages := make(chan interface{})
errors := make(chan error, 1)
go messageReader(c.conn, messages, errors)
timeout := time.NewTimer(c.timeout)
for {
select {
case message := <-messages:
timeout.Reset(c.timeout)
if debug {
log.Printf("%s received message %T", c, message)
}
switch msg := message.(type) {
case protocol.Ping:
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
l.Infoln("Relay write:", err)
return
}
if debug {
l.Debugln(c, "sent pong")
}
case protocol.SessionInvitation:
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:]
}
c.Invitations <- msg
default:
l.Infoln("Relay: protocol error: unexpected message %v", msg)
return
}
case <-c.stop:
if debug {
l.Debugln(c, "stopping")
}
return
case err := <-errors:
l.Infoln("Relay received:", err)
return
case <-timeout.C:
if debug {
l.Debugln(c, "timed out")
}
return
}
}
}
func (c *ProtocolClient) Stop() {
if c.stop == nil {
return
}
close(c.stop)
<-c.stopped
}
func (c *ProtocolClient) StatusOK() bool {
c.mut.RLock()
con := c.connected
c.mut.RUnlock()
return con
}
func (c *ProtocolClient) Latency() time.Duration {
c.mut.RLock()
lat := c.latency
c.mut.RUnlock()
return lat
}
func (c *ProtocolClient) String() string {
return fmt.Sprintf("ProtocolClient@%p", c)
}
func (c *ProtocolClient) connect() error {
if c.URI.Scheme != "relay" {
return fmt.Errorf("Unsupported relay schema:", c.URI.Scheme)
}
t0 := time.Now()
tcpConn, err := net.Dial("tcp", c.URI.Host)
if err != nil {
return err
}
c.mut.Lock()
c.latency = time.Since(t0)
c.mut.Unlock()
conn := tls.Client(tcpConn, c.config)
if err = conn.Handshake(); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
conn.Close()
return err
}
if err := performHandshakeAndValidation(conn, c.URI); err != nil {
conn.Close()
return err
}
c.conn = conn
return nil
}
func (c *ProtocolClient) cleanup() {
if c.closeInvitationsOnFinish {
close(c.Invitations)
c.Invitations = make(chan protocol.SessionInvitation)
}
if debug {
l.Debugln(c, "cleaning up")
}
c.mut.Lock()
c.connected = false
c.mut.Unlock()
c.conn.Close()
}
func (c *ProtocolClient) join() error {
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
return err
}
message, err := protocol.ReadMessage(c.conn)
if err != nil {
return err
}
switch msg := message.(type) {
case protocol.Response:
if msg.Code != 0 {
return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
}
default:
return fmt.Errorf("protocol error: expecting response got %v", msg)
}
return nil
}
func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
if err := conn.Handshake(); err != nil {
return err
}
cs := conn.ConnectionState()
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName {
return fmt.Errorf("protocol negotiation error")
}
q := uri.Query()
relayIDs := q.Get("id")
if relayIDs != "" {
relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs)
if err != nil {
return fmt.Errorf("relay address contains invalid verification id: %s", err)
}
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
return fmt.Errorf("unexpected certificate count: %d", cl)
}
remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw)
if remoteID != relayID {
return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID)
}
}
return nil
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
errors <- err
return
}
messages <- msg
}
return factory(uri, certs, invitations, timeout), nil
}

View File

@ -6,10 +6,13 @@ import (
"os"
"strings"
"github.com/calmh/logger"
"github.com/syncthing/syncthing/lib/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all"
l = logger.DefaultLogger
l = logger.DefaultLogger.NewFacility("relay", "")
)
func init() {
l.SetDebug("relay", strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all")
}

View File

@ -11,20 +11,23 @@ import (
"strings"
"time"
"github.com/syncthing/syncthing/lib/dialer"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol"
)
func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate) (protocol.SessionInvitation, error) {
func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate, timeout time.Duration) (protocol.SessionInvitation, error) {
if uri.Scheme != "relay" {
return protocol.SessionInvitation{}, fmt.Errorf("Unsupported relay scheme:", uri.Scheme)
return protocol.SessionInvitation{}, fmt.Errorf("Unsupported relay scheme: %v", uri.Scheme)
}
conn, err := tls.Dial("tcp", uri.Host, configForCerts(certs))
rconn, err := dialer.Dial("tcp", uri.Host)
if err != nil {
return protocol.SessionInvitation{}, err
}
conn.SetDeadline(time.Now().Add(10 * time.Second))
conn := tls.Client(rconn, configForCerts(certs))
conn.SetDeadline(time.Now().Add(timeout))
if err := performHandshakeAndValidation(conn, uri); err != nil {
return protocol.SessionInvitation{}, err
@ -49,9 +52,7 @@ func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs [
case protocol.Response:
return protocol.SessionInvitation{}, fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
case protocol.SessionInvitation:
if debug {
l.Debugln("Received invitation", msg, "via", conn.LocalAddr())
}
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = conn.RemoteAddr().(*net.TCPAddr).IP[:]
@ -65,7 +66,7 @@ func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs [
func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) {
addr := net.JoinHostPort(net.IP(invitation.Address).String(), strconv.Itoa(int(invitation.Port)))
conn, err := net.Dial("tcp", addr)
conn, err := dialer.Dial("tcp", addr)
if err != nil {
return nil, err
}
@ -98,18 +99,22 @@ func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) {
}
}
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool {
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep, timeout time.Duration, times int) bool {
id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0])
invs := make(chan protocol.SessionInvitation, 1)
c := NewProtocolClient(uri, certs, invs)
c, err := NewClient(uri, certs, invs, timeout)
if err != nil {
close(invs)
return false
}
go c.Serve()
defer func() {
close(invs)
c.Stop()
close(invs)
}()
for i := 0; i < times; i++ {
_, err := GetInvitationFromRelay(uri, id, certs)
_, err := GetInvitationFromRelay(uri, id, certs, timeout)
if err == nil {
return true
}

View File

@ -1,6 +1,6 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
//go:generate -command genxdr go run ../../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate -command genxdr go run ../../../Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate genxdr -o packets_xdr.go packets.go
package protocol
@ -20,6 +20,7 @@ const (
messageTypeResponse
messageTypeConnectRequest
messageTypeSessionInvitation
messageTypeRelayFull
)
type header struct {
@ -31,6 +32,7 @@ type header struct {
type Ping struct{}
type Pong struct{}
type JoinRelayRequest struct{}
type RelayFull struct{}
type JoinSessionRequest struct {
Key []byte // max:32

View File

@ -256,6 +256,63 @@ func (o *JoinRelayRequest) DecodeXDRFrom(xr *xdr.Reader) error {
/*
RelayFull Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct RelayFull {
}
*/
func (o RelayFull) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o RelayFull) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o RelayFull) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o RelayFull) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o RelayFull) EncodeXDRInto(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}
func (o *RelayFull) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *RelayFull) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *RelayFull) DecodeXDRFrom(xr *xdr.Reader) error {
return xr.Error()
}
/*
JoinSessionRequest Structure:
0 1 2 3

View File

@ -50,6 +50,9 @@ func WriteMessage(w io.Writer, message interface{}) error {
case SessionInvitation:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeSessionInvitation
case RelayFull:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeRelayFull
default:
err = fmt.Errorf("Unknown message type")
}
@ -108,6 +111,10 @@ func ReadMessage(r io.Reader) (interface{}, error) {
var msg SessionInvitation
err := msg.DecodeXDR(r)
return msg, err
case messageTypeRelayFull:
var msg RelayFull
err := msg.DecodeXDR(r)
return msg, err
}
return nil, fmt.Errorf("Unknown message type")

View File

@ -12,20 +12,24 @@ import (
"strings"
"time"
"github.com/calmh/logger"
"github.com/syncthing/syncthing/lib/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "locks") || os.Getenv("STTRACE") == "all"
threshold = time.Duration(100 * time.Millisecond)
l = logger.DefaultLogger
l = logger.DefaultLogger.NewFacility("sync", "Mutexes")
// We make an exception in this package and have an actual "if debug { ...
// }" variable, as it may be rather performance critical and does
// nonstandard things (from a debug logging PoV).
debug = strings.Contains(os.Getenv("STTRACE"), "sync") || os.Getenv("STTRACE") == "all"
)
func init() {
if n, err := strconv.Atoi(os.Getenv("STLOCKTHRESHOLD")); debug && err == nil {
l.SetDebug("sync", strings.Contains(os.Getenv("STTRACE"), "sync") || os.Getenv("STTRACE") == "all")
if n, err := strconv.Atoi(os.Getenv("STLOCKTHRESHOLD")); err == nil {
threshold = time.Duration(n) * time.Millisecond
}
if debug {
l.Debugf("Enabling lock logging at %v threshold", threshold)
}
}

View File

@ -12,7 +12,7 @@ import (
"testing"
"time"
"github.com/calmh/logger"
"github.com/syncthing/syncthing/lib/logger"
)
const (
@ -23,6 +23,7 @@ const (
func TestTypes(t *testing.T) {
debug = false
l.SetDebug("sync", false)
if _, ok := NewMutex().(*sync.Mutex); !ok {
t.Error("Wrong type")
@ -37,6 +38,7 @@ func TestTypes(t *testing.T) {
}
debug = true
l.SetDebug("sync", true)
if _, ok := NewMutex().(*loggedMutex); !ok {
t.Error("Wrong type")
@ -51,10 +53,12 @@ func TestTypes(t *testing.T) {
}
debug = false
l.SetDebug("sync", false)
}
func TestMutex(t *testing.T) {
debug = true
l.SetDebug("sync", true)
threshold = logThreshold
msgmut := sync.Mutex{}
@ -84,10 +88,12 @@ func TestMutex(t *testing.T) {
}
debug = false
l.SetDebug("sync", false)
}
func TestRWMutex(t *testing.T) {
debug = true
l.SetDebug("sync", true)
threshold = logThreshold
msgmut := sync.Mutex{}
@ -142,10 +148,12 @@ func TestRWMutex(t *testing.T) {
mut.RUnlock()
debug = false
l.SetDebug("sync", false)
}
func TestWaitGroup(t *testing.T) {
debug = true
l.SetDebug("sync", true)
threshold = logThreshold
msgmut := sync.Mutex{}
@ -182,4 +190,5 @@ func TestWaitGroup(t *testing.T) {
}
debug = false
l.SetDebug("sync", false)
}

View File

@ -371,7 +371,7 @@ func requestProcessor() {
if debug {
log.Println("Request for", request.relay)
}
if !client.TestRelay(request.uri, []tls.Certificate{testCert}, 250*time.Millisecond, 4) {
if !client.TestRelay(request.uri, []tls.Certificate{testCert}, time.Second, 2*time.Second, 3) {
if debug {
log.Println("Test for relay", request.relay, "failed")
}