lib/model, lib/protocol: Implement temporary indexes (fixes #950)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/2252
This commit is contained in:
AudriusButkevicius 2016-04-15 10:59:41 +00:00 committed by Jakob Borg
parent a4cd4cc253
commit 1a5f524ae4
28 changed files with 1612 additions and 234 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/logger" "github.com/syncthing/syncthing/lib/logger"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay" "github.com/syncthing/syncthing/lib/relay"
@ -85,7 +86,7 @@ type modelIntf interface {
CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool)
CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool) CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool)
ResetFolder(folder string) ResetFolder(folder string)
Availability(folder, file string) []protocol.DeviceID Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability
GetIgnores(folder string) ([]string, []string, error) GetIgnores(folder string) ([]string, []string, error)
SetIgnores(folder string, content []string) error SetIgnores(folder string, content []string) error
PauseDevice(device protocol.DeviceID) PauseDevice(device protocol.DeviceID)
@ -696,7 +697,7 @@ func (s *apiService) getDBFile(w http.ResponseWriter, r *http.Request) {
return return
} }
av := s.model.Availability(folder, file) av := s.model.Availability(folder, file, protocol.Vector{}, protocol.BlockInfo{})
sendJSON(w, map[string]interface{}{ sendJSON(w, map[string]interface{}{
"global": jsonFileInfo(gf), "global": jsonFileInfo(gf),
"local": jsonFileInfo(lf), "local": jsonFileInfo(lf),

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/stats"
) )
@ -57,7 +58,7 @@ func (m *mockedModel) CurrentGlobalFile(folder string, file string) (protocol.Fi
func (m *mockedModel) ResetFolder(folder string) { func (m *mockedModel) ResetFolder(folder string) {
} }
func (m *mockedModel) Availability(folder, file string) []protocol.DeviceID { func (m *mockedModel) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []model.Availability {
return nil return nil
} }

View File

@ -62,6 +62,7 @@ func TestDefaultValues(t *testing.T) {
ReleasesURL: "https://api.github.com/repos/syncthing/syncthing/releases?per_page=30", ReleasesURL: "https://api.github.com/repos/syncthing/syncthing/releases?per_page=30",
AlwaysLocalNets: []string{}, AlwaysLocalNets: []string{},
OverwriteNames: false, OverwriteNames: false,
TempIndexMinBlocks: 10,
} }
cfg := New(device1) cfg := New(device1)
@ -192,6 +193,7 @@ func TestOverriddenValues(t *testing.T) {
ReleasesURL: "https://localhost/releases", ReleasesURL: "https://localhost/releases",
AlwaysLocalNets: []string{}, AlwaysLocalNets: []string{},
OverwriteNames: true, OverwriteNames: true,
TempIndexMinBlocks: 100,
} }
cfg, err := Load("testdata/overridenvalues.xml", device1) cfg, err := Load("testdata/overridenvalues.xml", device1)

View File

@ -37,6 +37,7 @@ type FolderConfiguration struct {
PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"` PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"`
MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"` MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"`
DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"` DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"`
DisableTempIndexes bool `xml:"disableTempIndexes" json:"disableTempIndexes"`
Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved
cachedPath string cachedPath string

View File

@ -40,6 +40,7 @@ type OptionsConfiguration struct {
ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=30"` ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=30"`
AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"` AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"`
OverwriteNames bool `xml:"overwriteNames" json:"overwriteNames" default:"false"` OverwriteNames bool `xml:"overwriteNames" json:"overwriteNames" default:"false"`
TempIndexMinBlocks int `xml:"tempIndexMinBlocks" json:"tempIndexMinBlocks" default:"10"`
DeprecatedUPnPEnabled bool `xml:"upnpEnabled"` DeprecatedUPnPEnabled bool `xml:"upnpEnabled"`
DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes"` DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes"`

View File

@ -35,5 +35,6 @@
<urPostInsecurely>true</urPostInsecurely> <urPostInsecurely>true</urPostInsecurely>
<releasesURL>https://localhost/releases</releasesURL> <releasesURL>https://localhost/releases</releasesURL>
<overwriteNames>true</overwriteNames> <overwriteNames>true</overwriteNames>
<tempIndexMinBlocks>100</tempIndexMinBlocks>
</options> </options>
</configuration> </configuration>

View File

@ -26,28 +26,30 @@ func newDeviceActivity() *deviceActivity {
} }
} }
func (m *deviceActivity) leastBusy(availability []protocol.DeviceID) protocol.DeviceID { func (m *deviceActivity) leastBusy(availability []Availability) (Availability, bool) {
m.mut.Lock() m.mut.Lock()
low := 2<<30 - 1 low := 2<<30 - 1
var selected protocol.DeviceID found := false
for _, device := range availability { var selected Availability
if usage := m.act[device]; usage < low { for _, info := range availability {
if usage := m.act[info.ID]; usage < low {
low = usage low = usage
selected = device selected = info
found = true
} }
} }
m.mut.Unlock() m.mut.Unlock()
return selected return selected, found
} }
func (m *deviceActivity) using(device protocol.DeviceID) { func (m *deviceActivity) using(availability Availability) {
m.mut.Lock() m.mut.Lock()
m.act[device]++ m.act[availability.ID]++
m.mut.Unlock() m.mut.Unlock()
} }
func (m *deviceActivity) done(device protocol.DeviceID) { func (m *deviceActivity) done(availability Availability) {
m.mut.Lock() m.mut.Lock()
m.act[device]-- m.act[availability.ID]--
m.mut.Unlock() m.mut.Unlock()
} }

View File

@ -13,46 +13,48 @@ import (
) )
func TestDeviceActivity(t *testing.T) { func TestDeviceActivity(t *testing.T) {
n0 := protocol.DeviceID([32]byte{1, 2, 3, 4}) n0 := Availability{protocol.DeviceID([32]byte{1, 2, 3, 4}), false}
n1 := protocol.DeviceID([32]byte{5, 6, 7, 8}) n1 := Availability{protocol.DeviceID([32]byte{5, 6, 7, 8}), true}
n2 := protocol.DeviceID([32]byte{9, 10, 11, 12}) n2 := Availability{protocol.DeviceID([32]byte{9, 10, 11, 12}), false}
devices := []protocol.DeviceID{n0, n1, n2} devices := []Availability{n0, n1, n2}
na := newDeviceActivity() na := newDeviceActivity()
if lb := na.leastBusy(devices); lb != n0 { if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
} }
if lb := na.leastBusy(devices); lb != n0 { if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should still be n0 (%v) not %v", n0, lb) t.Errorf("Least busy device should still be n0 (%v) not %v", n0, lb)
} }
na.using(na.leastBusy(devices)) lb, _ := na.leastBusy(devices)
if lb := na.leastBusy(devices); lb != n1 { na.using(lb)
if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb) t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
} }
lb, _ = na.leastBusy(devices)
na.using(na.leastBusy(devices)) na.using(lb)
if lb := na.leastBusy(devices); lb != n2 { if lb, ok := na.leastBusy(devices); !ok || lb != n2 {
t.Errorf("Least busy device should be n2 (%v) not %v", n2, lb) t.Errorf("Least busy device should be n2 (%v) not %v", n2, lb)
} }
na.using(na.leastBusy(devices)) lb, _ = na.leastBusy(devices)
if lb := na.leastBusy(devices); lb != n0 { na.using(lb)
if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
} }
na.done(n1) na.done(n1)
if lb := na.leastBusy(devices); lb != n1 { if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb) t.Errorf("Least busy device should be n1 (%v) not %v", n1, lb)
} }
na.done(n2) na.done(n2)
if lb := na.leastBusy(devices); lb != n1 { if lb, ok := na.leastBusy(devices); !ok || lb != n1 {
t.Errorf("Least busy device should still be n1 (%v) not %v", n1, lb) t.Errorf("Least busy device should still be n1 (%v) not %v", n1, lb)
} }
na.done(n0) na.done(n0)
if lb := na.leastBusy(devices); lb != n0 { if lb, ok := na.leastBusy(devices); !ok || lb != n0 {
t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb) t.Errorf("Least busy device should be n0 (%v) not %v", n0, lb)
} }
} }

View File

