lib/protocol: Further interface refactor (#9396)

This is a symmetric change to #9375 -- where that PR changed the
protocol->model interface, this changes the model->protocol one.
This commit is contained in:
Jakob Borg 2024-08-24 12:45:10 +02:00 committed by GitHub
parent 7df75e681d
commit 5342bec1b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 170 additions and 221 deletions

View File

@ -30,8 +30,8 @@ func newFakeConnection(id protocol.DeviceID, model Model) *fakeConnection {
model: model,
closed: make(chan struct{}),
}
f.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
return f.fileData[name], nil
f.RequestCalls(func(ctx context.Context, req *protocol.Request) ([]byte, error) {
return f.fileData[req.Name], nil
})
f.DeviceIDReturns(id)
f.ConnectionIDReturns(rand.String(16))
@ -60,14 +60,16 @@ type fakeConnection struct {
}
func (f *fakeConnection) setIndexFn(fn func(_ context.Context, folder string, fs []protocol.FileInfo) error) {
f.IndexCalls(fn)
f.IndexUpdateCalls(fn)
f.IndexCalls(func(ctx context.Context, idx *protocol.Index) error { return fn(ctx, idx.Folder, idx.Files) })
f.IndexUpdateCalls(func(ctx context.Context, idxUp *protocol.IndexUpdate) error {
return fn(ctx, idxUp.Folder, idxUp.Files)
})
}
func (f *fakeConnection) DownloadProgress(_ context.Context, folder string, updates []protocol.FileDownloadProgressUpdate) {
func (f *fakeConnection) DownloadProgress(_ context.Context, dp *protocol.DownloadProgress) {
f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{
folder: folder,
updates: updates,
folder: dp.Folder,
updates: dp.Updates,
})
}

View File

@ -228,9 +228,9 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
if initial {
initial = false
return s.conn.Index(ctx, s.folder, fs)
return s.conn.Index(ctx, &protocol.Index{Folder: s.folder, Files: fs})
}
return s.conn.IndexUpdate(ctx, s.folder, fs)
return s.conn.IndexUpdate(ctx, &protocol.IndexUpdate{Folder: s.folder, Files: fs})
})
var err error

View File

