From 1a5f524ae4a47588ed633eff78097c6255aa2bcf Mon Sep 17 00:00:00 2001 From: AudriusButkevicius Date: Fri, 15 Apr 2016 10:59:41 +0000 Subject: [PATCH] lib/model, lib/protocol: Implement temporary indexes (fixes #950) GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/2252 --- cmd/syncthing/gui.go | 5 +- cmd/syncthing/mocked_model_test.go | 3 +- lib/config/config_test.go | 2 + lib/config/folderconfiguration.go | 1 + lib/config/optionsconfiguration.go | 1 + lib/config/testdata/overridenvalues.xml | 1 + lib/model/deviceactivity.go | 22 +- lib/model/deviceactivity_test.go | 34 +-- lib/model/devicedownloadstate.go | 156 ++++++++++ lib/model/devicedownloadstate_test.go | 111 ++++++++ lib/model/model.go | 160 ++++++++--- lib/model/model_test.go | 29 +- lib/model/progressemitter.go | 178 ++++++++++-- lib/model/progressemitter_test.go | 361 +++++++++++++++++++++++- lib/model/rwfolder.go | 81 ++++-- lib/model/rwfolder_test.go | 48 +++- lib/model/sentdownloadstate.go | 184 ++++++++++++ lib/model/sharedpullerstate.go | 73 +++-- lib/model/sharedpullerstate_test.go | 6 +- lib/protocol/common_test.go | 3 + lib/protocol/message.go | 14 + lib/protocol/message_xdr.go | 232 +++++++++++++++ lib/protocol/nativemodel_darwin.go | 16 +- lib/protocol/nativemodel_unix.go | 22 +- lib/protocol/nativemodel_windows.go | 16 +- lib/protocol/protocol.go | 81 ++++-- lib/protocol/protocol_test.go | 2 +- lib/protocol/wireformat.go | 4 +- 28 files changed, 1612 insertions(+), 234 deletions(-) create mode 100644 lib/model/devicedownloadstate.go create mode 100644 lib/model/devicedownloadstate_test.go create mode 100644 lib/model/sentdownloadstate.go diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 1ea2cbc2f..5fa63e10d 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -32,6 +32,7 @@ import ( "github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/logger" + "github.com/syncthing/syncthing/lib/model" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/relay" @@ -85,7 +86,7 @@ type modelIntf interface { CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool) ResetFolder(folder string) - Availability(folder, file string) []protocol.DeviceID + Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability GetIgnores(folder string) ([]string, []string, error) SetIgnores(folder string, content []string) error PauseDevice(device protocol.DeviceID) @@ -696,7 +697,7 @@ func (s *apiService) getDBFile(w http.ResponseWriter, r *http.Request) { return } - av := s.model.Availability(folder, file) + av := s.model.Availability(folder, file, protocol.Vector{}, protocol.BlockInfo{}) sendJSON(w, map[string]interface{}{ "global": jsonFileInfo(gf), "local": jsonFileInfo(lf), diff --git a/cmd/syncthing/mocked_model_test.go b/cmd/syncthing/mocked_model_test.go index 141c9f751..4db670827 100644 --- a/cmd/syncthing/mocked_model_test.go +++ b/cmd/syncthing/mocked_model_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/model" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/stats" ) @@ -57,7 +58,7 @@ func (m *mockedModel) CurrentGlobalFile(folder string, file string) (protocol.Fi func (m *mockedModel) ResetFolder(folder string) { } -func (m *mockedModel) Availability(folder, file string) []protocol.DeviceID { +func (m *mockedModel) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability { return nil } diff --git a/lib/config/config_test.go b/lib/config/config_test.go index 70241e851..2067746b4 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -62,6 +62,7 @@ func TestDefaultValues(t *testing.T) { ReleasesURL: "https://api.github.com/repos/syncthing/syncthing/releases?per_page=30", AlwaysLocalNets: []string{}, OverwriteNames: false, + TempIndexMinBlocks: 10, } cfg := New(device1) @@ -192,6 +193,7 @@ func TestOverriddenValues(t *testing.T) { ReleasesURL: "https://localhost/releases", AlwaysLocalNets: []string{}, OverwriteNames: true, + TempIndexMinBlocks: 100, } cfg, err := Load("testdata/overridenvalues.xml", device1) diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go index f5c5504b9..3cab0d25d 100644 --- a/lib/config/folderconfiguration.go +++ b/lib/config/folderconfiguration.go @@ -37,6 +37,7 @@ type FolderConfiguration struct { PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"` MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"` DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"` + DisableTempIndexes bool `xml:"disableTempIndexes" json:"disableTempIndexes"` Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved cachedPath string diff --git a/lib/config/optionsconfiguration.go b/lib/config/optionsconfiguration.go index cfca3161d..35c3834f6 100644 --- a/lib/config/optionsconfiguration.go +++ b/lib/config/optionsconfiguration.go @@ -40,6 +40,7 @@ type OptionsConfiguration struct { ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=30"` AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"` OverwriteNames bool `xml:"overwriteNames" json:"overwriteNames" default:"false"` + TempIndexMinBlocks int `xml:"tempIndexMinBlocks" json:"tempIndexMinBlocks" default:"10"` DeprecatedUPnPEnabled bool `xml:"upnpEnabled"` DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes"` diff --git a/lib/config/testdata/overridenvalues.xml b/lib/config/testdata/overridenvalues.xml index bc60ab399..01341bb34 100755 --- a/lib/config/testdata/overridenvalues.xml +++ b/lib/config/testdata/overridenvalues.xml @@ -35,5 +35,6 @@ true https://localhost/releases true + 100 diff --git a/lib/model/deviceactivity.go b/lib/model/deviceactivity.go index f6c5ba665..35fae7c7a 100644 --- a/lib/model/deviceactivity.go +++ b/lib/model/deviceactivity.go @@ -26,28 +26,30 @@ func newDeviceActivity() *deviceActivity { } } -func (m *deviceActivity) leastBusy(availability []protocol.DeviceID) protocol.DeviceID { +func (m *deviceActivity) leastBusy(availability []Availability) (Availability, bool) { m.mut.Lock() low := 2<<30 - 1 - var selected protocol.DeviceID - for _, device := range availability { - if usage := m.act[device]; usage < low { + found := false + var selected Availability + for _, info := range availability { + if usage := m.act[info.ID]; usage < low { low = usage - selected = device + selected = info + found = true } } m.mut.Unlock() - return selected + return selected, found } -func (m *deviceActivity) using(device protocol.DeviceID) { +func (m *deviceActivity) using(availability Availability) { m.mut.Lock() - m.act[device]++ + m.act[availability.ID]++ m.mut.Unlock() } -func (m *deviceActivity) done(device protocol.DeviceID) { +func (m *deviceActivity) done(availability Availability) { m.mut.Lock() - m.act[device]-- + m.act[availability.ID]-- m.mut.Unlock() } diff --git a/lib/model/deviceactivity_test.go b/lib/model/deviceactivity_test.go index 465957711..04d617a24 100644 --- a/lib/model/deviceactivity_test.go +++ b/lib/model/deviceactivity_test.go @@ -13,46 +13,48 @@ import ( ) func TestDeviceActivity(t *testing.T) { - n0 := protocol.DeviceID([32]byte{1, 2, 3, 4}) - n1 := protocol.DeviceID([32]byte{5, 6, 7, 8}) - n2 := protocol.DeviceID([32]byte{9, 10, 11, 12}) - devices := []protocol.DeviceID{n0, n1, n2} + n0 := Availability{protocol.DeviceID([32]byte{1, 2, 3, 4}), false} + n1 := Availability{protocol.DeviceID([32]byte{5, 6, 7, 8}), true} + n2 := Availability{protocol.DeviceID([32]byte{9, 10, 11, 12}), false} + devices := []Availability{n0, n1, n2} na := newDeviceActivity() - if lb := na.leastBusy(devices); lb != n0 { + if lb, ok := na.leastBusy(devices); !ok || lb != n0 { t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) } - if lb := na.leastBusy(devices); lb != n0 { + if lb, ok := na.leastBusy(devices); !ok || lb != n0 { t.Errorf("Least busy device should still be n0 (%v) not %v", n0, lb) } - na.using(na.leastBusy(devices)) - if lb := na.leastBusy(devices); lb != n1 { + lb, _ := na.leastBusy(devices) + na.using(lb) + if lb, ok := na.leastBusy(devices); !ok || lb != n1 { t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb) } - - na.using(na.leastBusy(devices)) - if lb := na.leastBusy(devices); lb != n2 { + lb, _ = na.leastBusy(devices) + na.using(lb) + if lb, ok := na.leastBusy(devices); !ok || lb != n2 { t.Errorf("Least busy device should be n2 (%v) not %v", n2, lb) } - na.using(na.leastBusy(devices)) - if lb := na.leastBusy(devices); lb != n0 { + lb, _ = na.leastBusy(devices) + na.using(lb) + if lb, ok := na.leastBusy(devices); !ok || lb != n0 { t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) } na.done(n1) - if lb := na.leastBusy(devices); lb != n1 { + if lb, ok := na.leastBusy(devices); !ok || lb != n1 { t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb) } na.done(n2) - if lb := na.leastBusy(devices); lb != n1 { + if lb, ok := na.leastBusy(devices); !ok || lb != n1 { t.Errorf("Least busy device should still be n1 (%v) not %v", n1, lb) } na.done(n0) - if lb := na.leastBusy(devices); lb != n0 { + if lb, ok := na.leastBusy(devices); !ok || lb != n0 { t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) } } diff --git a/lib/model/devicedownloadstate.go b/lib/model/devicedownloadstate.go new file mode 100644 index 000000000..f95b2631b --- /dev/null +++ b/lib/model/devicedownloadstate.go @@ -0,0 +1,156 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/sync" +) + +// deviceFolderFileDownloadState holds current download state of a file that +// a remote device has advertised. blockIndexes represends indexes within +// FileInfo.Blocks that the remote device already has, and version represents +// the version of the file that the remote device is downloading. +type deviceFolderFileDownloadState struct { + blockIndexes []int32 + version protocol.Vector +} + +// deviceFolderDownloadState holds current download state of all files that +// a remote device is currently downloading in a specific folder. +type deviceFolderDownloadState struct { + mut sync.RWMutex + files map[string]deviceFolderFileDownloadState + numberOfBlocksInProgress int +} + +// Has returns wether a block at that specific index, and that specific version of the file +// is currently available on the remote device for pulling from a temporary file. +func (p *deviceFolderDownloadState) Has(file string, version protocol.Vector, index int32) bool { + p.mut.RLock() + defer p.mut.RUnlock() + + local, ok := p.files[file] + + if !ok || !local.version.Equal(version) { + return false + } + + for _, existingIndex := range local.blockIndexes { + if existingIndex == index { + return true + } + } + return false +} + +// Update updates internal state of what has been downloaded into the temporary +// files by the remote device for this specific folder. +func (p *deviceFolderDownloadState) Update(updates []protocol.FileDownloadProgressUpdate) { + p.mut.Lock() + defer p.mut.Unlock() + + for _, update := range updates { + local, ok := p.files[update.Name] + if update.UpdateType == protocol.UpdateTypeForget && ok && local.version.Equal(update.Version) { + p.numberOfBlocksInProgress -= len(local.blockIndexes) + delete(p.files, update.Name) + } else if update.UpdateType == protocol.UpdateTypeAppend { + if !ok { + local = deviceFolderFileDownloadState{ + blockIndexes: update.BlockIndexes, + version: update.Version, + } + } else if !local.version.Equal(update.Version) { + p.numberOfBlocksInProgress -= len(local.blockIndexes) + local.blockIndexes = append(local.blockIndexes[:0], update.BlockIndexes...) + local.version = update.Version + } else { + local.blockIndexes = append(local.blockIndexes, update.BlockIndexes...) + } + p.files[update.Name] = local + p.numberOfBlocksInProgress += len(update.BlockIndexes) + } + } +} + +// NumberOfBlocksInProgress returns the number of blocks the device has downloaded +// for a specific folder. +func (p *deviceFolderDownloadState) NumberOfBlocksInProgress() int { + p.mut.RLock() + n := p.numberOfBlocksInProgress + p.mut.RUnlock() + return n +} + +// deviceDownloadState represents the state of all in progress downloads +// for all folders of a specific device. +type deviceDownloadState struct { + mut sync.RWMutex + folders map[string]*deviceFolderDownloadState + numberOfBlocksInProgress int +} + +// Update updates internal state of what has been downloaded into the temporary +// files by the remote device for this specific folder. +func (t *deviceDownloadState) Update(folder string, updates []protocol.FileDownloadProgressUpdate) { + t.mut.RLock() + f, ok := t.folders[folder] + t.mut.RUnlock() + + if !ok { + f = &deviceFolderDownloadState{ + mut: sync.NewRWMutex(), + files: make(map[string]deviceFolderFileDownloadState), + } + t.mut.Lock() + t.folders[folder] = f + t.mut.Unlock() + } + + f.Update(updates) +} + +// Has returns wether block at that specific index, and that specific version of the file +// is currently available on the remote device for pulling from a temporary file. +func (t *deviceDownloadState) Has(folder, file string, version protocol.Vector, index int32) bool { + if t == nil { + return false + } + t.mut.RLock() + f, ok := t.folders[folder] + t.mut.RUnlock() + + if !ok { + return false + } + + return f.Has(file, version, index) +} + +// NumberOfBlocksInProgress returns the number of blocks the device has downloaded +// for all folders. +func (t *deviceDownloadState) NumberOfBlocksInProgress() int { + if t == nil { + return 0 + } + + n := 0 + t.mut.RLock() + for _, folder := range t.folders { + n += folder.NumberOfBlocksInProgress() + } + t.mut.RUnlock() + return n +} + +func newDeviceDownloadState() *deviceDownloadState { + return &deviceDownloadState{ + mut: sync.NewRWMutex(), + folders: make(map[string]*deviceFolderDownloadState), + } +} diff --git a/lib/model/devicedownloadstate_test.go b/lib/model/devicedownloadstate_test.go new file mode 100644 index 000000000..62e122c7f --- /dev/null +++ b/lib/model/devicedownloadstate_test.go @@ -0,0 +1,111 @@ +// Copyright (C) 2016 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "testing" + + "github.com/syncthing/syncthing/lib/protocol" +) + +func TestDeviceDownloadState(t *testing.T) { + v1 := (protocol.Vector{}).Update(0) + v2 := (protocol.Vector{}).Update(1) + + // file 1 version 1 part 1 + f1v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{0, 1, 2}} + f1v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{3, 4, 5}} + f1v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v1, nil} + f1v2p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{10, 11, 12}} + f1v2p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{13, 14, 15}} + f1v2del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v2, nil} + + f2v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{20, 21, 22}} + f2v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{23, 24, 25}} + f2v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f2", v1, nil} + + tests := []struct { + updates []protocol.FileDownloadProgressUpdate + shouldHaveIndexesFrom []protocol.FileDownloadProgressUpdate + shouldNotHaveIndexesFrom []protocol.FileDownloadProgressUpdate + }{ + { //1 + []protocol.FileDownloadProgressUpdate{f1v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p2, f1v2p1, f1v2p2}, + }, + { //2 + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2}, + []protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2}, + }, + { //3 + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v1del}, + nil, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2p2}}, + { //4 + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2del}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2}, + []protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2}, + }, + { //5 + // v2 replaces old v1 data + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1}, + []protocol.FileDownloadProgressUpdate{f1v2p1}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2}, + }, + { //6 + // v1 delete on v2 data does nothing + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v1del}, + []protocol.FileDownloadProgressUpdate{f1v2p1}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2}, + }, + { //7 + // v2 replacees v1, v2 gets deleted, and v2 part 2 gets added. + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2del, f1v2p2}, + []protocol.FileDownloadProgressUpdate{f1v2p2}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1}, + }, + // Multiple files in one go + { //8 + []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2}, + }, + { //9 + []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1, f2v1del}, + []protocol.FileDownloadProgressUpdate{f1v1p1}, + []protocol.FileDownloadProgressUpdate{f2v1p1, f2v1p1}, + }, + { //10 + []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1del, f2v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1}, + []protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2}, + }, + } + + for i, test := range tests { + s := newDeviceDownloadState() + s.Update("folder", test.updates) + + for _, expected := range test.shouldHaveIndexesFrom { + for _, n := range expected.BlockIndexes { + if !s.Has("folder", expected.Name, expected.Version, n) { + t.Error("Test", i+1, "error:", expected.Name, expected.Version, "missing", n) + } + } + } + + for _, unexpected := range test.shouldNotHaveIndexesFrom { + for _, n := range unexpected.BlockIndexes { + if s.Has("folder", unexpected.Name, unexpected.Version, n) { + t.Error("Test", i+1, "error:", unexpected.Name, unexpected.Version, "has extra", n) + } + } + } + } +} diff --git a/lib/model/model.go b/lib/model/model.go index f2550adb2..fdf7b3e4a 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -60,6 +60,11 @@ type service interface { getState() (folderState, time.Time, error) } +type Availability struct { + ID protocol.DeviceID `json:"id"` + FromTemporary bool `json:"fromTemporary"` +} + type Model struct { *suture.Supervisor @@ -87,10 +92,12 @@ type Model struct { folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef fmut sync.RWMutex // protects the above - conn map[protocol.DeviceID]Connection - helloMessages map[protocol.DeviceID]protocol.HelloMessage - devicePaused map[protocol.DeviceID]bool - pmut sync.RWMutex // protects the above + conn map[protocol.DeviceID]Connection + helloMessages map[protocol.DeviceID]protocol.HelloMessage + deviceClusterConf map[protocol.DeviceID]protocol.ClusterConfigMessage + devicePaused map[protocol.DeviceID]bool + deviceDownloads map[protocol.DeviceID]*deviceDownloadState + pmut sync.RWMutex // protects the above } var ( @@ -129,10 +136,11 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, folderStatRefs: make(map[string]*stats.FolderStatisticsReference), conn: make(map[protocol.DeviceID]Connection), helloMessages: make(map[protocol.DeviceID]protocol.HelloMessage), + deviceClusterConf: make(map[protocol.DeviceID]protocol.ClusterConfigMessage), devicePaused: make(map[protocol.DeviceID]bool), - - fmut: sync.NewRWMutex(), - pmut: sync.NewRWMutex(), + deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), + fmut: sync.NewRWMutex(), + pmut: sync.NewRWMutex(), } if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() @@ -388,6 +396,11 @@ func (m *Model) Completion(device protocol.DeviceID, folder string) float64 { return true }) + // This might might be more than it really is, because some blocks can be of a smaller size. + m.pmut.RLock() + need -= int64(m.deviceDownloads[device].NumberOfBlocksInProgress() * protocol.BlockSize) + m.pmut.RUnlock() + needRatio := float64(need) / float64(tot) completionPct := 100 * (1 - needRatio) l.Debugf("%v Completion(%s, %q): %f (%d / %d = %f)", m, device, folder, completionPct, need, tot, needRatio) @@ -609,6 +622,10 @@ func (m *Model) folderSharedWithUnlocked(folder string, deviceID protocol.Device func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfigMessage) { // Check the peer device's announced folders against our own. Emits events // for folders that we don't expect (unknown or not shared). + // Also, collect a list of folders we do share, and if he's interested in + // temporary indexes, subscribe the connection. + + tempIndexFolders := make([]string, 0, len(cm.Folders)) m.fmut.Lock() nextFolder: @@ -635,9 +652,24 @@ nextFolder: l.Infof("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.ID, deviceID) continue } + if folder.Flags&protocol.FlagFolderDisabledTempIndexes == 0 { + tempIndexFolders = append(tempIndexFolders, folder.ID) + } } m.fmut.Unlock() + // This breaks if we send multiple CM messages during the same connection. + if len(tempIndexFolders) > 0 { + m.pmut.RLock() + conn, ok := m.conn[deviceID] + m.pmut.RUnlock() + // In case we've got ClusterConfig, and the connection disappeared + // from infront of our nose. + if ok { + m.progressEmitter.temporaryIndexSubscribe(conn, tempIndexFolders) + } + } + var changed bool if m.cfg.Devices()[deviceID].Introducer { @@ -645,9 +677,6 @@ nextFolder: // and devices and add what we are missing. for _, folder := range cm.Folders { - // If we don't have this folder yet, skip it. Ideally, we'd - // offer up something in the GUI to create the folder, but for the - // moment we only handle folders that we already have. if _, ok := m.folderDevices[folder.ID]; !ok { continue } @@ -736,10 +765,13 @@ func (m *Model) Close(device protocol.DeviceID, err error) { conn, ok := m.conn[device] if ok { + m.progressEmitter.temporaryIndexUnsubscribe(conn) closeRawConn(conn) } delete(m.conn, device) delete(m.helloMessages, device) + delete(m.deviceClusterConf, device) + delete(m.deviceDownloads, device) m.pmut.Unlock() } @@ -752,19 +784,20 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset if !m.folderSharedWith(folder, deviceID) { l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder) - return protocol.ErrInvalid + return protocol.ErrNoSuchFile } - if flags != 0 { - // We don't currently support or expect any flags. - return protocol.ErrInvalid + if flags != 0 && flags != protocol.FlagFromTemporary { + // We currently support only no flags, or FromTemporary flag. + return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags) } if deviceID != protocol.LocalDeviceID { - l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf)) + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d f=%d", m, deviceID, folder, name, offset, len(buf), flags) } m.fmut.RLock() - folderPath := m.folderCfgs[folder].Path() + folderCfg := m.folderCfgs[folder] + folderPath := folderCfg.Path() folderIgnores := m.folderIgnores[folder] m.fmut.RUnlock() @@ -802,8 +835,6 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset } } - var reader io.ReaderAt - var err error if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { target, _, err := symlinks.Read(fn) if err != nil { @@ -813,28 +844,30 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset } return protocol.ErrGeneric } - reader = strings.NewReader(target) - } else { - // Cannot easily cache fd's because we might need to delete the file - // at any moment. - reader, err = os.Open(fn) - if err != nil { - l.Debugln("os.Open:", err) - if os.IsNotExist(err) { - return protocol.ErrNoSuchFile - } + if _, err := strings.NewReader(target).ReadAt(buf, offset); err != nil { + l.Debugln("symlink.Reader.ReadAt", err) return protocol.ErrGeneric } - - defer reader.(*os.File).Close() + return nil } - _, err = reader.ReadAt(buf, offset) - if err != nil { - l.Debugln("reader.ReadAt:", err) + // Only check temp files if the flag is set, and if we are set to advertise + // the temp indexes. + if flags&protocol.FlagFromTemporary != 0 && !folderCfg.DisableTempIndexes { + tempFn := filepath.Join(folderPath, defTempNamer.TempName(name)) + if err := readOffsetIntoBuf(tempFn, offset, buf); err == nil { + return nil + } + // Fall through to reading from a non-temp file, just incase the temp + // file has finished downloading. + } + + err := readOffsetIntoBuf(fn, offset, buf) + if os.IsNotExist(err) { + return protocol.ErrNoSuchFile + } else if err != nil { return protocol.ErrGeneric } - return nil } @@ -986,6 +1019,7 @@ func (m *Model) AddConnection(conn Connection, hello protocol.HelloMessage) { panic("add existing device") } m.conn[deviceID] = conn + m.deviceDownloads[deviceID] = newDeviceDownloadState() m.helloMessages[deviceID] = hello @@ -1040,6 +1074,24 @@ func (m *Model) PauseDevice(device protocol.DeviceID) { events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()}) } +func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) { + if !m.folderSharedWith(folder, device) { + return + } + + m.fmut.RLock() + cfg, ok := m.folderCfgs[folder] + m.fmut.RUnlock() + + if !ok || cfg.ReadOnly || cfg.DisableTempIndexes { + return + } + + m.pmut.RLock() + m.deviceDownloads[device].Update(folder, updates) + m.pmut.RUnlock() +} + func (m *Model) ResumeDevice(device protocol.DeviceID) { m.pmut.Lock() m.devicePaused[device] = false @@ -1211,7 +1263,7 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) { }) } -func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { +func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { m.pmut.RLock() nc, ok := m.conn[deviceID] m.pmut.RUnlock() @@ -1220,9 +1272,9 @@ func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, o return nil, fmt.Errorf("requestGlobal: no such device: %s", deviceID) } - l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x f=%x op=%s", m, deviceID, folder, name, offset, size, hash, flags, options) + l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x ft=%t op=%s", m, deviceID, folder, name, offset, size, hash, fromTemporary) - return nc.Request(folder, name, offset, size, hash, flags, options) + return nc.Request(folder, name, offset, size, hash, fromTemporary) } func (m *Model) AddFolder(cfg config.FolderConfiguration) { @@ -1553,6 +1605,9 @@ func (m *Model) generateClusterConfig(device protocol.DeviceID) protocol.Cluster if folderCfg.IgnoreDelete { flags |= protocol.FlagFolderIgnoreDelete } + if folderCfg.DisableTempIndexes { + flags |= protocol.FlagFolderDisabledTempIndexes + } protocolFolder.Flags = flags for _, device := range m.folderDevices[folder] { // DeviceID is a value type, but with an underlying array. Copy it @@ -1736,7 +1791,7 @@ func (m *Model) GlobalDirectoryTree(folder, prefix string, levels int, dirsonly return output } -func (m *Model) Availability(folder, file string) []protocol.DeviceID { +func (m *Model) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []Availability { // Acquire this lock first, as the value returned from foldersFiles can // get heavily modified on Close() m.pmut.RLock() @@ -1744,19 +1799,27 @@ func (m *Model) Availability(folder, file string) []protocol.DeviceID { m.fmut.RLock() fs, ok := m.folderFiles[folder] + devices := m.folderDevices[folder] m.fmut.RUnlock() if !ok { return nil } - availableDevices := []protocol.DeviceID{} + var availabilities []Availability for _, device := range fs.Availability(file) { _, ok := m.conn[device] if ok { - availableDevices = append(availableDevices, device) + availabilities = append(availabilities, Availability{ID: device, FromTemporary: false}) } } - return availableDevices + + for _, device := range devices { + if m.deviceDownloads[device].Has(folder, file, version, int32(block.Offset/protocol.BlockSize)) { + availabilities = append(availabilities, Availability{ID: device, FromTemporary: true}) + } + } + + return availabilities } // BringToFront bumps the given files priority in the job queue. @@ -2086,6 +2149,21 @@ func stringSliceWithout(ss []string, s string) []string { return ss } +func readOffsetIntoBuf(file string, offset int64, buf []byte) error { + fd, err := os.Open(file) + if err != nil { + l.Debugln("readOffsetIntoBuf.Open", file, err) + return err + } + + defer fd.Close() + _, err = fd.ReadAt(buf, offset) + if err != nil { + l.Debugln("readOffsetIntoBuf.ReadAt", file, err) + } + return err +} + // The exists function is expected to return true for all known paths // (excluding "" and ".") func unifySubs(dirs []string, exists func(dir string) bool) []string { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 828437cc5..24c7352ff 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -203,9 +203,17 @@ func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) { b.ReportAllocs() } +type downloadProgressMessage struct { + folder string + updates []protocol.FileDownloadProgressUpdate + flags uint32 + options []protocol.Option +} + type FakeConnection struct { - id protocol.DeviceID - requestData []byte + id protocol.DeviceID + requestData []byte + downloadProgressMessages []downloadProgressMessage } func (FakeConnection) Close() error { @@ -235,7 +243,7 @@ func (FakeConnection) IndexUpdate(string, []protocol.FileInfo, uint32, []protoco return nil } -func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { +func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { return f.requestData, nil } @@ -253,6 +261,15 @@ func (FakeConnection) Statistics() protocol.Statistics { return protocol.Statistics{} } +func (f *FakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) { + f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{ + folder: folder, + updates: updates, + flags: flags, + options: options, + }) +} + func BenchmarkRequest(b *testing.B) { db := db.OpenMemory() m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil) @@ -271,7 +288,7 @@ func BenchmarkRequest(b *testing.B) { } } - fc := FakeConnection{ + fc := &FakeConnection{ id: device1, requestData: []byte("some data to return"), } @@ -284,7 +301,7 @@ func BenchmarkRequest(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, 0, nil) + data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, false) if err != nil { b.Error(err) } @@ -318,7 +335,7 @@ func TestDeviceRename(t *testing.T) { conn := Connection{ &net.TCPConn{}, - FakeConnection{ + &FakeConnection{ id: device1, requestData: []byte("some data to return"), }, diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index 70b7ba9df..a4d225d74 100755 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -9,19 +9,22 @@ package model import ( "fmt" "path/filepath" - "reflect" "time" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" ) type ProgressEmitter struct { - registry map[string]*sharedPullerState - interval time.Duration - last map[string]map[string]*pullerProgress - mut sync.Mutex + registry map[string]*sharedPullerState + interval time.Duration + minBlocks int + lastUpdate time.Time + sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages. + connections map[string][]protocol.Connection + mut sync.Mutex timer *time.Timer @@ -32,11 +35,12 @@ type ProgressEmitter struct { // DownloadProgress events every interval. func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter { t := &ProgressEmitter{ - stop: make(chan struct{}), - registry: make(map[string]*sharedPullerState), - last: make(map[string]map[string]*pullerProgress), - timer: time.NewTimer(time.Millisecond), - mut: sync.NewMutex(), + stop: make(chan struct{}), + registry: make(map[string]*sharedPullerState), + timer: time.NewTimer(time.Millisecond), + sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), + connections: make(map[string][]protocol.Connection), + mut: sync.NewMutex(), } t.CommitConfiguration(config.Configuration{}, cfg.Raw()) @@ -48,6 +52,8 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter { // Serve starts the progress emitter which starts emitting DownloadProgress // events as the progress happens. func (t *ProgressEmitter) Serve() { + var lastUpdate time.Time + var lastCount, newCount int for { select { case <-t.stop: @@ -56,21 +62,28 @@ func (t *ProgressEmitter) Serve() { case <-t.timer.C: t.mut.Lock() l.Debugln("progress emitter: timer - looking after", len(t.registry)) - output := make(map[string]map[string]*pullerProgress) + + newLastUpdated := lastUpdate + newCount = len(t.registry) for _, puller := range t.registry { - if output[puller.folder] == nil { - output[puller.folder] = make(map[string]*pullerProgress) + updated := puller.Updated() + if updated.After(newLastUpdated) { + newLastUpdated = updated } - output[puller.folder][puller.file.Name] = puller.Progress() } - if !reflect.DeepEqual(t.last, output) { - events.Default.Log(events.DownloadProgress, output) - t.last = output - l.Debugf("progress emitter: emitting %#v", output) + + if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount { + lastUpdate = newLastUpdated + lastCount = newCount + t.sendDownloadProgressEvent() + if len(t.connections) > 0 { + t.sendDownloadProgressMessages() + } } else { l.Debugln("progress emitter: nothing new") } - if len(t.registry) != 0 { + + if newCount != 0 { t.timer.Reset(t.interval) } t.mut.Unlock() @@ -78,6 +91,95 @@ func (t *ProgressEmitter) Serve() { } } +func (t *ProgressEmitter) sendDownloadProgressEvent() { + // registry lock already held + output := make(map[string]map[string]*pullerProgress) + for _, puller := range t.registry { + if output[puller.folder] == nil { + output[puller.folder] = make(map[string]*pullerProgress) + } + output[puller.folder][puller.file.Name] = puller.Progress() + } + events.Default.Log(events.DownloadProgress, output) + l.Debugf("progress emitter: emitting %#v", output) +} + +func (t *ProgressEmitter) sendDownloadProgressMessages() { + // registry lock already held + sharedFolders := make(map[protocol.DeviceID][]string) + deviceConns := make(map[protocol.DeviceID]protocol.Connection) + subscribers := t.connections + for folder, conns := range subscribers { + for _, conn := range conns { + id := conn.ID() + + deviceConns[id] = conn + sharedFolders[id] = append(sharedFolders[id], folder) + + state, ok := t.sentDownloadStates[id] + if !ok { + state = &sentDownloadState{ + folderStates: make(map[string]*sentFolderDownloadState), + } + t.sentDownloadStates[id] = state + } + + var activePullers []*sharedPullerState + for _, puller := range t.registry { + if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks { + continue + } + activePullers = append(activePullers, puller) + } + + // For every new puller that hasn't yet been seen, it will send all the blocks the puller has available + // For every existing puller, it will check for new blocks, and send update for the new blocks only + // For every puller that we've seen before but is no longer there, we will send a forget message + updates := state.update(folder, activePullers) + + if len(updates) > 0 { + conn.DownloadProgress(folder, updates, 0, nil) + } + } + } + + // Clean up sentDownloadStates for devices which we are no longer connected to. + for id := range t.sentDownloadStates { + _, ok := deviceConns[id] + if !ok { + // Null out outstanding entries for device + delete(t.sentDownloadStates, id) + } + } + + // If a folder was unshared from some device, tell it that all temp files + // are now gone. + for id, sharedDeviceFolders := range sharedFolders { + state := t.sentDownloadStates[id] + nextFolder: + // For each of the folders that the state is aware of, + // try to match it with a shared folder we've discovered above, + for _, folder := range state.folders() { + for _, existingFolder := range sharedDeviceFolders { + if existingFolder == folder { + continue nextFolder + } + } + + // If we fail to find that folder, we tell the state to forget about it + // and return us a list of updates which would clean up the state + // on the remote end. + updates := state.cleanup(folder) + if len(updates) > 0 { + // XXX: Don't send this now, as the only way we've unshared a folder + // is by breaking the connection and reconnecting, hence sending + // forget messages for some random folder currently makes no sense. + // deviceConns[id].DownloadProgress(folder, updates, 0, nil) + } + } + } +} + // VerifyConfiguration implements the config.Committer interface func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error { return nil @@ -89,6 +191,7 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo defer t.mut.Unlock() t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second + t.minBlocks = to.Options.TempIndexMinBlocks l.Debugln("progress emitter: updated interval", t.interval) return true @@ -115,7 +218,9 @@ func (t *ProgressEmitter) Register(s *sharedPullerState) { func (t *ProgressEmitter) Deregister(s *sharedPullerState) { t.mut.Lock() defer t.mut.Unlock() + l.Debugln("progress emitter: deregistering", s.folder, s.file.Name) + delete(t.registry, filepath.Join(s.folder, s.file.Name)) } @@ -142,3 +247,38 @@ func (t *ProgressEmitter) lenRegistry() int { defer t.mut.Unlock() return len(t.registry) } + +func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) { + t.mut.Lock() + for _, folder := range folders { + t.connections[folder] = append(t.connections[folder], conn) + } + t.mut.Unlock() +} + +func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) { + t.mut.Lock() + left := make(map[string][]protocol.Connection, len(t.connections)) + for folder, conns := range t.connections { + connsLeft := connsWithout(conns, conn) + if len(connsLeft) > 0 { + left[folder] = connsLeft + } + } + t.connections = left + t.mut.Unlock() +} + +func connsWithout(conns []protocol.Connection, conn protocol.Connection) []protocol.Connection { + if len(conns) == 0 { + return nil + } + + newConns := make([]protocol.Connection, 0, len(conns)-1) + for _, existingConn := range conns { + if existingConn != conn { + newConns = append(newConns, existingConn) + } + } + return newConns +} diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index cd8bba22d..bb20c2248 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -7,34 +7,46 @@ package model import ( + "fmt" + "path/filepath" + "runtime" "testing" "time" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" ) -var timeout = 10 * time.Millisecond +var timeout = 100 * time.Millisecond + +func caller(skip int) string { + _, file, line, ok := runtime.Caller(skip + 1) + if !ok { + return "unknown" + } + return fmt.Sprintf("%s:%d", filepath.Base(file), line) +} func expectEvent(w *events.Subscription, t *testing.T, size int) { event, err := w.Poll(timeout) if err != nil { - t.Fatal("Unexpected error:", err) + t.Fatal("Unexpected error:", err, "at", caller(1)) } if event.Type != events.DownloadProgress { - t.Fatal("Unexpected event:", event) + t.Fatal("Unexpected event:", event, "at", caller(1)) } data := event.Data.(map[string]map[string]*pullerProgress) if len(data) != size { - t.Fatal("Unexpected event data size:", data) + t.Fatal("Unexpected event data size:", data, "at", caller(1)) } } func expectTimeout(w *events.Subscription, t *testing.T) { _, err := w.Poll(timeout) if err != events.ErrTimeout { - t.Fatal("Unexpected non-Timeout error:", err) + t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1)) } } @@ -52,14 +64,15 @@ func TestProgressEmitter(t *testing.T) { expectTimeout(w, t) s := sharedPullerState{ - mut: sync.NewMutex(), + updated: time.Now(), + mut: sync.NewRWMutex(), } p.Register(&s) expectEvent(w, t, 1) expectTimeout(w, t) - s.copyDone() + s.copyDone(protocol.BlockInfo{}) expectEvent(w, t, 1) expectTimeout(w, t) @@ -74,7 +87,7 @@ func TestProgressEmitter(t *testing.T) { expectEvent(w, t, 1) expectTimeout(w, t) - s.pullDone() + s.pullDone(protocol.BlockInfo{}) expectEvent(w, t, 1) expectTimeout(w, t) @@ -85,3 +98,335 @@ func TestProgressEmitter(t *testing.T) { expectTimeout(w, t) } + +func TestSendDownloadProgressMessages(t *testing.T) { + + c := config.Wrap("/tmp/test", config.Configuration{}) + c.SetOptions(config.OptionsConfiguration{ + ProgressUpdateIntervalS: 0, + TempIndexMinBlocks: 10, + }) + + fc := &FakeConnection{} + + p := NewProgressEmitter(c) + p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"}) + + expect := func(updateIdx int, state *sharedPullerState, updateType uint32, version protocol.Vector, blocks []int32, remove bool) { + messageIdx := -1 + for i, msg := range fc.downloadProgressMessages { + if msg.folder == state.folder { + messageIdx = i + break + } + } + if messageIdx < 0 { + t.Errorf("Message for folder %s does not exist at %s", state.folder, caller(1)) + } + + msg := fc.downloadProgressMessages[messageIdx] + + // Don't know the index (it's random due to iterating maps) + if updateIdx == -1 { + for i, upd := range msg.updates { + if upd.Name == state.file.Name { + updateIdx = i + break + } + } + } + + if updateIdx == -1 { + t.Errorf("Could not find update for %s at %s", state.file.Name, caller(1)) + } + + if updateIdx > len(msg.updates)-1 { + t.Errorf("Update at index %d does not exist at %s", updateIdx, caller(1)) + } + + update := msg.updates[updateIdx] + + if update.UpdateType != updateType { + t.Errorf("Wrong update type at %s", caller(1)) + } + + if !update.Version.Equal(version) { + t.Errorf("Wrong version at %s", caller(1)) + } + + if len(update.BlockIndexes) != len(blocks) { + t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1)) + } + for i := range update.BlockIndexes { + if update.BlockIndexes[i] != blocks[i] { + t.Errorf("Index %d incorrect at %s", i, caller(1)) + } + } + + if remove { + fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...) + } + } + expectEmpty := func() { + if len(fc.downloadProgressMessages) > 0 { + t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages) + } + } + + now := time.Now() + tick := func() time.Time { + now = now.Add(time.Second) + return now + } + + if len(fc.downloadProgressMessages) != 0 { + t.Error("Expected no requests") + } + + v1 := (protocol.Vector{}).Update(0) + v2 := (protocol.Vector{}).Update(1) + + // Requires more than 10 blocks to work. + blocks := make([]protocol.BlockInfo, 11, 11) + + state1 := &sharedPullerState{ + folder: "folder", + file: protocol.FileInfo{ + Name: "state1", + Version: v1, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + availableUpdated: time.Now(), + } + p.registry["1"] = state1 + + // Has no blocks, hence no message is sent + p.sendDownloadProgressMessages() + expectEmpty() + + // Returns update for puller with new extra blocks + state1.available = []int32{1} + p.sendDownloadProgressMessages() + + expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true) + expectEmpty() + + // Does nothing if nothing changes + p.sendDownloadProgressMessages() + expectEmpty() + + // Does nothing if timestamp updated, but no new blocks (should never happen) + state1.availableUpdated = tick() + + p.sendDownloadProgressMessages() + expectEmpty() + + // Does not return an update if date blocks change but date does not (should never happen) + state1.available = []int32{1, 2} + + p.sendDownloadProgressMessages() + expectEmpty() + + // If the date and blocks changes, returns only the diff + state1.availableUpdated = tick() + + p.sendDownloadProgressMessages() + + expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true) + expectEmpty() + + // Returns forget and update if puller version has changed + state1.file.Version = v2 + + p.sendDownloadProgressMessages() + + expect(0, state1, protocol.UpdateTypeForget, v1, nil, false) + expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true) + expectEmpty() + + // Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available) + state1.file.Version = v1 + state1.available = nil + state1.availableUpdated = tick() + + p.sendDownloadProgressMessages() + + expect(0, state1, protocol.UpdateTypeForget, v2, nil, false) + expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true) + expectEmpty() + + // Updates for multiple files and folders can be combined + state1.available = []int32{1, 2, 3} + state1.availableUpdated = tick() + + state2 := &sharedPullerState{ + folder: "folder2", + file: protocol.FileInfo{ + Name: "state2", + Version: v1, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + state3 := &sharedPullerState{ + folder: "folder", + file: protocol.FileInfo{ + Name: "state3", + Version: v1, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + state4 := &sharedPullerState{ + folder: "folder2", + file: protocol.FileInfo{ + Name: "state4", + Version: v1, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + p.registry["2"] = state2 + p.registry["3"] = state3 + p.registry["4"] = state4 + + p.sendDownloadProgressMessages() + + expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false) + expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) + expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false) + expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) + expectEmpty() + + // Returns forget if puller no longer exists, as well as updates if it has been updated. + state1.available = []int32{1, 2, 3, 4, 5} + state1.availableUpdated = tick() + state2.available = []int32{1, 2, 3, 4, 5} + state2.availableUpdated = tick() + + delete(p.registry, "3") + delete(p.registry, "4") + + p.sendDownloadProgressMessages() + + expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false) + expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true) + expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false) + expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true) + expectEmpty() + + // Deletions are sent only once (actual bug I found writing the tests) + p.sendDownloadProgressMessages() + p.sendDownloadProgressMessages() + expectEmpty() + + // Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers + // Directory + state5 := &sharedPullerState{ + folder: "folder", + file: protocol.FileInfo{ + Name: "state5", + Version: v1, + Flags: protocol.FlagDirectory, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + // Symlink + state6 := &sharedPullerState{ + folder: "folder", + file: protocol.FileInfo{ + Name: "state6", + Version: v1, + Flags: protocol.FlagSymlink, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + // Some other directory + state7 := &sharedPullerState{ + folder: "folderXXX", + file: protocol.FileInfo{ + Name: "state7", + Version: v1, + Blocks: blocks, + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + // Less than 10 blocks + state8 := &sharedPullerState{ + folder: "folder", + file: protocol.FileInfo{ + Name: "state8", + Version: v1, + Blocks: blocks[:3], + }, + mut: sync.NewRWMutex(), + available: []int32{1, 2, 3}, + availableUpdated: time.Now(), + } + p.registry["5"] = state5 + p.registry["6"] = state6 + p.registry["7"] = state7 + p.registry["8"] = state8 + + p.sendDownloadProgressMessages() + + expectEmpty() + + // Device is no longer subscribed to a particular folder + delete(p.registry, "1") // Clean up first + delete(p.registry, "2") // Clean up first + + p.sendDownloadProgressMessages() + expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true) + expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true) + + expectEmpty() + + p.registry["1"] = state1 + p.registry["2"] = state2 + p.registry["3"] = state3 + p.registry["4"] = state4 + + p.sendDownloadProgressMessages() + + expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false) + expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) + expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false) + expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true) + expectEmpty() + + p.temporaryIndexUnsubscribe(fc) + p.temporaryIndexSubscribe(fc, []string{"folder"}) + + p.sendDownloadProgressMessages() + + // See progressemitter.go for explanation why this is commented out. + // Search for state.cleanup + //expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false) + //expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true) + + expectEmpty() + + // Cleanup when device no longer exists + p.temporaryIndexUnsubscribe(fc) + + p.sendDownloadProgressMessages() + _, ok := p.sentDownloadStates[fc.ID()] + if ok { + t.Error("Should not be there") + } +} diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index a0b17e4fd..3947f38f6 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -970,9 +970,9 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks scanner.PopulateOffsets(file.Blocks) - reused := 0 var blocks []protocol.BlockInfo var blocksSize int64 + var reused []int32 // Check for an old temporary file which might have some blocks we could // reuse. @@ -988,25 +988,27 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks } // Since the blocks are already there, we don't need to get them. - for _, block := range file.Blocks { + for i, block := range file.Blocks { _, ok := existingBlocks[block.String()] if !ok { blocks = append(blocks, block) blocksSize += int64(block.Size) + } else { + reused = append(reused, int32(i)) } } // The sharedpullerstate will know which flags to use when opening the // temp file depending if we are reusing any blocks or not. - reused = len(file.Blocks) - len(blocks) - if reused == 0 { + if len(reused) == 0 { // Otherwise, discard the file ourselves in order for the // sharedpuller not to panic when it fails to exclusively create a // file which already exists osutil.InWritableDir(osutil.Remove, tempName) } } else { - blocks = file.Blocks + // Copy the blocks, as we don't want to shuffle them on the FileInfo + blocks = append(blocks, file.Blocks...) blocksSize = file.Size() } @@ -1018,6 +1020,12 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks } } + // Shuffle the blocks + for i := range blocks { + j := rand.Intn(i + 1) + blocks[i], blocks[j] = blocks[j], blocks[i] + } + events.Default.Log(events.ItemStarted, map[string]string{ "folder": p.folder, "item": file.Name, @@ -1026,17 +1034,20 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks }) s := sharedPullerState{ - file: file, - folder: p.folder, - tempName: tempName, - realName: realName, - copyTotal: len(blocks), - copyNeeded: len(blocks), - reused: reused, - ignorePerms: p.ignorePermissions(file), - version: curFile.Version, - mut: sync.NewMutex(), - sparse: p.allowSparse, + file: file, + folder: p.folder, + tempName: tempName, + realName: realName, + copyTotal: len(blocks), + copyNeeded: len(blocks), + reused: len(reused), + updated: time.Now(), + available: reused, + availableUpdated: time.Now(), + ignorePerms: p.ignorePermissions(file), + version: curFile.Version, + mut: sync.NewRWMutex(), + sparse: p.allowSparse, } l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) @@ -1184,7 +1195,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull } pullChan <- ps } else { - state.copyDone() + state.copyDone(block) } } out <- state.sharedPullerState @@ -1210,19 +1221,19 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul if p.allowSparse && state.reused == 0 && state.block.IsEmpty() { // There is no need to request a block of all zeroes. Pretend we // requested it and handled it correctly. - state.pullDone() + state.pullDone(state.block) out <- state.sharedPullerState continue } var lastError error - potentialDevices := p.model.Availability(p.folder, state.file.Name) + candidates := p.model.Availability(p.folder, state.file.Name, state.file.Version, state.block) for { // Select the least busy device to pull the block from. If we found no // feasible device at all, fail the block (and in the long run, the // file). - selected := activity.leastBusy(potentialDevices) - if selected == (protocol.DeviceID{}) { + selected, found := activity.leastBusy(candidates) + if !found { if lastError != nil { state.fail("pull", lastError) } else { @@ -1231,12 +1242,12 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul break } - potentialDevices = removeDevice(potentialDevices, selected) + candidates = removeAvailability(candidates, selected) // Fetch the block, while marking the selected device as in use so that // leastBusy can select another device when someone else asks. activity.using(selected) - buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil) + buf, lastError := p.model.requestGlobal(selected.ID, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary) activity.done(selected) if lastError != nil { l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError) @@ -1256,7 +1267,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul if err != nil { state.fail("save", err) } else { - state.pullDone() + state.pullDone(state.block) } break } @@ -1481,14 +1492,24 @@ func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool { return false } -func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID { - for i := range devices { - if devices[i] == device { - devices[i] = devices[len(devices)-1] - return devices[:len(devices)-1] +func invalidateFolder(cfg *config.Configuration, folderID string, err error) { + for i := range cfg.Folders { + folder := &cfg.Folders[i] + if folder.ID == folderID { + folder.Invalid = err.Error() + return } } - return devices +} + +func removeAvailability(availabilities []Availability, availability Availability) []Availability { + for i := range availabilities { + if availabilities[i] == availability { + availabilities[i] = availabilities[len(availabilities)-1] + return availabilities[:len(availabilities)-1] + } + } + return availabilities } func (p *rwFolder) moveForConflict(name string) error { diff --git a/lib/model/rwfolder_test.go b/lib/model/rwfolder_test.go index a7fb4b280..62f0e5b02 100644 --- a/lib/model/rwfolder_test.go +++ b/lib/model/rwfolder_test.go @@ -104,9 +104,16 @@ func TestHandleFile(t *testing.T) { t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks)) } - for i, block := range toCopy.blocks { - if string(block.Hash) != string(blocks[i+1].Hash) { - t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String()) + for _, block := range blocks[1:] { + found := false + for _, toCopyBlock := range toCopy.blocks { + if string(toCopyBlock.Hash) == string(block.Hash) { + found = true + break + } + } + if !found { + t.Errorf("Did not find block %s", block.String()) } } } @@ -138,9 +145,17 @@ func TestHandleFileWithTemp(t *testing.T) { t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks)) } - for i, eq := range []int{1, 5, 6, 8} { - if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) { - t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String()) + for _, idx := range []int{1, 5, 6, 8} { + found := false + block := blocks[idx] + for _, toCopyBlock := range toCopy.blocks { + if string(toCopyBlock.Hash) == string(block.Hash) { + found = true + break + } + } + if !found { + t.Errorf("Did not find block %s", block.String()) } } } @@ -187,13 +202,22 @@ func TestCopierFinder(t *testing.T) { default: } - // Verify that the right blocks went into the pull list - for i, eq := range []int{1, 5, 6, 8} { - if string(pulls[i].block.Hash) != string(blocks[eq].Hash) { - t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String()) + // Verify that the right blocks went into the pull list. + // They are pulled in random order. + for _, idx := range []int{1, 5, 6, 8} { + found := false + block := blocks[idx] + for _, pulledBlock := range pulls { + if string(pulledBlock.block.Hash) == string(block.Hash) { + found = true + break + } } - if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) { - t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String()) + if !found { + t.Errorf("Did not find block %s", block.String()) + } + if string(finish.file.Blocks[idx-1].Hash) != string(blocks[idx].Hash) { + t.Errorf("Block %d mismatch: %s != %s", idx, finish.file.Blocks[idx-1].String(), blocks[idx].String()) } } diff --git a/lib/model/sentdownloadstate.go b/lib/model/sentdownloadstate.go new file mode 100644 index 000000000..9fd50b0bb --- /dev/null +++ b/lib/model/sentdownloadstate.go @@ -0,0 +1,184 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "time" + + "github.com/syncthing/syncthing/lib/protocol" +) + +// sentFolderFileDownloadState represents a state of what we've announced as available +// to some remote device for a specific file. +type sentFolderFileDownloadState struct { + blockIndexes []int32 + version protocol.Vector + updated time.Time +} + +// sentFolderDownloadState represents a state of what we've announced as available +// to some remote device for a specific folder. +type sentFolderDownloadState struct { + files map[string]*sentFolderFileDownloadState +} + +// update takes a set of currently active sharedPullerStates, and returns a list +// of updates which we need to send to the client to become up to date. +func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate { + var name string + var updates []protocol.FileDownloadProgressUpdate + seen := make(map[string]struct{}, len(pullers)) + + for _, puller := range pullers { + name = puller.file.Name + + seen[name] = struct{}{} + + pullerBlockIndexes := puller.Available() + pullerVersion := puller.file.Version + pullerBlockIndexesUpdated := puller.AvailableUpdated() + + localFile, ok := s.files[name] + + // New file we haven't seen before + if !ok { + // Only send an update if the file actually has some blocks. + if len(pullerBlockIndexes) > 0 { + s.files[name] = &sentFolderFileDownloadState{ + blockIndexes: pullerBlockIndexes, + updated: pullerBlockIndexesUpdated, + version: pullerVersion, + } + + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: pullerVersion, + UpdateType: protocol.UpdateTypeAppend, + BlockIndexes: pullerBlockIndexes, + }) + } + continue + } + + // Existing file we've already sent an update for. + if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) { + // The file state hasn't changed, go to next. + continue + } + + if !pullerVersion.Equal(localFile.version) { + // The version has changed, clean up whatever we had for the old + // file, and advertise the new file. + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: localFile.version, + UpdateType: protocol.UpdateTypeForget, + }) + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: pullerVersion, + UpdateType: protocol.UpdateTypeAppend, + BlockIndexes: pullerBlockIndexes, + }) + localFile.blockIndexes = pullerBlockIndexes + localFile.updated = pullerBlockIndexesUpdated + localFile.version = pullerVersion + continue + } + + // Relies on the fact that sharedPullerState.Available() should always + // append. + newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):] + + localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...) + localFile.updated = pullerBlockIndexesUpdated + + // If there are new blocks, send the update. + if len(newBlocks) > 0 { + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: localFile.version, + UpdateType: protocol.UpdateTypeAppend, + BlockIndexes: newBlocks, + }) + } + } + + // For each file that we are tracking, see if there still is a puller for it + // if not, the file completed or errored out. + for name, info := range s.files { + _, ok := seen[name] + if !ok { + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: info.version, + UpdateType: protocol.UpdateTypeForget, + }) + delete(s.files, name) + } + } + + return updates +} + +// destroy removes all stored state, and returns a set of updates we need to +// dispatch to clean up the state on the remote end. +func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate { + updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files)) + for name, info := range s.files { + updates = append(updates, protocol.FileDownloadProgressUpdate{ + Name: name, + Version: info.version, + UpdateType: protocol.UpdateTypeForget, + }) + delete(s.files, name) + } + return updates +} + +// sentDownloadState represents a state of what we've announced as available +// to some remote device. It is used from within the progress emitter +// which only has one routine, hence is deemed threadsafe. +type sentDownloadState struct { + folderStates map[string]*sentFolderDownloadState +} + +// update receives a folder, and a slice of pullers that are currently available +// for the given folder, and according to the state of what we've seen before +// returns a set of updates which we should send to the remote device to make +// it aware of everything that we currently have available. +func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate { + fs, ok := s.folderStates[folder] + if !ok { + fs = &sentFolderDownloadState{ + files: make(map[string]*sentFolderFileDownloadState), + } + s.folderStates[folder] = fs + } + return fs.update(pullers) +} + +// folders returns a set of folders this state is currently aware off. +func (s *sentDownloadState) folders() []string { + folders := make([]string, 0, len(s.folderStates)) + for key := range s.folderStates { + folders = append(folders, key) + } + return folders +} + +// cleanup cleans up all state related to a folder, and returns a set of updates +// which would clean up the state on the remote device. +func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate { + fs, ok := s.folderStates[folder] + if ok { + updates := fs.destroy() + delete(s.folderStates, folder) + return updates + } + return nil +} diff --git a/lib/model/sharedpullerstate.go b/lib/model/sharedpullerstate.go index 509b1fcfd..f079fdad5 100644 --- a/lib/model/sharedpullerstate.go +++ b/lib/model/sharedpullerstate.go @@ -10,6 +10,7 @@ import ( "io" "os" "path/filepath" + "time" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/protocol" @@ -30,15 +31,18 @@ type sharedPullerState struct { sparse bool // Mutable, must be locked for access - err error // The first error we hit - fd *os.File // The fd of the temp file - copyTotal int // Total number of copy actions for the whole job - pullTotal int // Total number of pull actions for the whole job - copyOrigin int // Number of blocks copied from the original file - copyNeeded int // Number of copy actions still pending - pullNeeded int // Number of block pulls still pending - closed bool // True if the file has been finalClosed. - mut sync.Mutex // Protects the above + err error // The first error we hit + fd *os.File // The fd of the temp file + copyTotal int // Total number of copy actions for the whole job + pullTotal int // Total number of pull actions for the whole job + copyOrigin int // Number of blocks copied from the original file + copyNeeded int // Number of copy actions still pending + pullNeeded int // Number of block pulls still pending + updated time.Time // Time when any of the counters above were last updated + closed bool // True if the file has been finalClosed. + available []int32 // Indexes of the blocks that are available in the temporary file + availableUpdated time.Time // Time when list of available blocks was last updated + mut sync.RWMutex // Protects the above } // A momentary state representing the progress of the puller @@ -56,7 +60,7 @@ type pullerProgress struct { // A lockedWriterAt synchronizes WriteAt calls with an external mutex. // WriteAt() is goroutine safe by itself, but not against for example Close(). type lockedWriterAt struct { - mut *sync.Mutex + mut *sync.RWMutex wr io.WriterAt } @@ -196,15 +200,19 @@ func (s *sharedPullerState) failLocked(context string, err error) { } func (s *sharedPullerState) failed() error { - s.mut.Lock() - defer s.mut.Unlock() + s.mut.RLock() + err := s.err + s.mut.RUnlock() - return s.err + return err } -func (s *sharedPullerState) copyDone() { +func (s *sharedPullerState) copyDone(block protocol.BlockInfo) { s.mut.Lock() s.copyNeeded-- + s.updated = time.Now() + s.available = append(s.available, int32(block.Offset/protocol.BlockSize)) + s.availableUpdated = time.Now() l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded) s.mut.Unlock() } @@ -212,6 +220,7 @@ func (s *sharedPullerState) copyDone() { func (s *sharedPullerState) copiedFromOrigin() { s.mut.Lock() s.copyOrigin++ + s.updated = time.Now() s.mut.Unlock() } @@ -221,13 +230,17 @@ func (s *sharedPullerState) pullStarted() { s.copyNeeded-- s.pullTotal++ s.pullNeeded++ + s.updated = time.Now() l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded) s.mut.Unlock() } -func (s *sharedPullerState) pullDone() { +func (s *sharedPullerState) pullDone(block protocol.BlockInfo) { s.mut.Lock() s.pullNeeded-- + s.updated = time.Now() + s.available = append(s.available, int32(block.Offset/protocol.BlockSize)) + s.availableUpdated = time.Now() l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded) s.mut.Unlock() } @@ -265,10 +278,10 @@ func (s *sharedPullerState) finalClose() (bool, error) { return true, s.err } -// Returns the momentarily progress for the puller +// Progress returns the momentarily progress for the puller func (s *sharedPullerState) Progress() *pullerProgress { - s.mut.Lock() - defer s.mut.Unlock() + s.mut.RLock() + defer s.mut.RUnlock() total := s.reused + s.copyTotal + s.pullTotal done := total - s.copyNeeded - s.pullNeeded return &pullerProgress{ @@ -282,3 +295,27 @@ func (s *sharedPullerState) Progress() *pullerProgress { BytesDone: db.BlocksToSize(done), } } + +// Updated returns the time when any of the progress related counters was last updated. +func (s *sharedPullerState) Updated() time.Time { + s.mut.RLock() + t := s.updated + s.mut.RUnlock() + return t +} + +// AvailableUpdated returns the time last time list of available blocks was updated +func (s *sharedPullerState) AvailableUpdated() time.Time { + s.mut.RLock() + t := s.availableUpdated + s.mut.RUnlock() + return t +} + +// Available returns blocks available in the current temporary file +func (s *sharedPullerState) Available() []int32 { + s.mut.RLock() + blocks := s.available + s.mut.RUnlock() + return blocks +} diff --git a/lib/model/sharedpullerstate_test.go b/lib/model/sharedpullerstate_test.go index edcafde0b..315c3d413 100644 --- a/lib/model/sharedpullerstate_test.go +++ b/lib/model/sharedpullerstate_test.go @@ -16,7 +16,7 @@ import ( func TestSourceFileOK(t *testing.T) { s := sharedPullerState{ realName: "testdata/foo", - mut: sync.NewMutex(), + mut: sync.NewRWMutex(), } fd, err := s.sourceFile() @@ -45,7 +45,7 @@ func TestSourceFileOK(t *testing.T) { func TestSourceFileBad(t *testing.T) { s := sharedPullerState{ realName: "nonexistent", - mut: sync.NewMutex(), + mut: sync.NewRWMutex(), } fd, err := s.sourceFile() @@ -71,7 +71,7 @@ func TestReadOnlyDir(t *testing.T) { s := sharedPullerState{ tempName: "testdata/read_only_dir/.temp_name", - mut: sync.NewMutex(), + mut: sync.NewRWMutex(), } fd, err := s.tempFile() diff --git a/lib/protocol/common_test.go b/lib/protocol/common_test.go index c3b8cce5b..849b4e1d4 100644 --- a/lib/protocol/common_test.go +++ b/lib/protocol/common_test.go @@ -49,6 +49,9 @@ func (t *TestModel) Close(deviceID DeviceID, err error) { func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { } +func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate, uint32, []Option) { +} + func (t *TestModel) closedError() error { select { case <-t.closedCh: diff --git a/lib/protocol/message.go b/lib/protocol/message.go index 92626b754..9e195661a 100644 --- a/lib/protocol/message.go +++ b/lib/protocol/message.go @@ -138,6 +138,13 @@ type ClusterConfigMessage struct { Options []Option // max:64 } +type DownloadProgressMessage struct { + Folder string // max:64 + Updates []FileDownloadProgressUpdate // max:1000000 + Flags uint32 + Options []Option // max:64 +} + func (o *ClusterConfigMessage) GetOption(key string) string { for _, option := range o.Options { if option.Key == key { @@ -166,6 +173,13 @@ type Device struct { Options []Option // max:64 } +type FileDownloadProgressUpdate struct { + UpdateType uint32 + Name string // max:8192 + Version Vector + BlockIndexes []int32 // max:1000000 +} + type Option struct { Key string // max:64 Value string // max:1024 diff --git a/lib/protocol/message_xdr.go b/lib/protocol/message_xdr.go index a7d1cec41..206bd394b 100644 --- a/lib/protocol/message_xdr.go +++ b/lib/protocol/message_xdr.go @@ -690,6 +690,135 @@ func (o *ClusterConfigMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error { /* +DownloadProgressMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Folder (length + padded data) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Updates | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more FileDownloadProgressUpdate Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct DownloadProgressMessage { + string Folder<64>; + FileDownloadProgressUpdate Updates<1000000>; + unsigned int Flags; + Option Options<64>; +} + +*/ + +func (o DownloadProgressMessage) XDRSize() int { + return 4 + len(o.Folder) + xdr.Padding(len(o.Folder)) + + 4 + xdr.SizeOfSlice(o.Updates) + 4 + + 4 + xdr.SizeOfSlice(o.Options) +} + +func (o DownloadProgressMessage) MarshalXDR() ([]byte, error) { + buf := make([]byte, o.XDRSize()) + m := &xdr.Marshaller{Data: buf} + return buf, o.MarshalXDRInto(m) +} + +func (o DownloadProgressMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o DownloadProgressMessage) MarshalXDRInto(m *xdr.Marshaller) error { + if l := len(o.Folder); l > 64 { + return xdr.ElementSizeExceeded("Folder", l, 64) + } + m.MarshalString(o.Folder) + if l := len(o.Updates); l > 1000000 { + return xdr.ElementSizeExceeded("Updates", l, 1000000) + } + m.MarshalUint32(uint32(len(o.Updates))) + for i := range o.Updates { + if err := o.Updates[i].MarshalXDRInto(m); err != nil { + return err + } + } + m.MarshalUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xdr.ElementSizeExceeded("Options", l, 64) + } + m.MarshalUint32(uint32(len(o.Options))) + for i := range o.Options { + if err := o.Options[i].MarshalXDRInto(m); err != nil { + return err + } + } + return m.Error +} + +func (o *DownloadProgressMessage) UnmarshalXDR(bs []byte) error { + u := &xdr.Unmarshaller{Data: bs} + return o.UnmarshalXDRFrom(u) +} +func (o *DownloadProgressMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error { + o.Folder = u.UnmarshalStringMax(64) + _UpdatesSize := int(u.UnmarshalUint32()) + if _UpdatesSize < 0 { + return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000) + } else if _UpdatesSize == 0 { + o.Updates = nil + } else { + if _UpdatesSize > 1000000 { + return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000) + } + if _UpdatesSize <= len(o.Updates) { + o.Updates = o.Updates[:_UpdatesSize] + } else { + o.Updates = make([]FileDownloadProgressUpdate, _UpdatesSize) + } + for i := range o.Updates { + (&o.Updates[i]).UnmarshalXDRFrom(u) + } + } + o.Flags = u.UnmarshalUint32() + _OptionsSize := int(u.UnmarshalUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } else if _OptionsSize == 0 { + o.Options = nil + } else { + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize <= len(o.Options) { + o.Options = o.Options[:_OptionsSize] + } else { + o.Options = make([]Option, _OptionsSize) + } + for i := range o.Options { + (&o.Options[i]).UnmarshalXDRFrom(u) + } + } + return u.Error +} + +/* + Folder Structure: 0 1 2 3 @@ -996,6 +1125,109 @@ func (o *Device) UnmarshalXDRFrom(u *xdr.Unmarshaller) error { /* +FileDownloadProgressUpdate Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Update Type | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (length + padded data) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Vector Structure \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Block Indexes | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +| Block Indexes (n items) | +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct FileDownloadProgressUpdate { + unsigned int UpdateType; + string Name<8192>; + Vector Version; + int BlockIndexes<1000000>; +} + +*/ + +func (o FileDownloadProgressUpdate) XDRSize() int { + return 4 + + 4 + len(o.Name) + xdr.Padding(len(o.Name)) + + o.Version.XDRSize() + + 4 + len(o.BlockIndexes)*4 +} + +func (o FileDownloadProgressUpdate) MarshalXDR() ([]byte, error) { + buf := make([]byte, o.XDRSize()) + m := &xdr.Marshaller{Data: buf} + return buf, o.MarshalXDRInto(m) +} + +func (o FileDownloadProgressUpdate) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o FileDownloadProgressUpdate) MarshalXDRInto(m *xdr.Marshaller) error { + m.MarshalUint32(o.UpdateType) + if l := len(o.Name); l > 8192 { + return xdr.ElementSizeExceeded("Name", l, 8192) + } + m.MarshalString(o.Name) + if err := o.Version.MarshalXDRInto(m); err != nil { + return err + } + if l := len(o.BlockIndexes); l > 1000000 { + return xdr.ElementSizeExceeded("BlockIndexes", l, 1000000) + } + m.MarshalUint32(uint32(len(o.BlockIndexes))) + for i := range o.BlockIndexes { + m.MarshalUint32(uint32(o.BlockIndexes[i])) + } + return m.Error +} + +func (o *FileDownloadProgressUpdate) UnmarshalXDR(bs []byte) error { + u := &xdr.Unmarshaller{Data: bs} + return o.UnmarshalXDRFrom(u) +} +func (o *FileDownloadProgressUpdate) UnmarshalXDRFrom(u *xdr.Unmarshaller) error { + o.UpdateType = u.UnmarshalUint32() + o.Name = u.UnmarshalStringMax(8192) + (&o.Version).UnmarshalXDRFrom(u) + _BlockIndexesSize := int(u.UnmarshalUint32()) + if _BlockIndexesSize < 0 { + return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000) + } else if _BlockIndexesSize == 0 { + o.BlockIndexes = nil + } else { + if _BlockIndexesSize > 1000000 { + return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000) + } + if _BlockIndexesSize <= len(o.BlockIndexes) { + o.BlockIndexes = o.BlockIndexes[:_BlockIndexesSize] + } else { + o.BlockIndexes = make([]int32, _BlockIndexesSize) + } + for i := range o.BlockIndexes { + o.BlockIndexes[i] = int32(u.UnmarshalUint32()) + } + } + return u.Error +} + +/* + Option Structure: 0 1 2 3 diff --git a/lib/protocol/nativemodel_darwin.go b/lib/protocol/nativemodel_darwin.go index eb755a6e4..349104257 100644 --- a/lib/protocol/nativemodel_darwin.go +++ b/lib/protocol/nativemodel_darwin.go @@ -9,32 +9,24 @@ package protocol import "golang.org/x/text/unicode/norm" type nativeModel struct { - next Model + Model } func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.next.Index(deviceID, folder, files, flags, options) + m.Model.Index(deviceID, folder, files, flags, options) } func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.next.IndexUpdate(deviceID, folder, files, flags, options) + m.Model.IndexUpdate(deviceID, folder, files, flags, options) } func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = norm.NFD.String(name) - return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) -} - -func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { - m.next.ClusterConfig(deviceID, config) -} - -func (m nativeModel) Close(deviceID DeviceID, err error) { - m.next.Close(deviceID, err) + return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf) } diff --git a/lib/protocol/nativemodel_unix.go b/lib/protocol/nativemodel_unix.go index 0611865e1..ea6ec8dbd 100644 --- a/lib/protocol/nativemodel_unix.go +++ b/lib/protocol/nativemodel_unix.go @@ -7,25 +7,5 @@ package protocol // Normal Unixes uses NFC and slashes, which is the wire format. type nativeModel struct { - next Model -} - -func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { - m.next.Index(deviceID, folder, files, flags, options) -} - -func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { - m.next.IndexUpdate(deviceID, folder, files, flags, options) -} - -func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { - return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) -} - -func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { - m.next.ClusterConfig(deviceID, config) -} - -func (m nativeModel) Close(deviceID DeviceID, err error) { - m.next.Close(deviceID, err) + Model } diff --git a/lib/protocol/nativemodel_windows.go b/lib/protocol/nativemodel_windows.go index 36a1d2749..186ddfa7f 100644 --- a/lib/protocol/nativemodel_windows.go +++ b/lib/protocol/nativemodel_windows.go @@ -21,30 +21,22 @@ var disallowedCharacters = string([]rune{ }) type nativeModel struct { - next Model + Model } func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { fixupFiles(folder, files) - m.next.Index(deviceID, folder, files, flags, options) + m.Model.Index(deviceID, folder, files, flags, options) } func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { fixupFiles(folder, files) - m.next.IndexUpdate(deviceID, folder, files, flags, options) + m.Model.IndexUpdate(deviceID, folder, files, flags, options) } func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { name = filepath.FromSlash(name) - return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) -} - -func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { - m.next.ClusterConfig(deviceID, config) -} - -func (m nativeModel) Close(deviceID DeviceID, err error) { - m.next.Close(deviceID, err) + return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf) } func fixupFiles(folder string, files []FileInfo) { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index dfc16e059..fffffec62 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -24,13 +24,14 @@ const ( ) const ( - messageTypeClusterConfig = 0 - messageTypeIndex = 1 - messageTypeRequest = 2 - messageTypeResponse = 3 - messageTypePing = 4 - messageTypeIndexUpdate = 6 - messageTypeClose = 7 + messageTypeClusterConfig = 0 + messageTypeIndex = 1 + messageTypeRequest = 2 + messageTypeResponse = 3 + messageTypePing = 4 + messageTypeIndexUpdate = 6 + messageTypeClose = 7 + messageTypeDownloadProgress = 8 ) const ( @@ -52,22 +53,29 @@ const ( SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget ) -// IndexMessage message flags (for IndexUpdate) -const ( - FlagIndexTemporary uint32 = 1 << iota -) - // Request message flags const ( - FlagRequestTemporary uint32 = 1 << iota + FlagFromTemporary uint32 = 1 << iota +) + +// FileDownloadProgressUpdate update types +const ( + UpdateTypeAppend uint32 = iota + UpdateTypeForget +) + +// CLusterConfig flags +const ( + FlagClusterConfigTemporaryIndexes uint32 = 1 << 0 ) // ClusterConfigMessage.Folders flags const ( - FlagFolderReadOnly uint32 = 1 << 0 - FlagFolderIgnorePerms = 1 << 1 - FlagFolderIgnoreDelete = 1 << 2 - FlagFolderAll = 1<<3 - 1 + FlagFolderReadOnly uint32 = 1 << 0 + FlagFolderIgnorePerms = 1 << 1 + FlagFolderIgnoreDelete = 1 << 2 + FlagFolderDisabledTempIndexes = 1 << 3 + FlagFolderAll = 1<<4 - 1 ) // ClusterConfigMessage.Folders.Devices flags @@ -97,6 +105,8 @@ type Model interface { ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) // The peer device closed the connection Close(deviceID DeviceID, err error) + // The peer device sent progress updates for the files it is currently downloading + DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) } type Connection interface { @@ -105,8 +115,9 @@ type Connection interface { Name() string Index(folder string, files []FileInfo, flags uint32, options []Option) error IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error - Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) + Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) ClusterConfig(config ClusterConfigMessage) + DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) Statistics() Statistics Closed() bool } @@ -242,7 +253,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, } // Request returns the bytes for the specified block after fetching them from the connected peer. -func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { var id int select { case id = <-c.nextID: @@ -250,6 +261,12 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i return nil, ErrClosed } + var flags uint32 + + if fromTemporary { + flags |= FlagFromTemporary + } + c.awaitingMut.Lock() if ch := c.awaiting[id]; ch != nil { panic("id taken") @@ -265,7 +282,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i Size: int32(size), Hash: hash, Flags: flags, - Options: options, + Options: nil, }, nil) if !ok { return nil, ErrClosed @@ -292,6 +309,16 @@ func (c *rawConnection) Closed() bool { } } +// DownloadProgress sends the progress updates for the files that are currently being downloaded. +func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) { + c.send(-1, messageTypeDownloadProgress, DownloadProgressMessage{ + Folder: folder, + Updates: updates, + Flags: flags, + Options: options, + }, nil) +} + func (c *rawConnection) ping() bool { var id int select { @@ -359,6 +386,12 @@ func (c *rawConnection) readerLoop() (err error) { } c.handleResponse(hdr.msgID, msg) + case DownloadProgressMessage: + if state != stateReady { + return fmt.Errorf("protocol error: response message in state %d", state) + } + c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates, msg.Flags, msg.Options) + case pingMessage: if state != stateReady { return fmt.Errorf("protocol error: ping message in state %d", state) @@ -469,6 +502,14 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { err = cm.UnmarshalXDR(msgBuf) msg = cm + case messageTypeDownloadProgress: + var dp DownloadProgressMessage + err := dp.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = dp + default: err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index f17c76da6..82cdd0757 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -183,7 +183,7 @@ func TestClose(t *testing.T) { c0.Index("default", nil, 0, nil) c0.Index("default", nil, 0, nil) - if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil { + if _, err := c0.Request("default", "foo", 0, 0, nil, false); err == nil { t.Error("Request should return an error") } } diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go index a0d3cd580..6847bb373 100644 --- a/lib/protocol/wireformat.go +++ b/lib/protocol/wireformat.go @@ -34,7 +34,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo, flags ui return c.Connection.IndexUpdate(folder, myFs, flags, options) } -func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { +func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { name = norm.NFC.String(filepath.ToSlash(name)) - return c.Connection.Request(folder, name, offset, size, hash, flags, options) + return c.Connection.Request(folder, name, offset, size, hash, fromTemporary) }