@ -0,0 +1,156 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
// deviceFolderFileDownloadState holds current download state of a file that
// a remote device has advertised. blockIndexes represends indexes within
// FileInfo.Blocks that the remote device already has, and version represents
// the version of the file that the remote device is downloading.
type deviceFolderFileDownloadState struct {
blockIndexes []int32
version protocol.Vector
}
// deviceFolderDownloadState holds current download state of all files that
// a remote device is currently downloading in a specific folder.
type deviceFolderDownloadState struct {
mut sync.RWMutex
files map[string]deviceFolderFileDownloadState
numberOfBlocksInProgress int
}
// Has returns wether a block at that specific index, and that specific version of the file
// is currently available on the remote device for pulling from a temporary file.
func (p *deviceFolderDownloadState) Has(file string, version protocol.Vector, index int32) bool {
p.mut.RLock()
defer p.mut.RUnlock()
local, ok := p.files[file]
if !ok || !local.version.Equal(version) {
return false
}
for _, existingIndex := range local.blockIndexes {
if existingIndex == index {
return true
}
}
return false
}
// Update updates internal state of what has been downloaded into the temporary
// files by the remote device for this specific folder.
func (p *deviceFolderDownloadState) Update(updates []protocol.FileDownloadProgressUpdate) {
p.mut.Lock()
defer p.mut.Unlock()
for _, update := range updates {
local, ok := p.files[update.Name]
if update.UpdateType == protocol.UpdateTypeForget && ok && local.version.Equal(update.Version) {
p.numberOfBlocksInProgress -= len(local.blockIndexes)
delete(p.files, update.Name)
} else if update.UpdateType == protocol.UpdateTypeAppend {
if !ok {
local = deviceFolderFileDownloadState{
blockIndexes: update.BlockIndexes,
version: update.Version,
}
} else if !local.version.Equal(update.Version) {
p.numberOfBlocksInProgress -= len(local.blockIndexes)
local.blockIndexes = append(local.blockIndexes[:0], update.BlockIndexes...)
local.version = update.Version
} else {
local.blockIndexes = append(local.blockIndexes, update.BlockIndexes...)
}
p.files[update.Name] = local
p.numberOfBlocksInProgress += len(update.BlockIndexes)
}
}
}
// NumberOfBlocksInProgress returns the number of blocks the device has downloaded
// for a specific folder.
func (p *deviceFolderDownloadState) NumberOfBlocksInProgress() int {
p.mut.RLock()
n := p.numberOfBlocksInProgress
p.mut.RUnlock()
return n
}
// deviceDownloadState represents the state of all in progress downloads
// for all folders of a specific device.
type deviceDownloadState struct {
mut sync.RWMutex
folders map[string]*deviceFolderDownloadState
numberOfBlocksInProgress int
}
// Update updates internal state of what has been downloaded into the temporary
// files by the remote device for this specific folder.
func (t *deviceDownloadState) Update(folder string, updates []protocol.FileDownloadProgressUpdate) {
t.mut.RLock()
f, ok := t.folders[folder]
t.mut.RUnlock()
if !ok {
f = &deviceFolderDownloadState{
mut: sync.NewRWMutex(),
files: make(map[string]deviceFolderFileDownloadState),
}
t.mut.Lock()
t.folders[folder] = f
t.mut.Unlock()
}
f.Update(updates)
}
// Has returns wether block at that specific index, and that specific version of the file
// is currently available on the remote device for pulling from a temporary file.
func (t *deviceDownloadState) Has(folder, file string, version protocol.Vector, index int32) bool {
if t == nil {
return false
}
t.mut.RLock()
f, ok := t.folders[folder]
t.mut.RUnlock()
if !ok {
return false
}
return f.Has(file, version, index)
}
// NumberOfBlocksInProgress returns the number of blocks the device has downloaded
// for all folders.
func (t *deviceDownloadState) NumberOfBlocksInProgress() int {
if t == nil {
return 0
}
n := 0
t.mut.RLock()
for _, folder := range t.folders {
n += folder.NumberOfBlocksInProgress()
}
t.mut.RUnlock()
return n
}
func newDeviceDownloadState() *deviceDownloadState {
return &deviceDownloadState{
mut: sync.NewRWMutex(),
folders: make(map[string]*deviceFolderDownloadState),
}
}

View File

@ -0,0 +1,111 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"testing"
"github.com/syncthing/syncthing/lib/protocol"
)
func TestDeviceDownloadState(t *testing.T) {
v1 := (protocol.Vector{}).Update(0)
v2 := (protocol.Vector{}).Update(1)
// file 1 version 1 part 1
f1v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{0, 1, 2}}
f1v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v1, []int32{3, 4, 5}}
f1v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v1, nil}
f1v2p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{10, 11, 12}}
f1v2p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f1", v2, []int32{13, 14, 15}}
f1v2del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f1", v2, nil}
f2v1p1 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{20, 21, 22}}
f2v1p2 := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeAppend, "f2", v1, []int32{23, 24, 25}}
f2v1del := protocol.FileDownloadProgressUpdate{protocol.UpdateTypeForget, "f2", v1, nil}
tests := []struct {
updates []protocol.FileDownloadProgressUpdate
shouldHaveIndexesFrom []protocol.FileDownloadProgressUpdate
shouldNotHaveIndexesFrom []protocol.FileDownloadProgressUpdate
}{
{ //1
[]protocol.FileDownloadProgressUpdate{f1v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p2, f1v2p1, f1v2p2},
},
{ //2
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
[]protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2},
},
{ //3
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v1del},
nil,
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2p2}},
{ //4
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2del},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2},
[]protocol.FileDownloadProgressUpdate{f1v2p1, f1v2p2},
},
{ //5
// v2 replaces old v1 data
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1},
[]protocol.FileDownloadProgressUpdate{f1v2p1},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2},
},
{ //6
// v1 delete on v2 data does nothing
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v1del},
[]protocol.FileDownloadProgressUpdate{f1v2p1},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p2},
},
{ //7
// v2 replacees v1, v2 gets deleted, and v2 part 2 gets added.
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1, f1v2del, f1v2p2},
[]protocol.FileDownloadProgressUpdate{f1v2p2},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f1v1p2, f1v2p1},
},
// Multiple files in one go
{ //8
[]protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2},
},
{ //9
[]protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1, f2v1del},
[]protocol.FileDownloadProgressUpdate{f1v1p1},
[]protocol.FileDownloadProgressUpdate{f2v1p1, f2v1p1},
},
{ //10
[]protocol.FileDownloadProgressUpdate{f1v1p1, f2v1del, f2v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p1, f2v1p1},
[]protocol.FileDownloadProgressUpdate{f1v1p2, f2v1p2},
},
}
for i, test := range tests {
s := newDeviceDownloadState()
s.Update("folder", test.updates)
for _, expected := range test.shouldHaveIndexesFrom {
for _, n := range expected.BlockIndexes {
if !s.Has("folder", expected.Name, expected.Version, n) {
t.Error("Test", i+1, "error:", expected.Name, expected.Version, "missing", n)
}
}
}
for _, unexpected := range test.shouldNotHaveIndexesFrom {
for _, n := range unexpected.BlockIndexes {
if s.Has("folder", unexpected.Name, unexpected.Version, n) {
t.Error("Test", i+1, "error:", unexpected.Name, unexpected.Version, "has extra", n)
}
}
}
}
}

View File