@ -40,10 +40,10 @@ func TestIndexhandlerConcurrency(t *testing.T) {
c2.Start()
defer c2.Close(io.EOF)
c1.ClusterConfig(protocol.ClusterConfig{})
c2.ClusterConfig(protocol.ClusterConfig{})
c1.Index(ctx, "foo", nil)
c2.Index(ctx, "foo", nil)
c1.ClusterConfig(&protocol.ClusterConfig{})
c2.ClusterConfig(&protocol.ClusterConfig{})
c1.Index(ctx, &protocol.Index{Folder: "foo"})
c2.Index(ctx, &protocol.Index{Folder: "foo"})
const msgs = 5e2
const files = 1e3
@ -64,7 +64,7 @@ func TestIndexhandlerConcurrency(t *testing.T) {
})
b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
return c1.IndexUpdate(ctx, "foo", fs)
return c1.IndexUpdate(ctx, &protocol.IndexUpdate{Folder: "foo", Files: fs})
})
sentEntries := 0
for i := 0; i < msgs; i++ {

View File

@ -2414,7 +2414,7 @@ func (m *model) promoteConnections() {
if conn.Statistics().StartedAt.IsZero() {
conn.SetFolderPasswords(passwords)
conn.Start()
conn.ClusterConfig(protocol.ClusterConfig{Secondary: true})
conn.ClusterConfig(&protocol.ClusterConfig{Secondary: true})
}
}
}
@ -2469,7 +2469,7 @@ func (m *model) RequestGlobal(ctx context.Context, deviceID protocol.DeviceID, f
}
l.Debugf("%v REQ(out): %s (%s): %q / %q b=%d o=%d s=%d h=%x wh=%x ft=%t", m, deviceID.Short(), conn, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary)
return conn.Request(ctx, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary)
return conn.Request(ctx, &protocol.Request{Folder: folder, Name: name, BlockNo: blockNo, Offset: offset, Size: size, Hash: hash, WeakHash: weakHash, FromTemporary: fromTemporary})
}
// requestConnectionForDevice returns a connection to the given device, to
@ -2586,14 +2586,14 @@ func (m *model) numHashers(folder string) int {
// generateClusterConfig returns a ClusterConfigMessage that is correct and the
// set of folder passwords for the given peer device
func (m *model) generateClusterConfig(device protocol.DeviceID) (protocol.ClusterConfig, map[string]string) {
func (m *model) generateClusterConfig(device protocol.DeviceID) (*protocol.ClusterConfig, map[string]string) {
m.mut.RLock()
defer m.mut.RUnlock()
return m.generateClusterConfigRLocked(device)
}
func (m *model) generateClusterConfigRLocked(device protocol.DeviceID) (protocol.ClusterConfig, map[string]string) {
var message protocol.ClusterConfig
func (m *model) generateClusterConfigRLocked(device protocol.DeviceID) (*protocol.ClusterConfig, map[string]string) {
message := &protocol.ClusterConfig{}
folders := m.cfg.FolderList()
passwords := make(map[string]string, len(folders))
for _, folderCfg := range folders {

View File

@ -3631,11 +3631,11 @@ func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSec
cc1 := make(chan struct{}, 1)
cc2 := make(chan struct{}, 1)
fc1 := newFakeConnection(device1, m)
fc1.ClusterConfigCalls(func(_ protocol.ClusterConfig) {
fc1.ClusterConfigCalls(func(_ *protocol.ClusterConfig) {
cc1 <- struct{}{}
})
fc2 := newFakeConnection(device2, m)
fc2.ClusterConfigCalls(func(_ protocol.ClusterConfig) {
fc2.ClusterConfigCalls(func(_ *protocol.ClusterConfig) {
cc2 <- struct{}{}
})
m.AddConnection(fc1, protocol.Hello{})

View File

@ -39,7 +39,7 @@ type progressUpdate struct {
}
func (p progressUpdate) send(ctx context.Context) {
p.conn.DownloadProgress(ctx, p.folder, p.updates)
p.conn.DownloadProgress(ctx, &protocol.DownloadProgress{Folder: p.folder, Updates: p.updates})
}
// NewProgressEmitter creates a new progress emitter which emits
@ -334,7 +334,7 @@ func (t *ProgressEmitter) clearLocked() {
}
for _, folder := range state.folders() {
if updates := state.cleanup(folder); len(updates) > 0 {
conn.DownloadProgress(context.Background(), folder, updates)
conn.DownloadProgress(context.Background(), &protocol.DownloadProgress{Folder: folder, Updates: updates})
}
}
}

View File

@ -143,11 +143,11 @@ func TestSymlinkTraversalWrite(t *testing.T) {
}
return nil
})
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
if name != "symlink" && strings.HasPrefix(name, "symlink") {
badReq <- name
fc.RequestCalls(func(ctx context.Context, req *protocol.Request) ([]byte, error) {
if req.Name != "symlink" && strings.HasPrefix(req.Name, "symlink") {
badReq <- req.Name
}
return fc.fileData[name], nil
return fc.fileData[req.Name], nil
})
// Send an update for the symlink, wait for it to sync and be reported back.
@ -338,7 +338,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
})
// Make sure pulling doesn't interfere, as index updates are racy and
// thus we cannot distinguish between scan and pull results.
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
fc.RequestCalls(func(_ context.Context, _ *protocol.Request) ([]byte, error) {
return nil, nil
})
@ -926,7 +926,7 @@ func TestNeedFolderFiles(t *testing.T) {
defer sub.Unsubscribe()
errPreventSync := errors.New("you aren't getting any of this")
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
fc.RequestCalls(func(_ context.Context, _ *protocol.Request) ([]byte, error) {
return nil, errPreventSync
})
@ -1065,9 +1065,9 @@ func TestRequestLastFileProgress(t *testing.T) {
done := make(chan struct{})
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
fc.RequestCalls(func(_ context.Context, req *protocol.Request) ([]byte, error) {
defer close(done)
progress, queued, rest, err := m.NeedFolderFiles(folder, 1, 10)
progress, queued, rest, err := m.NeedFolderFiles(req.Folder, 1, 10)
must(t, err)
if len(queued)+len(rest) != 0 {
t.Error(`There should not be any queued or "rest" items`)
@ -1075,7 +1075,7 @@ func TestRequestLastFileProgress(t *testing.T) {
if len(progress) != 1 {
t.Error("Expected exactly one item in progress.")
}
return fc.fileData[name], nil
return fc.fileData[req.Name], nil
})
contents := []byte("test file contents\n")
@ -1232,7 +1232,7 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
done := make(chan struct{})
defer close(done) // Must be the last thing to be deferred, thus first to run.
indexChan := make(chan []protocol.FileInfo, 1)
ccChan := make(chan protocol.ClusterConfig, 1)
ccChan := make(chan *protocol.ClusterConfig, 1)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
@ -1240,7 +1240,7 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
}
return nil
})
fc.ClusterConfigCalls(func(cc protocol.ClusterConfig) {
fc.ClusterConfigCalls(func(cc *protocol.ClusterConfig) {
select {
case ccChan <- cc:
case <-done:

View File

@ -66,8 +66,8 @@ func benchmarkRequestsConnPair(b *testing.B, conn0, conn1 net.Conn) {
c1.Start()
// Satisfy the assertions in the protocol by sending an initial cluster config
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
c0.ClusterConfig(&ClusterConfig{})
c1.ClusterConfig(&ClusterConfig{})
// Report some useful stats and reset the timer for the actual test
b.ReportAllocs()
@ -82,9 +82,9 @@ func benchmarkRequestsConnPair(b *testing.B, conn0, conn1 net.Conn) {
// Use c0 and c1 for each alternating request, so we get as much
// data flowing in both directions.
if i%2 == 0 {
buf, err = c0.Request(context.Background(), "folder", "file", i, int64(i), 128<<10, nil, 0, false)
buf, err = c0.Request(context.Background(), &Request{Folder: "folder", Name: "file", BlockNo: i, Offset: int64(i), Size: 128 << 10})
} else {
buf, err = c1.Request(context.Background(), "folder", "file", i, int64(i), 128<<10, nil, 0, false)
buf, err = c1.Request(context.Background(), &Request{Folder: "folder", Name: "file", BlockNo: i, Offset: int64(i), Size: 128 << 10})
}
if err != nil {

View File

@ -193,48 +193,52 @@ func (e encryptedConnection) DeviceID() DeviceID {
return e.conn.DeviceID()
}
func (e encryptedConnection) Index(ctx context.Context, folder string, files []FileInfo) error {
if folderKey, ok := e.folderKeys.get(folder); ok {
encryptFileInfos(e.keyGen, files, folderKey)
func (e encryptedConnection) Index(ctx context.Context, idx *Index) error {
if folderKey, ok := e.folderKeys.get(idx.Folder); ok {
encryptFileInfos(e.keyGen, idx.Files, folderKey)
}
return e.conn.Index(ctx, folder, files)
return e.conn.Index(ctx, idx)
}
func (e encryptedConnection) IndexUpdate(ctx context.Context, folder string, files []FileInfo) error {
if folderKey, ok := e.folderKeys.get(folder); ok {
encryptFileInfos(e.keyGen, files, folderKey)
func (e encryptedConnection) IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error {
if folderKey, ok := e.folderKeys.get(idxUp.Folder); ok {
encryptFileInfos(e.keyGen, idxUp.Files, folderKey)
}
return e.conn.IndexUpdate(ctx, folder, files)
return e.conn.IndexUpdate(ctx, idxUp)
}
func (e encryptedConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
folderKey, ok := e.folderKeys.get(folder)
func (e encryptedConnection) Request(ctx context.Context, req *Request) ([]byte, error) {
folderKey, ok := e.folderKeys.get(req.Folder)
if !ok {
return e.conn.Request(ctx, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary)
return e.conn.Request(ctx, req)
}
// Encrypt / adjust the request parameters.
origSize := size
if size < minPaddedSize {
origSize := req.Size
origName := req.Name
if req.Size < minPaddedSize {
// Make a request for minPaddedSize data instead of the smaller
// block. We'll chop off the extra data later.
size = minPaddedSize
req.Size = minPaddedSize
}
encName := encryptName(name, folderKey)
encOffset := offset + int64(blockNo*blockOverhead)
encSize := size + blockOverhead
encName := encryptName(req.Name, folderKey)
encOffset := req.Offset + int64(req.BlockNo*blockOverhead)
encSize := req.Size + blockOverhead
// Perform that request, getting back and encrypted block.
// Perform that request, getting back an encrypted block.
bs, err := e.conn.Request(ctx, folder, encName, blockNo, encOffset, encSize, nil, 0, false)
req.Name = encName
req.Offset = encOffset
req.Size = encSize
bs, err := e.conn.Request(ctx, req)
if err != nil {
return nil, err
}
// Return the decrypted block (or an error if it fails decryption)
fileKey := e.keyGen.FileKey(name, folderKey)
fileKey := e.keyGen.FileKey(origName, folderKey)
bs, err = DecryptBytes(bs, fileKey)
if err != nil {
return nil, err
@ -242,15 +246,15 @@ func (e encryptedConnection) Request(ctx context.Context, folder string, name st
return bs[:origSize], nil
}
func (e encryptedConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) {
if _, ok := e.folderKeys.get(folder); !ok {
e.conn.DownloadProgress(ctx, folder, updates)
func (e encryptedConnection) DownloadProgress(ctx context.Context, dp *DownloadProgress) {
if _, ok := e.folderKeys.get(dp.Folder); !ok {
e.conn.DownloadProgress(ctx, dp)
}
// No need to send these
}
func (e encryptedConnection) ClusterConfig(config ClusterConfig) {
func (e encryptedConnection) ClusterConfig(config *ClusterConfig) {
e.conn.ClusterConfig(config)
}

View File

@ -26,10 +26,10 @@ type Connection struct {
closedReturnsOnCall map[int]struct {
result1 <-chan struct{}
}
ClusterConfigStub func(protocol.ClusterConfig)
ClusterConfigStub func(*protocol.ClusterConfig)
clusterConfigMutex sync.RWMutex
clusterConfigArgsForCall []struct {
arg1 protocol.ClusterConfig
arg1 *protocol.ClusterConfig
}
ConnectionIDStub func() string
connectionIDMutex sync.RWMutex
@ -61,12 +61,11 @@ type Connection struct {
deviceIDReturnsOnCall map[int]struct {
result1 protocol.DeviceID
}
DownloadProgressStub func(context.Context, string, []protocol.FileDownloadProgressUpdate)
DownloadProgressStub func(context.Context, *protocol.DownloadProgress)
downloadProgressMutex sync.RWMutex
downloadProgressArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileDownloadProgressUpdate
arg2 *protocol.DownloadProgress
}
EstablishedAtStub func() time.Time
establishedAtMutex sync.RWMutex
@ -78,12 +77,11 @@ type Connection struct {
establishedAtReturnsOnCall map[int]struct {
result1 time.Time
}
IndexStub func(context.Context, string, []protocol.FileInfo) error
IndexStub func(context.Context, *protocol.Index) error
indexMutex sync.RWMutex
indexArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileInfo
arg2 *protocol.Index
}
indexReturns struct {
result1 error
@ -91,12 +89,11 @@ type Connection struct {
indexReturnsOnCall map[int]struct {
result1 error
}
IndexUpdateStub func(context.Context, string, []protocol.FileInfo) error
IndexUpdateStub func(context.Context, *protocol.IndexUpdate) error
indexUpdateMutex sync.RWMutex
indexUpdateArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileInfo
arg2 *protocol.IndexUpdate
}
indexUpdateReturns struct {
result1 error
@ -134,18 +131,11 @@ type Connection struct {
remoteAddrReturnsOnCall map[int]struct {
result1 net.Addr
}
RequestStub func(context.Context, string, string, int, int64, int, []byte, uint32, bool) ([]byte, error)
RequestStub func(context.Context, *protocol.Request) ([]byte, error)
requestMutex sync.RWMutex
requestArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 string
arg4 int
arg5 int64
arg6 int
arg7 []byte
arg8 uint32
arg9 bool
arg2 *protocol.Request
}
requestReturns struct {
result1 []byte
@ -293,10 +283,10 @@ func (fake *Connection) ClosedReturnsOnCall(i int, result1 <-chan struct{}) {
}{result1}
}
func (fake *Connection) ClusterConfig(arg1 protocol.ClusterConfig) {
func (fake *Connection) ClusterConfig(arg1 *protocol.ClusterConfig) {
fake.clusterConfigMutex.Lock()
fake.clusterConfigArgsForCall = append(fake.clusterConfigArgsForCall, struct {
arg1 protocol.ClusterConfig
arg1 *protocol.ClusterConfig
}{arg1})
stub := fake.ClusterConfigStub
fake.recordInvocation("ClusterConfig", []interface{}{arg1})
@ -312,13 +302,13 @@ func (fake *Connection) ClusterConfigCallCount() int {
return len(fake.clusterConfigArgsForCall)
}
func (fake *Connection) ClusterConfigCalls(stub func(protocol.ClusterConfig)) {
func (fake *Connection) ClusterConfigCalls(stub func(*protocol.ClusterConfig)) {
fake.clusterConfigMutex.Lock()
defer fake.clusterConfigMutex.Unlock()
fake.ClusterConfigStub = stub
}
func (fake *Connection) ClusterConfigArgsForCall(i int) protocol.ClusterConfig {
func (fake *Connection) ClusterConfigArgsForCall(i int) *protocol.ClusterConfig {
fake.clusterConfigMutex.RLock()
defer fake.clusterConfigMutex.RUnlock()
argsForCall := fake.clusterConfigArgsForCall[i]
@ -484,23 +474,17 @@ func (fake *Connection) DeviceIDReturnsOnCall(i int, result1 protocol.DeviceID)
}{result1}
}
func (fake *Connection) DownloadProgress(arg1 context.Context, arg2 string, arg3 []protocol.FileDownloadProgressUpdate) {
var arg3Copy []protocol.FileDownloadProgressUpdate
if arg3 != nil {
arg3Copy = make([]protocol.FileDownloadProgressUpdate, len(arg3))
copy(arg3Copy, arg3)
}
func (fake *Connection) DownloadProgress(arg1 context.Context, arg2 *protocol.DownloadProgress) {
fake.downloadProgressMutex.Lock()
fake.downloadProgressArgsForCall = append(fake.downloadProgressArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileDownloadProgressUpdate
}{arg1, arg2, arg3Copy})
arg2 *protocol.DownloadProgress
}{arg1, arg2})
stub := fake.DownloadProgressStub
fake.recordInvocation("DownloadProgress", []interface{}{arg1, arg2, arg3Copy})
fake.recordInvocation("DownloadProgress", []interface{}{arg1, arg2})
fake.downloadProgressMutex.Unlock()
if stub != nil {
fake.DownloadProgressStub(arg1, arg2, arg3)
fake.DownloadProgressStub(arg1, arg2)
}
}
@ -510,17 +494,17 @@ func (fake *Connection) DownloadProgressCallCount() int {
return len(fake.downloadProgressArgsForCall)
}
func (fake *Connection) DownloadProgressCalls(stub func(context.Context, string, []protocol.FileDownloadProgressUpdate)) {
func (fake *Connection) DownloadProgressCalls(stub func(context.Context, *protocol.DownloadProgress)) {
fake.downloadProgressMutex.Lock()
defer fake.downloadProgressMutex.Unlock()
fake.DownloadProgressStub = stub
}
func (fake *Connection) DownloadProgressArgsForCall(i int) (context.Context, string, []protocol.FileDownloadProgressUpdate) {
func (fake *Connection) DownloadProgressArgsForCall(i int) (context.Context, *protocol.DownloadProgress) {
fake.downloadProgressMutex.RLock()
defer fake.downloadProgressMutex.RUnlock()
argsForCall := fake.downloadProgressArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2
}
func (fake *Connection) EstablishedAt() time.Time {
@ -576,25 +560,19 @@ func (fake *Connection) EstablishedAtReturnsOnCall(i int, result1 time.Time) {
}{result1}
}
func (fake *Connection) Index(arg1 context.Context, arg2 string, arg3 []protocol.FileInfo) error {
var arg3Copy []protocol.FileInfo
if arg3 != nil {
arg3Copy = make([]protocol.FileInfo, len(arg3))
copy(arg3Copy, arg3)
}
func (fake *Connection) Index(arg1 context.Context, arg2 *protocol.Index) error {
fake.indexMutex.Lock()
ret, specificReturn := fake.indexReturnsOnCall[len(fake.indexArgsForCall)]
fake.indexArgsForCall = append(fake.indexArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileInfo
}{arg1, arg2, arg3Copy})
arg2 *protocol.Index
}{arg1, arg2})
stub := fake.IndexStub
fakeReturns := fake.indexReturns
fake.recordInvocation("Index", []interface{}{arg1, arg2, arg3Copy})
fake.recordInvocation("Index", []interface{}{arg1, arg2})
fake.indexMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@ -608,17 +586,17 @@ func (fake *Connection) IndexCallCount() int {
return len(fake.indexArgsForCall)
}
func (fake *Connection) IndexCalls(stub func(context.Context, string, []protocol.FileInfo) error) {
func (fake *Connection) IndexCalls(stub func(context.Context, *protocol.Index) error) {
fake.indexMutex.Lock()
defer fake.indexMutex.Unlock()
fake.IndexStub = stub
}
func (fake *Connection) IndexArgsForCall(i int) (context.Context, string, []protocol.FileInfo) {
func (fake *Connection) IndexArgsForCall(i int) (context.Context, *protocol.Index) {
fake.indexMutex.RLock()
defer fake.indexMutex.RUnlock()
argsForCall := fake.indexArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2
}
func (fake *Connection) IndexReturns(result1 error) {
@ -644,25 +622,19 @@ func (fake *Connection) IndexReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *Connection) IndexUpdate(arg1 context.Context, arg2 string, arg3 []protocol.FileInfo) error {
var arg3Copy []protocol.FileInfo
if arg3 != nil {
arg3Copy = make([]protocol.FileInfo, len(arg3))
copy(arg3Copy, arg3)
}
func (fake *Connection) IndexUpdate(arg1 context.Context, arg2 *protocol.IndexUpdate) error {
fake.indexUpdateMutex.Lock()
ret, specificReturn := fake.indexUpdateReturnsOnCall[len(fake.indexUpdateArgsForCall)]
fake.indexUpdateArgsForCall = append(fake.indexUpdateArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 []protocol.FileInfo
}{arg1, arg2, arg3Copy})
arg2 *protocol.IndexUpdate
}{arg1, arg2})
stub := fake.IndexUpdateStub
fakeReturns := fake.indexUpdateReturns
fake.recordInvocation("IndexUpdate", []interface{}{arg1, arg2, arg3Copy})
fake.recordInvocation("IndexUpdate", []interface{}{arg1, arg2})
fake.indexUpdateMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@ -676,17 +648,17 @@ func (fake *Connection) IndexUpdateCallCount() int {
return len(fake.indexUpdateArgsForCall)
}
func (fake *Connection) IndexUpdateCalls(stub func(context.Context, string, []protocol.FileInfo) error) {
func (fake *Connection) IndexUpdateCalls(stub func(context.Context, *protocol.IndexUpdate) error) {
fake.indexUpdateMutex.Lock()
defer fake.indexUpdateMutex.Unlock()
fake.IndexUpdateStub = stub
}
func (fake *Connection) IndexUpdateArgsForCall(i int) (context.Context, string, []protocol.FileInfo) {
func (fake *Connection) IndexUpdateArgsForCall(i int) (context.Context, *protocol.IndexUpdate) {
fake.indexUpdateMutex.RLock()
defer fake.indexUpdateMutex.RUnlock()
argsForCall := fake.indexUpdateArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
return argsForCall.arg1, argsForCall.arg2
}
func (fake *Connection) IndexUpdateReturns(result1 error) {
@ -871,31 +843,19 @@ func (fake *Connection) RemoteAddrReturnsOnCall(i int, result1 net.Addr) {
}{result1}
}
func (fake *Connection) Request(arg1 context.Context, arg2 string, arg3 string, arg4 int, arg5 int64, arg6 int, arg7 []byte, arg8 uint32, arg9 bool) ([]byte, error) {
var arg7Copy []byte
if arg7 != nil {
arg7Copy = make([]byte, len(arg7))
copy(arg7Copy, arg7)
}
func (fake *Connection) Request(arg1 context.Context, arg2 *protocol.Request) ([]byte, error) {
fake.requestMutex.Lock()
ret, specificReturn := fake.requestReturnsOnCall[len(fake.requestArgsForCall)]
fake.requestArgsForCall = append(fake.requestArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 string
arg4 int
arg5 int64
arg6 int
arg7 []byte
arg8 uint32
arg9 bool
}{arg1, arg2, arg3, arg4, arg5, arg6, arg7Copy, arg8, arg9})
arg2 *protocol.Request
}{arg1, arg2})
stub := fake.RequestStub
fakeReturns := fake.requestReturns
fake.recordInvocation("Request", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6, arg7Copy, arg8, arg9})
fake.recordInvocation("Request", []interface{}{arg1, arg2})
fake.requestMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@ -909,17 +869,17 @@ func (fake *Connection) RequestCallCount() int {
return len(fake.requestArgsForCall)
}
func (fake *Connection) RequestCalls(stub func(context.Context, string, string, int, int64, int, []byte, uint32, bool) ([]byte, error)) {
func (fake *Connection) RequestCalls(stub func(context.Context, *protocol.Request) ([]byte, error)) {
fake.requestMutex.Lock()
defer fake.requestMutex.Unlock()
fake.RequestStub = stub
}
func (fake *Connection) RequestArgsForCall(i int) (context.Context, string, string, int, int64, int, []byte, uint32, bool) {
func (fake *Connection) RequestArgsForCall(i int) (context.Context, *protocol.Request) {
fake.requestMutex.RLock()
defer fake.requestMutex.RUnlock()
argsForCall := fake.requestArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7, argsForCall.arg8, argsForCall.arg9
return argsForCall.arg1, argsForCall.arg2
}
func (fake *Connection) RequestReturns(result1 []byte, result2 error) {

View File

@ -154,30 +154,30 @@ type RequestResponse interface {
}
type Connection interface {
// Send an index message. The connection will read and marshal the
// parameters asynchronously, so they should not be modified after
// calling Index().
Index(ctx context.Context, folder string, files []FileInfo) error
// Send an Index message to the peer device. The message in the
// parameter may be altered by the connection and should not be used
// further by the caller.
Index(ctx context.Context, idx *Index) error
// Send an index update message. The connection will read and marshal
// the parameters asynchronously, so they should not be modified after
// calling IndexUpdate().
IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
// Send an Index Update message to the peer device. The message in the
// parameter may be altered by the connection and should not be used
// further by the caller.
IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error
// Send a request message. The connection will read and marshal the
// parameters asynchronously, so they should not be modified after
// calling Request().
Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
// Send a Request message to the peer device. The message in the
// parameter may be altered by the connection and should not be used
// further by the caller.
Request(ctx context.Context, req *Request) ([]byte, error)
// Send a cluster configuration message. The connection will read and
// marshal the message asynchronously, so it should not be modified
// after calling ClusterConfig().
ClusterConfig(config ClusterConfig)
// Send a Cluster Configuration message to the peer device. The message
// in the parameter may be altered by the connection and should not be
// used further by the caller.
ClusterConfig(config *ClusterConfig)
// Send a download progress message. The connection will read and
// marshal the parameters asynchronously, so they should not be modified
// after calling DownloadProgress().
DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
// Send a Download Progress message to the peer device. The message in
// the parameter may be altered by the connection and should not be used
// further by the caller.
DownloadProgress(ctx context.Context, dp *DownloadProgress)
Start()
SetFolderPasswords(passwords map[string]string)
@ -185,6 +185,7 @@ type Connection interface {
DeviceID() DeviceID
Statistics() Statistics
Closed() <-chan struct{}
ConnectionInfo
}
@ -349,39 +350,33 @@ func (c *rawConnection) DeviceID() DeviceID {
}
// Index writes the list of file information to the connected peer device
func (c *rawConnection) Index(ctx context.Context, folder string, idx []FileInfo) error {
func (c *rawConnection) Index(ctx context.Context, idx *Index) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(ctx, &Index{
Folder: folder,
Files: idx,
}, nil)
c.send(ctx, idx, nil)
c.idxMut.Unlock()
return nil
}
// IndexUpdate writes the list of file information to the connected peer device as an update
func (c *rawConnection) IndexUpdate(ctx context.Context, folder string, idx []FileInfo) error {
func (c *rawConnection) IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(ctx, &IndexUpdate{
Folder: folder,
Files: idx,
}, nil)
c.send(ctx, idxUp, nil)
c.idxMut.Unlock()
return nil
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
func (c *rawConnection) Request(ctx context.Context, req *Request) ([]byte, error) {
rc := make(chan asyncResult, 1)
c.awaitingMut.Lock()
@ -394,17 +389,8 @@ func (c *rawConnection) Request(ctx context.Context, folder string, name string,
c.awaiting[id] = rc
c.awaitingMut.Unlock()
ok := c.send(ctx, &Request{
ID: id,
Folder: folder,
Name: name,
Offset: offset,
Size: size,
BlockNo: blockNo,
Hash: hash,
WeakHash: weakHash,
FromTemporary: fromTemporary,
}, nil)
req.ID = id
ok := c.send(ctx, req, nil)
if !ok {
return nil, ErrClosed
}
@ -421,9 +407,9 @@ func (c *rawConnection) Request(ctx context.Context, folder string, name string,
}
// ClusterConfig sends the cluster configuration message to the peer.
func (c *rawConnection) ClusterConfig(config ClusterConfig) {
func (c *rawConnection) ClusterConfig(config *ClusterConfig) {
select {
case c.clusterConfigBox <- &config:
case c.clusterConfigBox <- config:
case <-c.closed:
}
}
@ -433,11 +419,8 @@ func (c *rawConnection) Closed() <-chan struct{} {
}
// DownloadProgress sends the progress updates for the files that are currently being downloaded.
func (c *rawConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) {
c.send(ctx, &DownloadProgress{
Folder: folder,
Updates: updates,
}, nil)
func (c *rawConnection) DownloadProgress(ctx context.Context, dp *DownloadProgress) {
c.send(ctx, dp, nil)
}
func (c *rawConnection) ping() bool {

View File

@ -38,8 +38,8 @@ func TestPing(t *testing.T) {
c1 := getRawConnection(NewConnection(c1ID, br, aw, testutil.NoopCloser{}, newTestModel(), new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
c1.Start()
defer closeAndWait(c1, ar, bw)
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
c0.ClusterConfig(&ClusterConfig{})
c1.ClusterConfig(&ClusterConfig{})
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
@ -64,8 +64,8 @@ func TestClose(t *testing.T) {
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen)
c1.Start()
defer closeAndWait(c1, ar, bw)
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
c0.ClusterConfig(&ClusterConfig{})
c1.ClusterConfig(&ClusterConfig{})
c0.internalClose(errManual)
@ -82,10 +82,10 @@ func TestClose(t *testing.T) {
ctx := context.Background()
c0.Index(ctx, "default", nil)
c0.Index(ctx, "default", nil)
c0.Index(ctx, &Index{Folder: "default"})
c0.Index(ctx, &Index{Folder: "default"})
if _, err := c0.Request(ctx, "default", "foo", 0, 0, 0, nil, 0, false); err == nil {
if _, err := c0.Request(ctx, &Request{Folder: "default", Name: "foo"}); err == nil {
t.Error("Request should return an error")
}
}
@ -111,7 +111,7 @@ func TestCloseOnBlockingSend(t *testing.T) {
wg.Add(1)
go func() {
c.ClusterConfig(ClusterConfig{})
c.ClusterConfig(&ClusterConfig{})
wg.Done()
}()
@ -160,10 +160,10 @@ func TestCloseRace(t *testing.T) {
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionNever, nil, testKeyGen)
c1.Start()
defer closeAndWait(c1, ar, bw)
c0.ClusterConfig(ClusterConfig{})
c1.ClusterConfig(ClusterConfig{})
c0.ClusterConfig(&ClusterConfig{})
c1.ClusterConfig(&ClusterConfig{})
c1.Index(context.Background(), "default", nil)
c1.Index(context.Background(), &Index{Folder: "default"})
select {
case <-indexReceived:
case <-time.After(time.Second):
@ -205,7 +205,7 @@ func TestClusterConfigFirst(t *testing.T) {
// Allow some time for c.writerLoop to setup after c.Start
}
c.ClusterConfig(ClusterConfig{})
c.ClusterConfig(&ClusterConfig{})
done := make(chan struct{})
if ok := c.send(context.Background(), &Ping{}, done); !ok {
@ -907,7 +907,7 @@ func TestClusterConfigAfterClose(t *testing.T) {
done := make(chan struct{})
go func() {
c.ClusterConfig(ClusterConfig{})
c.ClusterConfig(&ClusterConfig{})
close(done)
}()

View File

@ -13,23 +13,23 @@ type wireFormatConnection struct {
Connection
}
func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error {
for i := range fs {
fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
func (c wireFormatConnection) Index(ctx context.Context, idx *Index) error {
for i := range idx.Files {
idx.Files[i].Name = norm.NFC.String(filepath.ToSlash(idx.Files[i].Name))
}
return c.Connection.Index(ctx, folder, fs)
return c.Connection.Index(ctx, idx)
}
func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error {
for i := range fs {
fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
func (c wireFormatConnection) IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error {
for i := range idxUp.Files {
idxUp.Files[i].Name = norm.NFC.String(filepath.ToSlash(idxUp.Files[i].Name))
}
return c.Connection.IndexUpdate(ctx, folder, fs)
return c.Connection.IndexUpdate(ctx, idxUp)
}
func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
name = norm.NFC.String(filepath.ToSlash(name))
return c.Connection.Request(ctx, folder, name, blockNo, offset, size, hash, weakHash, fromTemporary)
func (c wireFormatConnection) Request(ctx context.Context, req *Request) ([]byte, error) {
req.Name = norm.NFC.String(filepath.ToSlash(req.Name))
return c.Connection.Request(ctx, req)
}