@ -60,6 +60,11 @@ type service interface {
getState() (folderState, time.Time, error) getState() (folderState, time.Time, error)
} }
type Availability struct {
ID protocol.DeviceID `json:"id"`
FromTemporary bool `json:"fromTemporary"`
}
type Model struct { type Model struct {
*suture.Supervisor *suture.Supervisor
@ -87,10 +92,12 @@ type Model struct {
folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef
fmut sync.RWMutex // protects the above fmut sync.RWMutex // protects the above
conn map[protocol.DeviceID]Connection conn map[protocol.DeviceID]Connection
helloMessages map[protocol.DeviceID]protocol.HelloMessage helloMessages map[protocol.DeviceID]protocol.HelloMessage
devicePaused map[protocol.DeviceID]bool deviceClusterConf map[protocol.DeviceID]protocol.ClusterConfigMessage
pmut sync.RWMutex // protects the above devicePaused map[protocol.DeviceID]bool
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
pmut sync.RWMutex // protects the above
} }
var ( var (
@ -129,10 +136,11 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName,
folderStatRefs: make(map[string]*stats.FolderStatisticsReference), folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
conn: make(map[protocol.DeviceID]Connection), conn: make(map[protocol.DeviceID]Connection),
helloMessages: make(map[protocol.DeviceID]protocol.HelloMessage), helloMessages: make(map[protocol.DeviceID]protocol.HelloMessage),
deviceClusterConf: make(map[protocol.DeviceID]protocol.ClusterConfigMessage),
devicePaused: make(map[protocol.DeviceID]bool), devicePaused: make(map[protocol.DeviceID]bool),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
fmut: sync.NewRWMutex(), fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(), pmut: sync.NewRWMutex(),
} }
if cfg.Options().ProgressUpdateIntervalS > -1 { if cfg.Options().ProgressUpdateIntervalS > -1 {
go m.progressEmitter.Serve() go m.progressEmitter.Serve()
@ -388,6 +396,11 @@ func (m *Model) Completion(device protocol.DeviceID, folder string) float64 {
return true return true
}) })
// This might might be more than it really is, because some blocks can be of a smaller size.
m.pmut.RLock()
need -= int64(m.deviceDownloads[device].NumberOfBlocksInProgress() * protocol.BlockSize)
m.pmut.RUnlock()
needRatio := float64(need) / float64(tot) needRatio := float64(need) / float64(tot)
completionPct := 100 * (1 - needRatio) completionPct := 100 * (1 - needRatio)
l.Debugf("%v Completion(%s, %q): %f (%d / %d = %f)", m, device, folder, completionPct, need, tot, needRatio) l.Debugf("%v Completion(%s, %q): %f (%d / %d = %f)", m, device, folder, completionPct, need, tot, needRatio)
@ -609,6 +622,10 @@ func (m *Model) folderSharedWithUnlocked(folder string, deviceID protocol.Device
func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfigMessage) { func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfigMessage) {
// Check the peer device's announced folders against our own. Emits events // Check the peer device's announced folders against our own. Emits events
// for folders that we don't expect (unknown or not shared). // for folders that we don't expect (unknown or not shared).
// Also, collect a list of folders we do share, and if he's interested in
// temporary indexes, subscribe the connection.
tempIndexFolders := make([]string, 0, len(cm.Folders))
m.fmut.Lock() m.fmut.Lock()
nextFolder: nextFolder:
@ -635,9 +652,24 @@ nextFolder:
l.Infof("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.ID, deviceID) l.Infof("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.ID, deviceID)
continue continue
} }
if folder.Flags&protocol.FlagFolderDisabledTempIndexes == 0 {
tempIndexFolders = append(tempIndexFolders, folder.ID)
}
} }
m.fmut.Unlock() m.fmut.Unlock()
// This breaks if we send multiple CM messages during the same connection.
if len(tempIndexFolders) > 0 {
m.pmut.RLock()
conn, ok := m.conn[deviceID]
m.pmut.RUnlock()
// In case we've got ClusterConfig, and the connection disappeared
// from infront of our nose.
if ok {
m.progressEmitter.temporaryIndexSubscribe(conn, tempIndexFolders)
}
}
var changed bool var changed bool
if m.cfg.Devices()[deviceID].Introducer { if m.cfg.Devices()[deviceID].Introducer {
@ -645,9 +677,6 @@ nextFolder:
// and devices and add what we are missing. // and devices and add what we are missing.
for _, folder := range cm.Folders { for _, folder := range cm.Folders {
// If we don't have this folder yet, skip it. Ideally, we'd
// offer up something in the GUI to create the folder, but for the
// moment we only handle folders that we already have.
if _, ok := m.folderDevices[folder.ID]; !ok { if _, ok := m.folderDevices[folder.ID]; !ok {
continue continue
} }
@ -736,10 +765,13 @@ func (m *Model) Close(device protocol.DeviceID, err error) {
conn, ok := m.conn[device] conn, ok := m.conn[device]
if ok { if ok {
m.progressEmitter.temporaryIndexUnsubscribe(conn)
closeRawConn(conn) closeRawConn(conn)
} }
delete(m.conn, device) delete(m.conn, device)
delete(m.helloMessages, device) delete(m.helloMessages, device)
delete(m.deviceClusterConf, device)
delete(m.deviceDownloads, device)
m.pmut.Unlock() m.pmut.Unlock()
} }
@ -752,19 +784,20 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
if !m.folderSharedWith(folder, deviceID) { if !m.folderSharedWith(folder, deviceID) {
l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder) l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder)
return protocol.ErrInvalid return protocol.ErrNoSuchFile
} }
if flags != 0 { if flags != 0 && flags != protocol.FlagFromTemporary {
// We don't currently support or expect any flags. // We currently support only no flags, or FromTemporary flag.
return protocol.ErrInvalid return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags)
} }
if deviceID != protocol.LocalDeviceID { if deviceID != protocol.LocalDeviceID {
l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf)) l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d f=%d", m, deviceID, folder, name, offset, len(buf), flags)
} }
m.fmut.RLock() m.fmut.RLock()
folderPath := m.folderCfgs[folder].Path() folderCfg := m.folderCfgs[folder]
folderPath := folderCfg.Path()
folderIgnores := m.folderIgnores[folder] folderIgnores := m.folderIgnores[folder]
m.fmut.RUnlock() m.fmut.RUnlock()
@ -802,8 +835,6 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
} }
} }
var reader io.ReaderAt
var err error
if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 {
target, _, err := symlinks.Read(fn) target, _, err := symlinks.Read(fn)
if err != nil { if err != nil {
@ -813,28 +844,30 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
} }
return protocol.ErrGeneric return protocol.ErrGeneric
} }
reader = strings.NewReader(target) if _, err := strings.NewReader(target).ReadAt(buf, offset); err != nil {
} else { l.Debugln("symlink.Reader.ReadAt", err)
// Cannot easily cache fd's because we might need to delete the file
// at any moment.
reader, err = os.Open(fn)
if err != nil {
l.Debugln("os.Open:", err)
if os.IsNotExist(err) {
return protocol.ErrNoSuchFile
}
return protocol.ErrGeneric return protocol.ErrGeneric
} }
return nil
defer reader.(*os.File).Close()
} }
_, err = reader.ReadAt(buf, offset) // Only check temp files if the flag is set, and if we are set to advertise
if err != nil { // the temp indexes.
l.Debugln("reader.ReadAt:", err) if flags&protocol.FlagFromTemporary != 0 && !folderCfg.DisableTempIndexes {
tempFn := filepath.Join(folderPath, defTempNamer.TempName(name))
if err := readOffsetIntoBuf(tempFn, offset, buf); err == nil {
return nil
}
// Fall through to reading from a non-temp file, just incase the temp
// file has finished downloading.
}
err := readOffsetIntoBuf(fn, offset, buf)
if os.IsNotExist(err) {
return protocol.ErrNoSuchFile
} else if err != nil {
return protocol.ErrGeneric return protocol.ErrGeneric
} }
return nil return nil
} }
@ -986,6 +1019,7 @@ func (m *Model) AddConnection(conn Connection, hello protocol.HelloMessage) {
panic("add existing device") panic("add existing device")
} }
m.conn[deviceID] = conn m.conn[deviceID] = conn
m.deviceDownloads[deviceID] = newDeviceDownloadState()
m.helloMessages[deviceID] = hello m.helloMessages[deviceID] = hello
@ -1040,6 +1074,24 @@ func (m *Model) PauseDevice(device protocol.DeviceID) {
events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()}) events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()})
} }
func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) {
if !m.folderSharedWith(folder, device) {
return
}
m.fmut.RLock()
cfg, ok := m.folderCfgs[folder]
m.fmut.RUnlock()
if !ok || cfg.ReadOnly || cfg.DisableTempIndexes {
return
}
m.pmut.RLock()
m.deviceDownloads[device].Update(folder, updates)
m.pmut.RUnlock()
}
func (m *Model) ResumeDevice(device protocol.DeviceID) { func (m *Model) ResumeDevice(device protocol.DeviceID) {
m.pmut.Lock() m.pmut.Lock()
m.devicePaused[device] = false m.devicePaused[device] = false
@ -1211,7 +1263,7 @@ func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
}) })
} }
func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
m.pmut.RLock() m.pmut.RLock()
nc, ok := m.conn[deviceID] nc, ok := m.conn[deviceID]
m.pmut.RUnlock() m.pmut.RUnlock()
@ -1220,9 +1272,9 @@ func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, o
return nil, fmt.Errorf("requestGlobal: no such device: %s", deviceID) return nil, fmt.Errorf("requestGlobal: no such device: %s", deviceID)
} }
l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x f=%x op=%s", m, deviceID, folder, name, offset, size, hash, flags, options) l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x ft=%t op=%s", m, deviceID, folder, name, offset, size, hash, fromTemporary)
return nc.Request(folder, name, offset, size, hash, flags, options) return nc.Request(folder, name, offset, size, hash, fromTemporary)
} }
func (m *Model) AddFolder(cfg config.FolderConfiguration) { func (m *Model) AddFolder(cfg config.FolderConfiguration) {
@ -1553,6 +1605,9 @@ func (m *Model) generateClusterConfig(device protocol.DeviceID) protocol.Cluster
if folderCfg.IgnoreDelete { if folderCfg.IgnoreDelete {
flags |= protocol.FlagFolderIgnoreDelete flags |= protocol.FlagFolderIgnoreDelete
} }
if folderCfg.DisableTempIndexes {
flags |= protocol.FlagFolderDisabledTempIndexes
}
protocolFolder.Flags = flags protocolFolder.Flags = flags
for _, device := range m.folderDevices[folder] { for _, device := range m.folderDevices[folder] {
// DeviceID is a value type, but with an underlying array. Copy it // DeviceID is a value type, but with an underlying array. Copy it
@ -1736,7 +1791,7 @@ func (m *Model) GlobalDirectoryTree(folder, prefix string, levels int, dirsonly
return output return output
} }
func (m *Model) Availability(folder, file string) []protocol.DeviceID { func (m *Model) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []Availability {
// Acquire this lock first, as the value returned from foldersFiles can // Acquire this lock first, as the value returned from foldersFiles can
// get heavily modified on Close() // get heavily modified on Close()
m.pmut.RLock() m.pmut.RLock()
@ -1744,19 +1799,27 @@ func (m *Model) Availability(folder, file string) []protocol.DeviceID {
m.fmut.RLock() m.fmut.RLock()
fs, ok := m.folderFiles[folder] fs, ok := m.folderFiles[folder]
devices := m.folderDevices[folder]
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
return nil return nil
} }
availableDevices := []protocol.DeviceID{} var availabilities []Availability
for _, device := range fs.Availability(file) { for _, device := range fs.Availability(file) {
_, ok := m.conn[device] _, ok := m.conn[device]
if ok { if ok {
availableDevices = append(availableDevices, device) availabilities = append(availabilities, Availability{ID: device, FromTemporary: false})
} }
} }
return availableDevices
for _, device := range devices {
if m.deviceDownloads[device].Has(folder, file, version, int32(block.Offset/protocol.BlockSize)) {
availabilities = append(availabilities, Availability{ID: device, FromTemporary: true})
}
}
return availabilities
} }
// BringToFront bumps the given files priority in the job queue. // BringToFront bumps the given files priority in the job queue.
@ -2086,6 +2149,21 @@ func stringSliceWithout(ss []string, s string) []string {
return ss return ss
} }
func readOffsetIntoBuf(file string, offset int64, buf []byte) error {
fd, err := os.Open(file)
if err != nil {
l.Debugln("readOffsetIntoBuf.Open", file, err)
return err
}
defer fd.Close()
_, err = fd.ReadAt(buf, offset)
if err != nil {
l.Debugln("readOffsetIntoBuf.ReadAt", file, err)
}
return err
}
// The exists function is expected to return true for all known paths // The exists function is expected to return true for all known paths
// (excluding "" and ".") // (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string { func unifySubs(dirs []string, exists func(dir string) bool) []string {

View File

@ -203,9 +203,17 @@ func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) {
b.ReportAllocs() b.ReportAllocs()
} }
type downloadProgressMessage struct {
folder string
updates []protocol.FileDownloadProgressUpdate
flags uint32
options []protocol.Option
}
type FakeConnection struct { type FakeConnection struct {
id protocol.DeviceID id protocol.DeviceID
requestData []byte requestData []byte
downloadProgressMessages []downloadProgressMessage
} }
func (FakeConnection) Close() error { func (FakeConnection) Close() error {
@ -235,7 +243,7 @@ func (FakeConnection) IndexUpdate(string, []protocol.FileInfo, uint32, []protoco
return nil return nil
} }
func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { func (f FakeConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
return f.requestData, nil return f.requestData, nil
} }
@ -253,6 +261,15 @@ func (FakeConnection) Statistics() protocol.Statistics {
return protocol.Statistics{} return protocol.Statistics{}
} }
func (f *FakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate, flags uint32, options []protocol.Option) {
f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{
folder: folder,
updates: updates,
flags: flags,
options: options,
})
}
func BenchmarkRequest(b *testing.B) { func BenchmarkRequest(b *testing.B) {
db := db.OpenMemory() db := db.OpenMemory()
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil) m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil)
@ -271,7 +288,7 @@ func BenchmarkRequest(b *testing.B) {
} }
} }
fc := FakeConnection{ fc := &FakeConnection{
id: device1, id: device1,
requestData: []byte("some data to return"), requestData: []byte("some data to return"),
} }
@ -284,7 +301,7 @@ func BenchmarkRequest(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, 0, nil) data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, false)
if err != nil { if err != nil {
b.Error(err) b.Error(err)
} }
@ -318,7 +335,7 @@ func TestDeviceRename(t *testing.T) {
conn := Connection{ conn := Connection{
&net.TCPConn{}, &net.TCPConn{},
FakeConnection{ &FakeConnection{
id: device1, id: device1,
requestData: []byte("some data to return"), requestData: []byte("some data to return"),
}, },

View File

@ -9,19 +9,22 @@ package model
import ( import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"reflect"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
) )
type ProgressEmitter struct { type ProgressEmitter struct {
registry map[string]*sharedPullerState registry map[string]*sharedPullerState
interval time.Duration interval time.Duration
last map[string]map[string]*pullerProgress minBlocks int
mut sync.Mutex lastUpdate time.Time
sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
connections map[string][]protocol.Connection
mut sync.Mutex
timer *time.Timer timer *time.Timer
@ -32,11 +35,12 @@ type ProgressEmitter struct {
// DownloadProgress events every interval. // DownloadProgress events every interval.
func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter { func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
t := &ProgressEmitter{ t := &ProgressEmitter{
stop: make(chan struct{}), stop: make(chan struct{}),
registry: make(map[string]*sharedPullerState), registry: make(map[string]*sharedPullerState),
last: make(map[string]map[string]*pullerProgress), timer: time.NewTimer(time.Millisecond),
timer: time.NewTimer(time.Millisecond), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
mut: sync.NewMutex(), connections: make(map[string][]protocol.Connection),
mut: sync.NewMutex(),
} }
t.CommitConfiguration(config.Configuration{}, cfg.Raw()) t.CommitConfiguration(config.Configuration{}, cfg.Raw())
@ -48,6 +52,8 @@ func NewProgressEmitter(cfg *config.Wrapper) *ProgressEmitter {
// Serve starts the progress emitter which starts emitting DownloadProgress // Serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens. // events as the progress happens.
func (t *ProgressEmitter) Serve() { func (t *ProgressEmitter) Serve() {
var lastUpdate time.Time
var lastCount, newCount int
for { for {
select { select {
case <-t.stop: case <-t.stop:
@ -56,21 +62,28 @@ func (t *ProgressEmitter) Serve() {
case <-t.timer.C: case <-t.timer.C:
t.mut.Lock() t.mut.Lock()
l.Debugln("progress emitter: timer - looking after", len(t.registry)) l.Debugln("progress emitter: timer - looking after", len(t.registry))
output := make(map[string]map[string]*pullerProgress)
newLastUpdated := lastUpdate
newCount = len(t.registry)
for _, puller := range t.registry { for _, puller := range t.registry {
if output[puller.folder] == nil { updated := puller.Updated()
output[puller.folder] = make(map[string]*pullerProgress) if updated.After(newLastUpdated) {
newLastUpdated = updated
} }
output[puller.folder][puller.file.Name] = puller.Progress()
} }
if !reflect.DeepEqual(t.last, output) {
events.Default.Log(events.DownloadProgress, output) if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
t.last = output lastUpdate = newLastUpdated
l.Debugf("progress emitter: emitting %#v", output) lastCount = newCount
t.sendDownloadProgressEvent()
if len(t.connections) > 0 {
t.sendDownloadProgressMessages()
}
} else { } else {
l.Debugln("progress emitter: nothing new") l.Debugln("progress emitter: nothing new")
} }
if len(t.registry) != 0 {
if newCount != 0 {
t.timer.Reset(t.interval) t.timer.Reset(t.interval)
} }
t.mut.Unlock() t.mut.Unlock()
@ -78,6 +91,95 @@ func (t *ProgressEmitter) Serve() {
} }
} }
func (t *ProgressEmitter) sendDownloadProgressEvent() {
// registry lock already held
output := make(map[string]map[string]*pullerProgress)
for _, puller := range t.registry {
if output[puller.folder] == nil {
output[puller.folder] = make(map[string]*pullerProgress)
}
output[puller.folder][puller.file.Name] = puller.Progress()
}
events.Default.Log(events.DownloadProgress, output)
l.Debugf("progress emitter: emitting %#v", output)
}
func (t *ProgressEmitter) sendDownloadProgressMessages() {
// registry lock already held
sharedFolders := make(map[protocol.DeviceID][]string)
deviceConns := make(map[protocol.DeviceID]protocol.Connection)
subscribers := t.connections
for folder, conns := range subscribers {
for _, conn := range conns {
id := conn.ID()
deviceConns[id] = conn
sharedFolders[id] = append(sharedFolders[id], folder)
state, ok := t.sentDownloadStates[id]
if !ok {
state = &sentDownloadState{
folderStates: make(map[string]*sentFolderDownloadState),
}
t.sentDownloadStates[id] = state
}
var activePullers []*sharedPullerState
for _, puller := range t.registry {
if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
continue
}
activePullers = append(activePullers, puller)
}
// For every new puller that hasn't yet been seen, it will send all the blocks the puller has available
// For every existing puller, it will check for new blocks, and send update for the new blocks only
// For every puller that we've seen before but is no longer there, we will send a forget message
updates := state.update(folder, activePullers)
if len(updates) > 0 {
conn.DownloadProgress(folder, updates, 0, nil)
}
}
}
// Clean up sentDownloadStates for devices which we are no longer connected to.
for id := range t.sentDownloadStates {
_, ok := deviceConns[id]
if !ok {
// Null out outstanding entries for device
delete(t.sentDownloadStates, id)
}
}
// If a folder was unshared from some device, tell it that all temp files
// are now gone.
for id, sharedDeviceFolders := range sharedFolders {
state := t.sentDownloadStates[id]
nextFolder:
// For each of the folders that the state is aware of,
// try to match it with a shared folder we've discovered above,
for _, folder := range state.folders() {
for _, existingFolder := range sharedDeviceFolders {
if existingFolder == folder {
continue nextFolder
}
}
// If we fail to find that folder, we tell the state to forget about it
// and return us a list of updates which would clean up the state
// on the remote end.
updates := state.cleanup(folder)
if len(updates) > 0 {
// XXX: Don't send this now, as the only way we've unshared a folder
// is by breaking the connection and reconnecting, hence sending
// forget messages for some random folder currently makes no sense.
// deviceConns[id].DownloadProgress(folder, updates, 0, nil)
}
}
}
}
// VerifyConfiguration implements the config.Committer interface // VerifyConfiguration implements the config.Committer interface
func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error { func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error {
return nil return nil
@ -89,6 +191,7 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo
defer t.mut.Unlock() defer t.mut.Unlock()
t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second t.interval = time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
t.minBlocks = to.Options.TempIndexMinBlocks
l.Debugln("progress emitter: updated interval", t.interval) l.Debugln("progress emitter: updated interval", t.interval)
return true return true
@ -115,7 +218,9 @@ func (t *ProgressEmitter) Register(s *sharedPullerState) {
func (t *ProgressEmitter) Deregister(s *sharedPullerState) { func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
t.mut.Lock() t.mut.Lock()
defer t.mut.Unlock() defer t.mut.Unlock()
l.Debugln("progress emitter: deregistering", s.folder, s.file.Name) l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
delete(t.registry, filepath.Join(s.folder, s.file.Name)) delete(t.registry, filepath.Join(s.folder, s.file.Name))
} }
@ -142,3 +247,38 @@ func (t *ProgressEmitter) lenRegistry() int {
defer t.mut.Unlock() defer t.mut.Unlock()
return len(t.registry) return len(t.registry)
} }
func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
t.mut.Lock()
for _, folder := range folders {
t.connections[folder] = append(t.connections[folder], conn)
}
t.mut.Unlock()
}
func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
t.mut.Lock()
left := make(map[string][]protocol.Connection, len(t.connections))
for folder, conns := range t.connections {
connsLeft := connsWithout(conns, conn)
if len(connsLeft) > 0 {
left[folder] = connsLeft
}
}
t.connections = left
t.mut.Unlock()
}
func connsWithout(conns []protocol.Connection, conn protocol.Connection) []protocol.Connection {
if len(conns) == 0 {
return nil
}
newConns := make([]protocol.Connection, 0, len(conns)-1)
for _, existingConn := range conns {
if existingConn != conn {
newConns = append(newConns, existingConn)
}
}
return newConns
}

View File

@ -7,34 +7,46 @@
package model package model
import ( import (
"fmt"
"path/filepath"
"runtime"
"testing" "testing"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
) )
var timeout = 10 * time.Millisecond var timeout = 100 * time.Millisecond
func caller(skip int) string {
_, file, line, ok := runtime.Caller(skip + 1)
if !ok {
return "unknown"
}
return fmt.Sprintf("%s:%d", filepath.Base(file), line)
}
func expectEvent(w *events.Subscription, t *testing.T, size int) { func expectEvent(w *events.Subscription, t *testing.T, size int) {
event, err := w.Poll(timeout) event, err := w.Poll(timeout)
if err != nil { if err != nil {
t.Fatal("Unexpected error:", err) t.Fatal("Unexpected error:", err, "at", caller(1))
} }
if event.Type != events.DownloadProgress { if event.Type != events.DownloadProgress {
t.Fatal("Unexpected event:", event) t.Fatal("Unexpected event:", event, "at", caller(1))
} }
data := event.Data.(map[string]map[string]*pullerProgress) data := event.Data.(map[string]map[string]*pullerProgress)
if len(data) != size { if len(data) != size {
t.Fatal("Unexpected event data size:", data) t.Fatal("Unexpected event data size:", data, "at", caller(1))
} }
} }
func expectTimeout(w *events.Subscription, t *testing.T) { func expectTimeout(w *events.Subscription, t *testing.T) {
_, err := w.Poll(timeout) _, err := w.Poll(timeout)
if err != events.ErrTimeout { if err != events.ErrTimeout {
t.Fatal("Unexpected non-Timeout error:", err) t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1))
} }
} }
@ -52,14 +64,15 @@ func TestProgressEmitter(t *testing.T) {
expectTimeout(w, t) expectTimeout(w, t)
s := sharedPullerState{ s := sharedPullerState{
mut: sync.NewMutex(), updated: time.Now(),
mut: sync.NewRWMutex(),
} }
p.Register(&s) p.Register(&s)
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)
s.copyDone() s.copyDone(protocol.BlockInfo{})
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)
@ -74,7 +87,7 @@ func TestProgressEmitter(t *testing.T) {
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)
s.pullDone() s.pullDone(protocol.BlockInfo{})
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)
@ -85,3 +98,335 @@ func TestProgressEmitter(t *testing.T) {
expectTimeout(w, t) expectTimeout(w, t)
} }
func TestSendDownloadProgressMessages(t *testing.T) {
c := config.Wrap("/tmp/test", config.Configuration{})
c.SetOptions(config.OptionsConfiguration{
ProgressUpdateIntervalS: 0,
TempIndexMinBlocks: 10,
})
fc := &FakeConnection{}
p := NewProgressEmitter(c)
p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})
expect := func(updateIdx int, state *sharedPullerState, updateType uint32, version protocol.Vector, blocks []int32, remove bool) {
messageIdx := -1
for i, msg := range fc.downloadProgressMessages {
if msg.folder == state.folder {
messageIdx = i
break
}
}
if messageIdx < 0 {
t.Errorf("Message for folder %s does not exist at %s", state.folder, caller(1))
}
msg := fc.downloadProgressMessages[messageIdx]
// Don't know the index (it's random due to iterating maps)
if updateIdx == -1 {
for i, upd := range msg.updates {
if upd.Name == state.file.Name {
updateIdx = i
break
}
}
}
if updateIdx == -1 {
t.Errorf("Could not find update for %s at %s", state.file.Name, caller(1))
}
if updateIdx > len(msg.updates)-1 {
t.Errorf("Update at index %d does not exist at %s", updateIdx, caller(1))
}
update := msg.updates[updateIdx]
if update.UpdateType != updateType {
t.Errorf("Wrong update type at %s", caller(1))
}
if !update.Version.Equal(version) {
t.Errorf("Wrong version at %s", caller(1))
}
if len(update.BlockIndexes) != len(blocks) {
t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1))
}
for i := range update.BlockIndexes {
if update.BlockIndexes[i] != blocks[i] {
t.Errorf("Index %d incorrect at %s", i, caller(1))
}
}
if remove {
fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...)
}
}
expectEmpty := func() {
if len(fc.downloadProgressMessages) > 0 {
t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages)
}
}
now := time.Now()
tick := func() time.Time {
now = now.Add(time.Second)
return now
}
if len(fc.downloadProgressMessages) != 0 {
t.Error("Expected no requests")
}
v1 := (protocol.Vector{}).Update(0)
v2 := (protocol.Vector{}).Update(1)
// Requires more than 10 blocks to work.
blocks := make([]protocol.BlockInfo, 11, 11)
state1 := &sharedPullerState{
folder: "folder",
file: protocol.FileInfo{
Name: "state1",
Version: v1,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
availableUpdated: time.Now(),
}
p.registry["1"] = state1
// Has no blocks, hence no message is sent
p.sendDownloadProgressMessages()
expectEmpty()
// Returns update for puller with new extra blocks
state1.available = []int32{1}
p.sendDownloadProgressMessages()
expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
expectEmpty()
// Does nothing if nothing changes
p.sendDownloadProgressMessages()
expectEmpty()
// Does nothing if timestamp updated, but no new blocks (should never happen)
state1.availableUpdated = tick()
p.sendDownloadProgressMessages()
expectEmpty()
// Does not return an update if date blocks change but date does not (should never happen)
state1.available = []int32{1, 2}
p.sendDownloadProgressMessages()
expectEmpty()
// If the date and blocks changes, returns only the diff
state1.availableUpdated = tick()
p.sendDownloadProgressMessages()
expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
expectEmpty()
// Returns forget and update if puller version has changed
state1.file.Version = v2
p.sendDownloadProgressMessages()
expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
expectEmpty()
// Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available)
state1.file.Version = v1
state1.available = nil
state1.availableUpdated = tick()
p.sendDownloadProgressMessages()
expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
expectEmpty()
// Updates for multiple files and folders can be combined
state1.available = []int32{1, 2, 3}
state1.availableUpdated = tick()
state2 := &sharedPullerState{
folder: "folder2",
file: protocol.FileInfo{
Name: "state2",
Version: v1,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
state3 := &sharedPullerState{
folder: "folder",
file: protocol.FileInfo{
Name: "state3",
Version: v1,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
state4 := &sharedPullerState{
folder: "folder2",
file: protocol.FileInfo{
Name: "state4",
Version: v1,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
p.registry["2"] = state2
p.registry["3"] = state3
p.registry["4"] = state4
p.sendDownloadProgressMessages()
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
expectEmpty()
// Returns forget if puller no longer exists, as well as updates if it has been updated.
state1.available = []int32{1, 2, 3, 4, 5}
state1.availableUpdated = tick()
state2.available = []int32{1, 2, 3, 4, 5}
state2.availableUpdated = tick()
delete(p.registry, "3")
delete(p.registry, "4")
p.sendDownloadProgressMessages()
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
expectEmpty()
// Deletions are sent only once (actual bug I found writing the tests)
p.sendDownloadProgressMessages()
p.sendDownloadProgressMessages()
expectEmpty()
// Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
// Directory
state5 := &sharedPullerState{
folder: "folder",
file: protocol.FileInfo{
Name: "state5",
Version: v1,
Flags: protocol.FlagDirectory,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
// Symlink
state6 := &sharedPullerState{
folder: "folder",
file: protocol.FileInfo{
Name: "state6",
Version: v1,
Flags: protocol.FlagSymlink,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
// Some other directory
state7 := &sharedPullerState{
folder: "folderXXX",
file: protocol.FileInfo{
Name: "state7",
Version: v1,
Blocks: blocks,
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
// Less than 10 blocks
state8 := &sharedPullerState{
folder: "folder",
file: protocol.FileInfo{
Name: "state8",
Version: v1,
Blocks: blocks[:3],
},
mut: sync.NewRWMutex(),
available: []int32{1, 2, 3},
availableUpdated: time.Now(),
}
p.registry["5"] = state5
p.registry["6"] = state6
p.registry["7"] = state7
p.registry["8"] = state8
p.sendDownloadProgressMessages()
expectEmpty()
// Device is no longer subscribed to a particular folder
delete(p.registry, "1") // Clean up first
delete(p.registry, "2") // Clean up first
p.sendDownloadProgressMessages()
expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)
expectEmpty()
p.registry["1"] = state1
p.registry["2"] = state2
p.registry["3"] = state3
p.registry["4"] = state4
p.sendDownloadProgressMessages()
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
expectEmpty()
p.temporaryIndexUnsubscribe(fc)
p.temporaryIndexSubscribe(fc, []string{"folder"})
p.sendDownloadProgressMessages()
// See progressemitter.go for explanation why this is commented out.
// Search for state.cleanup
//expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false)
//expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
expectEmpty()
// Cleanup when device no longer exists
p.temporaryIndexUnsubscribe(fc)
p.sendDownloadProgressMessages()
_, ok := p.sentDownloadStates[fc.ID()]
if ok {
t.Error("Should not be there")
}
}

View File

@ -970,9 +970,9 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
scanner.PopulateOffsets(file.Blocks) scanner.PopulateOffsets(file.Blocks)
reused := 0
var blocks []protocol.BlockInfo var blocks []protocol.BlockInfo
var blocksSize int64 var blocksSize int64
var reused []int32
// Check for an old temporary file which might have some blocks we could // Check for an old temporary file which might have some blocks we could
// reuse. // reuse.
@ -988,25 +988,27 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
} }
// Since the blocks are already there, we don't need to get them. // Since the blocks are already there, we don't need to get them.
for _, block := range file.Blocks { for i, block := range file.Blocks {
_, ok := existingBlocks[block.String()] _, ok := existingBlocks[block.String()]
if !ok { if !ok {
blocks = append(blocks, block) blocks = append(blocks, block)
blocksSize += int64(block.Size) blocksSize += int64(block.Size)
} else {
reused = append(reused, int32(i))
} }
} }
// The sharedpullerstate will know which flags to use when opening the // The sharedpullerstate will know which flags to use when opening the
// temp file depending if we are reusing any blocks or not. // temp file depending if we are reusing any blocks or not.
reused = len(file.Blocks) - len(blocks) if len(reused) == 0 {
if reused == 0 {
// Otherwise, discard the file ourselves in order for the // Otherwise, discard the file ourselves in order for the
// sharedpuller not to panic when it fails to exclusively create a // sharedpuller not to panic when it fails to exclusively create a
// file which already exists // file which already exists
osutil.InWritableDir(osutil.Remove, tempName) osutil.InWritableDir(osutil.Remove, tempName)
} }
} else { } else {
blocks = file.Blocks // Copy the blocks, as we don't want to shuffle them on the FileInfo
blocks = append(blocks, file.Blocks...)
blocksSize = file.Size() blocksSize = file.Size()
} }
@ -1018,6 +1020,12 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
} }
} }
// Shuffle the blocks
for i := range blocks {
j := rand.Intn(i + 1)
blocks[i], blocks[j] = blocks[j], blocks[i]
}
events.Default.Log(events.ItemStarted, map[string]string{ events.Default.Log(events.ItemStarted, map[string]string{
"folder": p.folder, "folder": p.folder,
"item": file.Name, "item": file.Name,
@ -1026,17 +1034,20 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks
}) })
s := sharedPullerState{ s := sharedPullerState{
file: file, file: file,
folder: p.folder, folder: p.folder,
tempName: tempName, tempName: tempName,
realName: realName, realName: realName,
copyTotal: len(blocks), copyTotal: len(blocks),
copyNeeded: len(blocks), copyNeeded: len(blocks),
reused: reused, reused: len(reused),
ignorePerms: p.ignorePermissions(file), updated: time.Now(),
version: curFile.Version, available: reused,
mut: sync.NewMutex(), availableUpdated: time.Now(),
sparse: p.allowSparse, ignorePerms: p.ignorePermissions(file),
version: curFile.Version,
mut: sync.NewRWMutex(),
sparse: p.allowSparse,
} }
l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
@ -1184,7 +1195,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull
} }
pullChan <- ps pullChan <- ps
} else { } else {
state.copyDone() state.copyDone(block)
} }
} }
out <- state.sharedPullerState out <- state.sharedPullerState
@ -1210,19 +1221,19 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
if p.allowSparse && state.reused == 0 && state.block.IsEmpty() { if p.allowSparse && state.reused == 0 && state.block.IsEmpty() {
// There is no need to request a block of all zeroes. Pretend we // There is no need to request a block of all zeroes. Pretend we
// requested it and handled it correctly. // requested it and handled it correctly.
state.pullDone() state.pullDone(state.block)
out <- state.sharedPullerState out <- state.sharedPullerState
continue continue
} }
var lastError error var lastError error
potentialDevices := p.model.Availability(p.folder, state.file.Name) candidates := p.model.Availability(p.folder, state.file.Name, state.file.Version, state.block)
for { for {
// Select the least busy device to pull the block from. If we found no // Select the least busy device to pull the block from. If we found no
// feasible device at all, fail the block (and in the long run, the // feasible device at all, fail the block (and in the long run, the
// file). // file).
selected := activity.leastBusy(potentialDevices) selected, found := activity.leastBusy(candidates)
if selected == (protocol.DeviceID{}) { if !found {
if lastError != nil { if lastError != nil {
state.fail("pull", lastError) state.fail("pull", lastError)
} else { } else {
@ -1231,12 +1242,12 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
break break
} }
potentialDevices = removeDevice(potentialDevices, selected) candidates = removeAvailability(candidates, selected)
// Fetch the block, while marking the selected device as in use so that // Fetch the block, while marking the selected device as in use so that
// leastBusy can select another device when someone else asks. // leastBusy can select another device when someone else asks.
activity.using(selected) activity.using(selected)
buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil) buf, lastError := p.model.requestGlobal(selected.ID, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
activity.done(selected) activity.done(selected)
if lastError != nil { if lastError != nil {
l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError) l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
@ -1256,7 +1267,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul
if err != nil { if err != nil {
state.fail("save", err) state.fail("save", err)
} else { } else {
state.pullDone() state.pullDone(state.block)
} }
break break
} }
@ -1481,14 +1492,24 @@ func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
return false return false
} }
func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID { func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range devices { for i := range cfg.Folders {
if devices[i] == device { folder := &cfg.Folders[i]
devices[i] = devices[len(devices)-1] if folder.ID == folderID {
return devices[:len(devices)-1] folder.Invalid = err.Error()
return
} }
} }
return devices }
func removeAvailability(availabilities []Availability, availability Availability) []Availability {
for i := range availabilities {
if availabilities[i] == availability {
availabilities[i] = availabilities[len(availabilities)-1]
return availabilities[:len(availabilities)-1]
}
}
return availabilities
} }
func (p *rwFolder) moveForConflict(name string) error { func (p *rwFolder) moveForConflict(name string) error {

View File

@ -104,9 +104,16 @@ func TestHandleFile(t *testing.T) {
t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks)) t.Errorf("Unexpected count of copy blocks: %d != 8", len(toCopy.blocks))
} }
for i, block := range toCopy.blocks { for _, block := range blocks[1:] {
if string(block.Hash) != string(blocks[i+1].Hash) { found := false
t.Errorf("Block mismatch: %s != %s", block.String(), blocks[i+1].String()) for _, toCopyBlock := range toCopy.blocks {
if string(toCopyBlock.Hash) == string(block.Hash) {
found = true
break
}
}
if !found {
t.Errorf("Did not find block %s", block.String())
} }
} }
} }
@ -138,9 +145,17 @@ func TestHandleFileWithTemp(t *testing.T) {
t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks)) t.Errorf("Unexpected count of copy blocks: %d != 4", len(toCopy.blocks))
} }
for i, eq := range []int{1, 5, 6, 8} { for _, idx := range []int{1, 5, 6, 8} {
if string(toCopy.blocks[i].Hash) != string(blocks[eq].Hash) { found := false
t.Errorf("Block mismatch: %s != %s", toCopy.blocks[i].String(), blocks[eq].String()) block := blocks[idx]
for _, toCopyBlock := range toCopy.blocks {
if string(toCopyBlock.Hash) == string(block.Hash) {
found = true
break
}
}
if !found {
t.Errorf("Did not find block %s", block.String())
} }
} }
} }
@ -187,13 +202,22 @@ func TestCopierFinder(t *testing.T) {
default: default:
} }
// Verify that the right blocks went into the pull list // Verify that the right blocks went into the pull list.
for i, eq := range []int{1, 5, 6, 8} { // They are pulled in random order.
if string(pulls[i].block.Hash) != string(blocks[eq].Hash) { for _, idx := range []int{1, 5, 6, 8} {
t.Errorf("Block %d mismatch: %s != %s", eq, pulls[i].block.String(), blocks[eq].String()) found := false
block := blocks[idx]
for _, pulledBlock := range pulls {
if string(pulledBlock.block.Hash) == string(block.Hash) {
found = true
break
}
} }
if string(finish.file.Blocks[eq-1].Hash) != string(blocks[eq].Hash) { if !found {
t.Errorf("Block %d mismatch: %s != %s", eq, finish.file.Blocks[eq-1].String(), blocks[eq].String()) t.Errorf("Did not find block %s", block.String())
}
if string(finish.file.Blocks[idx-1].Hash) != string(blocks[idx].Hash) {
t.Errorf("Block %d mismatch: %s != %s", idx, finish.file.Blocks[idx-1].String(), blocks[idx].String())
} }
} }

View File

@ -0,0 +1,184 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"time"
"github.com/syncthing/syncthing/lib/protocol"
)
// sentFolderFileDownloadState represents a state of what we've announced as available
// to some remote device for a specific file.
type sentFolderFileDownloadState struct {
blockIndexes []int32
version protocol.Vector
updated time.Time
}
// sentFolderDownloadState represents a state of what we've announced as available
// to some remote device for a specific folder.
type sentFolderDownloadState struct {
files map[string]*sentFolderFileDownloadState
}
// update takes a set of currently active sharedPullerStates, and returns a list
// of updates which we need to send to the client to become up to date.
func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
var name string
var updates []protocol.FileDownloadProgressUpdate
seen := make(map[string]struct{}, len(pullers))
for _, puller := range pullers {
name = puller.file.Name
seen[name] = struct{}{}
pullerBlockIndexes := puller.Available()
pullerVersion := puller.file.Version
pullerBlockIndexesUpdated := puller.AvailableUpdated()
localFile, ok := s.files[name]
// New file we haven't seen before
if !ok {
// Only send an update if the file actually has some blocks.
if len(pullerBlockIndexes) > 0 {
s.files[name] = &sentFolderFileDownloadState{
blockIndexes: pullerBlockIndexes,
updated: pullerBlockIndexesUpdated,
version: pullerVersion,
}
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: pullerVersion,
UpdateType: protocol.UpdateTypeAppend,
BlockIndexes: pullerBlockIndexes,
})
}
continue
}
// Existing file we've already sent an update for.
if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) {
// The file state hasn't changed, go to next.
continue
}
if !pullerVersion.Equal(localFile.version) {
// The version has changed, clean up whatever we had for the old
// file, and advertise the new file.
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: localFile.version,
UpdateType: protocol.UpdateTypeForget,
})
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: pullerVersion,
UpdateType: protocol.UpdateTypeAppend,
BlockIndexes: pullerBlockIndexes,
})
localFile.blockIndexes = pullerBlockIndexes
localFile.updated = pullerBlockIndexesUpdated
localFile.version = pullerVersion
continue
}
// Relies on the fact that sharedPullerState.Available() should always
// append.
newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):]
localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...)
localFile.updated = pullerBlockIndexesUpdated
// If there are new blocks, send the update.
if len(newBlocks) > 0 {
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: localFile.version,
UpdateType: protocol.UpdateTypeAppend,
BlockIndexes: newBlocks,
})
}
}
// For each file that we are tracking, see if there still is a puller for it
// if not, the file completed or errored out.
for name, info := range s.files {
_, ok := seen[name]
if !ok {
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: info.version,
UpdateType: protocol.UpdateTypeForget,
})
delete(s.files, name)
}
}
return updates
}
// destroy removes all stored state, and returns a set of updates we need to
// dispatch to clean up the state on the remote end.
func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate {
updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files))
for name, info := range s.files {
updates = append(updates, protocol.FileDownloadProgressUpdate{
Name: name,
Version: info.version,
UpdateType: protocol.UpdateTypeForget,
})
delete(s.files, name)
}
return updates
}
// sentDownloadState represents a state of what we've announced as available
// to some remote device. It is used from within the progress emitter
// which only has one routine, hence is deemed threadsafe.
type sentDownloadState struct {
folderStates map[string]*sentFolderDownloadState
}
// update receives a folder, and a slice of pullers that are currently available
// for the given folder, and according to the state of what we've seen before
// returns a set of updates which we should send to the remote device to make
// it aware of everything that we currently have available.
func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
fs, ok := s.folderStates[folder]
if !ok {
fs = &sentFolderDownloadState{
files: make(map[string]*sentFolderFileDownloadState),
}
s.folderStates[folder] = fs
}
return fs.update(pullers)
}
// folders returns a set of folders this state is currently aware off.
func (s *sentDownloadState) folders() []string {
folders := make([]string, 0, len(s.folderStates))
for key := range s.folderStates {
folders = append(folders, key)
}
return folders
}
// cleanup cleans up all state related to a folder, and returns a set of updates
// which would clean up the state on the remote device.
func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate {
fs, ok := s.folderStates[folder]
if ok {
updates := fs.destroy()
delete(s.folderStates, folder)
return updates
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
@ -30,15 +31,18 @@ type sharedPullerState struct {
sparse bool sparse bool
// Mutable, must be locked for access // Mutable, must be locked for access
err error // The first error we hit err error // The first error we hit
fd *os.File // The fd of the temp file fd *os.File // The fd of the temp file
copyTotal int // Total number of copy actions for the whole job copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file copyOrigin int // Number of blocks copied from the original file
copyNeeded int // Number of copy actions still pending copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending pullNeeded int // Number of block pulls still pending
closed bool // True if the file has been finalClosed. updated time.Time // Time when any of the counters above were last updated
mut sync.Mutex // Protects the above closed bool // True if the file has been finalClosed.
available []int32 // Indexes of the blocks that are available in the temporary file
availableUpdated time.Time // Time when list of available blocks was last updated
mut sync.RWMutex // Protects the above
} }
// A momentary state representing the progress of the puller // A momentary state representing the progress of the puller
@ -56,7 +60,7 @@ type pullerProgress struct {
// A lockedWriterAt synchronizes WriteAt calls with an external mutex. // A lockedWriterAt synchronizes WriteAt calls with an external mutex.
// WriteAt() is goroutine safe by itself, but not against for example Close(). // WriteAt() is goroutine safe by itself, but not against for example Close().
type lockedWriterAt struct { type lockedWriterAt struct {
mut *sync.Mutex mut *sync.RWMutex
wr io.WriterAt wr io.WriterAt
} }
@ -196,15 +200,19 @@ func (s *sharedPullerState) failLocked(context string, err error) {
} }
func (s *sharedPullerState) failed() error { func (s *sharedPullerState) failed() error {
s.mut.Lock() s.mut.RLock()
defer s.mut.Unlock() err := s.err
s.mut.RUnlock()
return s.err return err
} }
func (s *sharedPullerState) copyDone() { func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
s.mut.Lock() s.mut.Lock()
s.copyNeeded-- s.copyNeeded--
s.updated = time.Now()
s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
s.mut.Unlock() s.mut.Unlock()
} }
@ -212,6 +220,7 @@ func (s *sharedPullerState) copyDone() {
func (s *sharedPullerState) copiedFromOrigin() { func (s *sharedPullerState) copiedFromOrigin() {
s.mut.Lock() s.mut.Lock()
s.copyOrigin++ s.copyOrigin++
s.updated = time.Now()
s.mut.Unlock() s.mut.Unlock()
} }
@ -221,13 +230,17 @@ func (s *sharedPullerState) pullStarted() {
s.copyNeeded-- s.copyNeeded--
s.pullTotal++ s.pullTotal++
s.pullNeeded++ s.pullNeeded++
s.updated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
s.mut.Unlock() s.mut.Unlock()
} }
func (s *sharedPullerState) pullDone() { func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
s.mut.Lock() s.mut.Lock()
s.pullNeeded-- s.pullNeeded--
s.updated = time.Now()
s.available = append(s.available, int32(block.Offset/protocol.BlockSize))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
s.mut.Unlock() s.mut.Unlock()
} }
@ -265,10 +278,10 @@ func (s *sharedPullerState) finalClose() (bool, error) {
return true, s.err return true, s.err
} }
// Returns the momentarily progress for the puller // Progress returns the momentarily progress for the puller
func (s *sharedPullerState) Progress() *pullerProgress { func (s *sharedPullerState) Progress() *pullerProgress {
s.mut.Lock() s.mut.RLock()
defer s.mut.Unlock() defer s.mut.RUnlock()
total := s.reused + s.copyTotal + s.pullTotal total := s.reused + s.copyTotal + s.pullTotal
done := total - s.copyNeeded - s.pullNeeded done := total - s.copyNeeded - s.pullNeeded
return &pullerProgress{ return &pullerProgress{
@ -282,3 +295,27 @@ func (s *sharedPullerState) Progress() *pullerProgress {
BytesDone: db.BlocksToSize(done), BytesDone: db.BlocksToSize(done),
} }
} }
// Updated returns the time when any of the progress related counters was last updated.
func (s *sharedPullerState) Updated() time.Time {
s.mut.RLock()
t := s.updated
s.mut.RUnlock()
return t
}
// AvailableUpdated returns the time last time list of available blocks was updated
func (s *sharedPullerState) AvailableUpdated() time.Time {
s.mut.RLock()
t := s.availableUpdated
s.mut.RUnlock()
return t
}
// Available returns blocks available in the current temporary file
func (s *sharedPullerState) Available() []int32 {
s.mut.RLock()
blocks := s.available
s.mut.RUnlock()
return blocks
}

View File

@ -16,7 +16,7 @@ import (
func TestSourceFileOK(t *testing.T) { func TestSourceFileOK(t *testing.T) {
s := sharedPullerState{ s := sharedPullerState{
realName: "testdata/foo", realName: "testdata/foo",
mut: sync.NewMutex(), mut: sync.NewRWMutex(),
} }
fd, err := s.sourceFile() fd, err := s.sourceFile()
@ -45,7 +45,7 @@ func TestSourceFileOK(t *testing.T) {
func TestSourceFileBad(t *testing.T) { func TestSourceFileBad(t *testing.T) {
s := sharedPullerState{ s := sharedPullerState{
realName: "nonexistent", realName: "nonexistent",
mut: sync.NewMutex(), mut: sync.NewRWMutex(),
} }
fd, err := s.sourceFile() fd, err := s.sourceFile()
@ -71,7 +71,7 @@ func TestReadOnlyDir(t *testing.T) {
s := sharedPullerState{ s := sharedPullerState{
tempName: "testdata/read_only_dir/.temp_name", tempName: "testdata/read_only_dir/.temp_name",
mut: sync.NewMutex(), mut: sync.NewRWMutex(),
} }
fd, err := s.tempFile() fd, err := s.tempFile()

View File

@ -49,6 +49,9 @@ func (t *TestModel) Close(deviceID DeviceID, err error) {
func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
} }
func (t *TestModel) DownloadProgress(DeviceID, string, []FileDownloadProgressUpdate, uint32, []Option) {
}
func (t *TestModel) closedError() error { func (t *TestModel) closedError() error {
select { select {
case <-t.closedCh: case <-t.closedCh:

View File

@ -138,6 +138,13 @@ type ClusterConfigMessage struct {
Options []Option // max:64 Options []Option // max:64
} }
type DownloadProgressMessage struct {
Folder string // max:64
Updates []FileDownloadProgressUpdate // max:1000000
Flags uint32
Options []Option // max:64
}
func (o *ClusterConfigMessage) GetOption(key string) string { func (o *ClusterConfigMessage) GetOption(key string) string {
for _, option := range o.Options { for _, option := range o.Options {
if option.Key == key { if option.Key == key {
@ -166,6 +173,13 @@ type Device struct {
Options []Option // max:64 Options []Option // max:64
} }
type FileDownloadProgressUpdate struct {
UpdateType uint32
Name string // max:8192
Version Vector
BlockIndexes []int32 // max:1000000
}
type Option struct { type Option struct {
Key string // max:64 Key string // max:64
Value string // max:1024 Value string // max:1024

View File

@ -690,6 +690,135 @@ func (o *ClusterConfigMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
/* /*
DownloadProgressMessage Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Folder (length + padded data) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Updates |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more FileDownloadProgressUpdate Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Options |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Option Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct DownloadProgressMessage {
string Folder<64>;
FileDownloadProgressUpdate Updates<1000000>;
unsigned int Flags;
Option Options<64>;
}
*/
func (o DownloadProgressMessage) XDRSize() int {
return 4 + len(o.Folder) + xdr.Padding(len(o.Folder)) +
4 + xdr.SizeOfSlice(o.Updates) + 4 +
4 + xdr.SizeOfSlice(o.Options)
}
func (o DownloadProgressMessage) MarshalXDR() ([]byte, error) {
buf := make([]byte, o.XDRSize())
m := &xdr.Marshaller{Data: buf}
return buf, o.MarshalXDRInto(m)
}
func (o DownloadProgressMessage) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o DownloadProgressMessage) MarshalXDRInto(m *xdr.Marshaller) error {
if l := len(o.Folder); l > 64 {
return xdr.ElementSizeExceeded("Folder", l, 64)
}
m.MarshalString(o.Folder)
if l := len(o.Updates); l > 1000000 {
return xdr.ElementSizeExceeded("Updates", l, 1000000)
}
m.MarshalUint32(uint32(len(o.Updates)))
for i := range o.Updates {
if err := o.Updates[i].MarshalXDRInto(m); err != nil {
return err
}
}
m.MarshalUint32(o.Flags)
if l := len(o.Options); l > 64 {
return xdr.ElementSizeExceeded("Options", l, 64)
}
m.MarshalUint32(uint32(len(o.Options)))
for i := range o.Options {
if err := o.Options[i].MarshalXDRInto(m); err != nil {
return err
}
}
return m.Error
}
func (o *DownloadProgressMessage) UnmarshalXDR(bs []byte) error {
u := &xdr.Unmarshaller{Data: bs}
return o.UnmarshalXDRFrom(u)
}
func (o *DownloadProgressMessage) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
o.Folder = u.UnmarshalStringMax(64)
_UpdatesSize := int(u.UnmarshalUint32())
if _UpdatesSize < 0 {
return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000)
} else if _UpdatesSize == 0 {
o.Updates = nil
} else {
if _UpdatesSize > 1000000 {
return xdr.ElementSizeExceeded("Updates", _UpdatesSize, 1000000)
}
if _UpdatesSize <= len(o.Updates) {
o.Updates = o.Updates[:_UpdatesSize]
} else {
o.Updates = make([]FileDownloadProgressUpdate, _UpdatesSize)
}
for i := range o.Updates {
(&o.Updates[i]).UnmarshalXDRFrom(u)
}
}
o.Flags = u.UnmarshalUint32()
_OptionsSize := int(u.UnmarshalUint32())
if _OptionsSize < 0 {
return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
} else if _OptionsSize == 0 {
o.Options = nil
} else {
if _OptionsSize > 64 {
return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
}
if _OptionsSize <= len(o.Options) {
o.Options = o.Options[:_OptionsSize]
} else {
o.Options = make([]Option, _OptionsSize)
}
for i := range o.Options {
(&o.Options[i]).UnmarshalXDRFrom(u)
}
}
return u.Error
}
/*
Folder Structure: Folder Structure:
0 1 2 3 0 1 2 3
@ -996,6 +1125,109 @@ func (o *Device) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
/* /*
FileDownloadProgressUpdate Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Update Type |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Name (length + padded data) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Vector Structure \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Block Indexes |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
| Block Indexes (n items) |
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct FileDownloadProgressUpdate {
unsigned int UpdateType;
string Name<8192>;
Vector Version;
int BlockIndexes<1000000>;
}
*/
func (o FileDownloadProgressUpdate) XDRSize() int {
return 4 +
4 + len(o.Name) + xdr.Padding(len(o.Name)) +
o.Version.XDRSize() +
4 + len(o.BlockIndexes)*4
}
func (o FileDownloadProgressUpdate) MarshalXDR() ([]byte, error) {
buf := make([]byte, o.XDRSize())
m := &xdr.Marshaller{Data: buf}
return buf, o.MarshalXDRInto(m)
}
func (o FileDownloadProgressUpdate) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o FileDownloadProgressUpdate) MarshalXDRInto(m *xdr.Marshaller) error {
m.MarshalUint32(o.UpdateType)
if l := len(o.Name); l > 8192 {
return xdr.ElementSizeExceeded("Name", l, 8192)
}
m.MarshalString(o.Name)
if err := o.Version.MarshalXDRInto(m); err != nil {
return err
}
if l := len(o.BlockIndexes); l > 1000000 {
return xdr.ElementSizeExceeded("BlockIndexes", l, 1000000)
}
m.MarshalUint32(uint32(len(o.BlockIndexes)))
for i := range o.BlockIndexes {
m.MarshalUint32(uint32(o.BlockIndexes[i]))
}
return m.Error
}
func (o *FileDownloadProgressUpdate) UnmarshalXDR(bs []byte) error {
u := &xdr.Unmarshaller{Data: bs}
return o.UnmarshalXDRFrom(u)
}
func (o *FileDownloadProgressUpdate) UnmarshalXDRFrom(u *xdr.Unmarshaller) error {
o.UpdateType = u.UnmarshalUint32()
o.Name = u.UnmarshalStringMax(8192)
(&o.Version).UnmarshalXDRFrom(u)
_BlockIndexesSize := int(u.UnmarshalUint32())
if _BlockIndexesSize < 0 {
return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000)
} else if _BlockIndexesSize == 0 {
o.BlockIndexes = nil
} else {
if _BlockIndexesSize > 1000000 {
return xdr.ElementSizeExceeded("BlockIndexes", _BlockIndexesSize, 1000000)
}
if _BlockIndexesSize <= len(o.BlockIndexes) {
o.BlockIndexes = o.BlockIndexes[:_BlockIndexesSize]
} else {
o.BlockIndexes = make([]int32, _BlockIndexesSize)
}
for i := range o.BlockIndexes {
o.BlockIndexes[i] = int32(u.UnmarshalUint32())
}
}
return u.Error
}
/*
Option Structure: Option Structure:
0 1 2 3 0 1 2 3

View File

@ -9,32 +9,24 @@ package protocol
import "golang.org/x/text/unicode/norm" import "golang.org/x/text/unicode/norm"
type nativeModel struct { type nativeModel struct {
next Model Model
} }
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files { for i := range files {
files[i].Name = norm.NFD.String(files[i].Name) files[i].Name = norm.NFD.String(files[i].Name)
} }
m.next.Index(deviceID, folder, files, flags, options) m.Model.Index(deviceID, folder, files, flags, options)
} }
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files { for i := range files {
files[i].Name = norm.NFD.String(files[i].Name) files[i].Name = norm.NFD.String(files[i].Name)
} }
m.next.IndexUpdate(deviceID, folder, files, flags, options) m.Model.IndexUpdate(deviceID, folder, files, flags, options)
} }
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = norm.NFD.String(name) name = norm.NFD.String(name)
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
m.next.ClusterConfig(deviceID, config)
}
func (m nativeModel) Close(deviceID DeviceID, err error) {
m.next.Close(deviceID, err)
} }

View File

@ -7,25 +7,5 @@ package protocol
// Normal Unixes uses NFC and slashes, which is the wire format. // Normal Unixes uses NFC and slashes, which is the wire format.
type nativeModel struct { type nativeModel struct {
next Model Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
m.next.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
m.next.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
m.next.ClusterConfig(deviceID, config)
}
func (m nativeModel) Close(deviceID DeviceID, err error) {
m.next.Close(deviceID, err)
} }

View File

@ -21,30 +21,22 @@ var disallowedCharacters = string([]rune{
}) })
type nativeModel struct { type nativeModel struct {
next Model Model
} }
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(folder, files) fixupFiles(folder, files)
m.next.Index(deviceID, folder, files, flags, options) m.Model.Index(deviceID, folder, files, flags, options)
} }
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(folder, files) fixupFiles(folder, files)
m.next.IndexUpdate(deviceID, folder, files, flags, options) m.Model.IndexUpdate(deviceID, folder, files, flags, options)
} }
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = filepath.FromSlash(name) name = filepath.FromSlash(name)
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) return m.Model.Request(deviceID, folder, name, offset, hash, flags, options, buf)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
m.next.ClusterConfig(deviceID, config)
}
func (m nativeModel) Close(deviceID DeviceID, err error) {
m.next.Close(deviceID, err)
} }
func fixupFiles(folder string, files []FileInfo) { func fixupFiles(folder string, files []FileInfo) {

View File

@ -24,13 +24,14 @@ const (
) )
const ( const (
messageTypeClusterConfig = 0 messageTypeClusterConfig = 0
messageTypeIndex = 1 messageTypeIndex = 1
messageTypeRequest = 2 messageTypeRequest = 2
messageTypeResponse = 3 messageTypeResponse = 3
messageTypePing = 4 messageTypePing = 4
messageTypeIndexUpdate = 6 messageTypeIndexUpdate = 6
messageTypeClose = 7 messageTypeClose = 7
messageTypeDownloadProgress = 8
) )
const ( const (
@ -52,22 +53,29 @@ const (
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
) )
// IndexMessage message flags (for IndexUpdate)
const (
FlagIndexTemporary uint32 = 1 << iota
)
// Request message flags // Request message flags
const ( const (
FlagRequestTemporary uint32 = 1 << iota FlagFromTemporary uint32 = 1 << iota
)
// FileDownloadProgressUpdate update types
const (
UpdateTypeAppend uint32 = iota
UpdateTypeForget
)
// CLusterConfig flags
const (
FlagClusterConfigTemporaryIndexes uint32 = 1 << 0
) )
// ClusterConfigMessage.Folders flags // ClusterConfigMessage.Folders flags
const ( const (
FlagFolderReadOnly uint32 = 1 << 0 FlagFolderReadOnly uint32 = 1 << 0
FlagFolderIgnorePerms = 1 << 1 FlagFolderIgnorePerms = 1 << 1
FlagFolderIgnoreDelete = 1 << 2 FlagFolderIgnoreDelete = 1 << 2
FlagFolderAll = 1<<3 - 1 FlagFolderDisabledTempIndexes = 1 << 3
FlagFolderAll = 1<<4 - 1
) )
// ClusterConfigMessage.Folders.Devices flags // ClusterConfigMessage.Folders.Devices flags
@ -97,6 +105,8 @@ type Model interface {
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection // The peer device closed the connection
Close(deviceID DeviceID, err error) Close(deviceID DeviceID, err error)
// The peer device sent progress updates for the files it is currently downloading
DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
} }
type Connection interface { type Connection interface {
@ -105,8 +115,9 @@ type Connection interface {
Name() string Name() string
Index(folder string, files []FileInfo, flags uint32, options []Option) error Index(folder string, files []FileInfo, flags uint32, options []Option) error
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
ClusterConfig(config ClusterConfigMessage) ClusterConfig(config ClusterConfigMessage)
DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
Statistics() Statistics Statistics() Statistics
Closed() bool Closed() bool
} }
@ -242,7 +253,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32,
} }
// Request returns the bytes for the specified block after fetching them from the connected peer. // Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
var id int var id int
select { select {
case id = <-c.nextID: case id = <-c.nextID:
@ -250,6 +261,12 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
return nil, ErrClosed return nil, ErrClosed
} }
var flags uint32
if fromTemporary {
flags |= FlagFromTemporary
}
c.awaitingMut.Lock() c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil { if ch := c.awaiting[id]; ch != nil {
panic("id taken") panic("id taken")
@ -265,7 +282,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
Size: int32(size), Size: int32(size),
Hash: hash, Hash: hash,
Flags: flags, Flags: flags,
Options: options, Options: nil,
}, nil) }, nil)
if !ok { if !ok {
return nil, ErrClosed return nil, ErrClosed
@ -292,6 +309,16 @@ func (c *rawConnection) Closed() bool {
} }
} }
// DownloadProgress sends the progress updates for the files that are currently being downloaded.
func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) {
c.send(-1, messageTypeDownloadProgress, DownloadProgressMessage{
Folder: folder,
Updates: updates,
Flags: flags,
Options: options,
}, nil)
}
func (c *rawConnection) ping() bool { func (c *rawConnection) ping() bool {
var id int var id int
select { select {
@ -359,6 +386,12 @@ func (c *rawConnection) readerLoop() (err error) {
} }
c.handleResponse(hdr.msgID, msg) c.handleResponse(hdr.msgID, msg)
case DownloadProgressMessage:
if state != stateReady {
return fmt.Errorf("protocol error: response message in state %d", state)
}
c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates, msg.Flags, msg.Options)
case pingMessage: case pingMessage:
if state != stateReady { if state != stateReady {
return fmt.Errorf("protocol error: ping message in state %d", state) return fmt.Errorf("protocol error: ping message in state %d", state)
@ -469,6 +502,14 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
err = cm.UnmarshalXDR(msgBuf) err = cm.UnmarshalXDR(msgBuf)
msg = cm msg = cm
case messageTypeDownloadProgress:
var dp DownloadProgressMessage
err := dp.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = dp
default: default:
err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
} }

View File

@ -183,7 +183,7 @@ func TestClose(t *testing.T) {
c0.Index("default", nil, 0, nil) c0.Index("default", nil, 0, nil)
c0.Index("default", nil, 0, nil) c0.Index("default", nil, 0, nil)
if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil { if _, err := c0.Request("default", "foo", 0, 0, nil, false); err == nil {
t.Error("Request should return an error") t.Error("Request should return an error")
} }
} }

View File

@ -34,7 +34,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo, flags ui
return c.Connection.IndexUpdate(folder, myFs, flags, options) return c.Connection.IndexUpdate(folder, myFs, flags, options)
} }
func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
name = norm.NFC.String(filepath.ToSlash(name)) name = norm.NFC.String(filepath.ToSlash(name))
return c.Connection.Request(folder, name, offset, size, hash, flags, options) return c.Connection.Request(folder, name, offset, size, hash, fromTemporary)
} }