mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-07 00:53:58 +00:00
f86deedd9c
* lib/model: Progress emitter network operations don't need to be blocking * Do sends outside of the lock
471 lines
12 KiB
Go
471 lines
12 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package model
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
)
|
|
|
|
var timeout = 100 * time.Millisecond
|
|
|
|
// caller reports the source position of a frame on the current call stack as
// "file.go:line", using only the base name of the file. A skip of 0 refers to
// the function that invoked caller, 1 to that function's caller, and so on.
// When the runtime cannot resolve the requested frame, "unknown" is returned.
func caller(skip int) string {
	// The extra +1 hides this helper's own frame from the count.
	if _, path, lineNo, found := runtime.Caller(skip + 1); found {
		return fmt.Sprintf("%s:%d", filepath.Base(path), lineNo)
	}
	return "unknown"
}
|
|
|
|
func expectEvent(w events.Subscription, t *testing.T, size int) {
|
|
event, err := w.Poll(timeout)
|
|
if err != nil {
|
|
t.Fatal("Unexpected error:", err, "at", caller(1))
|
|
}
|
|
if event.Type != events.DownloadProgress {
|
|
t.Fatal("Unexpected event:", event, "at", caller(1))
|
|
}
|
|
data := event.Data.(map[string]map[string]*pullerProgress)
|
|
if len(data) != size {
|
|
t.Fatal("Unexpected event data size:", data, "at", caller(1))
|
|
}
|
|
}
|
|
|
|
func expectTimeout(w events.Subscription, t *testing.T) {
|
|
_, err := w.Poll(timeout)
|
|
if err != events.ErrTimeout {
|
|
t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1))
|
|
}
|
|
}
|
|
|
|
func TestProgressEmitter(t *testing.T) {
|
|
evLogger := events.NewLogger()
|
|
go evLogger.Serve()
|
|
defer evLogger.Stop()
|
|
|
|
w := evLogger.Subscribe(events.DownloadProgress)
|
|
|
|
c := createTmpWrapper(config.Configuration{})
|
|
defer os.Remove(c.ConfigPath())
|
|
c.SetOptions(config.OptionsConfiguration{
|
|
ProgressUpdateIntervalS: 60, // irrelevant, but must be positive
|
|
})
|
|
|
|
p := NewProgressEmitter(c, evLogger)
|
|
go p.Serve()
|
|
defer p.Stop()
|
|
p.interval = 0
|
|
|
|
expectTimeout(w, t)
|
|
|
|
s := sharedPullerState{
|
|
updated: time.Now(),
|
|
mut: sync.NewRWMutex(),
|
|
}
|
|
p.Register(&s)
|
|
|
|
expectEvent(w, t, 1)
|
|
expectTimeout(w, t)
|
|
|
|
s.copyDone(protocol.BlockInfo{})
|
|
|
|
expectEvent(w, t, 1)
|
|
expectTimeout(w, t)
|
|
|
|
s.copiedFromOrigin()
|
|
|
|
expectEvent(w, t, 1)
|
|
expectTimeout(w, t)
|
|
|
|
s.pullStarted()
|
|
|
|
expectEvent(w, t, 1)
|
|
expectTimeout(w, t)
|
|
|
|
s.pullDone(protocol.BlockInfo{})
|
|
|
|
expectEvent(w, t, 1)
|
|
expectTimeout(w, t)
|
|
|
|
p.Deregister(&s)
|
|
|
|
expectEvent(w, t, 0)
|
|
expectTimeout(w, t)
|
|
|
|
}
|
|
|
|
func TestSendDownloadProgressMessages(t *testing.T) {
|
|
c := createTmpWrapper(config.Configuration{})
|
|
defer os.Remove(c.ConfigPath())
|
|
c.SetOptions(config.OptionsConfiguration{
|
|
ProgressUpdateIntervalS: 60, // irrelevant, but must be positive
|
|
TempIndexMinBlocks: 10,
|
|
})
|
|
|
|
fc := &fakeConnection{}
|
|
|
|
evLogger := events.NewLogger()
|
|
go evLogger.Serve()
|
|
defer evLogger.Stop()
|
|
|
|
p := NewProgressEmitter(c, evLogger)
|
|
p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})
|
|
p.registry["folder"] = make(map[string]*sharedPullerState)
|
|
p.registry["folder2"] = make(map[string]*sharedPullerState)
|
|
p.registry["folderXXX"] = make(map[string]*sharedPullerState)
|
|
|
|
expect := func(updateIdx int, state *sharedPullerState, updateType protocol.FileDownloadProgressUpdateType, version protocol.Vector, blocks []int32, remove bool) {
|
|
messageIdx := -1
|
|
for i, msg := range fc.downloadProgressMessages {
|
|
if msg.folder == state.folder {
|
|
messageIdx = i
|
|
break
|
|
}
|
|
}
|
|
if messageIdx < 0 {
|
|
t.Errorf("Message for folder %s does not exist at %s", state.folder, caller(1))
|
|
}
|
|
|
|
msg := fc.downloadProgressMessages[messageIdx]
|
|
|
|
// Don't know the index (it's random due to iterating maps)
|
|
if updateIdx == -1 {
|
|
for i, upd := range msg.updates {
|
|
if upd.Name == state.file.Name {
|
|
updateIdx = i
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if updateIdx == -1 {
|
|
t.Errorf("Could not find update for %s at %s", state.file.Name, caller(1))
|
|
}
|
|
|
|
if updateIdx > len(msg.updates)-1 {
|
|
t.Errorf("Update at index %d does not exist at %s", updateIdx, caller(1))
|
|
}
|
|
|
|
update := msg.updates[updateIdx]
|
|
|
|
if update.UpdateType != updateType {
|
|
t.Errorf("Wrong update type at %s", caller(1))
|
|
}
|
|
|
|
if !update.Version.Equal(version) {
|
|
t.Errorf("Wrong version at %s", caller(1))
|
|
}
|
|
|
|
if len(update.BlockIndexes) != len(blocks) {
|
|
t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1))
|
|
}
|
|
for i := range update.BlockIndexes {
|
|
if update.BlockIndexes[i] != blocks[i] {
|
|
t.Errorf("Index %d incorrect at %s", i, caller(1))
|
|
}
|
|
}
|
|
|
|
if remove {
|
|
fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...)
|
|
}
|
|
}
|
|
expectEmpty := func() {
|
|
if len(fc.downloadProgressMessages) > 0 {
|
|
t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages)
|
|
}
|
|
}
|
|
|
|
now := time.Now()
|
|
tick := func() time.Time {
|
|
now = now.Add(time.Second)
|
|
return now
|
|
}
|
|
|
|
if len(fc.downloadProgressMessages) != 0 {
|
|
t.Error("Expected no requests")
|
|
}
|
|
|
|
v1 := (protocol.Vector{}).Update(0)
|
|
v2 := (protocol.Vector{}).Update(1)
|
|
|
|
// Requires more than 10 blocks to work.
|
|
blocks := make([]protocol.BlockInfo, 11)
|
|
|
|
state1 := &sharedPullerState{
|
|
folder: "folder",
|
|
file: protocol.FileInfo{
|
|
Name: "state1",
|
|
Version: v1,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
availableUpdated: time.Now(),
|
|
}
|
|
p.registry["folder"]["1"] = state1
|
|
|
|
// Has no blocks, hence no message is sent
|
|
sendMsgs(p)
|
|
expectEmpty()
|
|
|
|
// Returns update for puller with new extra blocks
|
|
state1.available = []int32{1}
|
|
sendMsgs(p)
|
|
|
|
expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
|
|
expectEmpty()
|
|
|
|
// Does nothing if nothing changes
|
|
sendMsgs(p)
|
|
expectEmpty()
|
|
|
|
// Does nothing if timestamp updated, but no new blocks (should never happen)
|
|
state1.availableUpdated = tick()
|
|
|
|
sendMsgs(p)
|
|
expectEmpty()
|
|
|
|
// Does not return an update if date blocks change but date does not (should never happen)
|
|
state1.available = []int32{1, 2}
|
|
|
|
sendMsgs(p)
|
|
expectEmpty()
|
|
|
|
// If the date and blocks changes, returns only the diff
|
|
state1.availableUpdated = tick()
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
|
|
expectEmpty()
|
|
|
|
// Returns forget and update if puller version has changed
|
|
state1.file.Version = v2
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
|
|
expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
|
|
expectEmpty()
|
|
|
|
// Returns forget and append if sharedPullerState creation timer changes.
|
|
|
|
state1.available = []int32{1}
|
|
state1.availableUpdated = tick()
|
|
state1.created = tick()
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
|
|
expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1}, true)
|
|
expectEmpty()
|
|
|
|
// Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available)
|
|
state1.file.Version = v1
|
|
state1.available = nil
|
|
state1.availableUpdated = tick()
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
|
|
expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
|
|
expectEmpty()
|
|
|
|
// Updates for multiple files and folders can be combined
|
|
state1.available = []int32{1, 2, 3}
|
|
state1.availableUpdated = tick()
|
|
|
|
state2 := &sharedPullerState{
|
|
folder: "folder2",
|
|
file: protocol.FileInfo{
|
|
Name: "state2",
|
|
Version: v1,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
state3 := &sharedPullerState{
|
|
folder: "folder",
|
|
file: protocol.FileInfo{
|
|
Name: "state3",
|
|
Version: v1,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
state4 := &sharedPullerState{
|
|
folder: "folder2",
|
|
file: protocol.FileInfo{
|
|
Name: "state4",
|
|
Version: v1,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
p.registry["folder2"]["2"] = state2
|
|
p.registry["folder"]["3"] = state3
|
|
p.registry["folder2"]["4"] = state4
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
|
|
expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
|
|
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
|
|
expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
|
|
expectEmpty()
|
|
|
|
// Returns forget if puller no longer exists, as well as updates if it has been updated.
|
|
state1.available = []int32{1, 2, 3, 4, 5}
|
|
state1.availableUpdated = tick()
|
|
state2.available = []int32{1, 2, 3, 4, 5}
|
|
state2.availableUpdated = tick()
|
|
|
|
delete(p.registry["folder"], "3")
|
|
delete(p.registry["folder2"], "4")
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
|
|
expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
|
|
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
|
|
expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
|
|
expectEmpty()
|
|
|
|
// Deletions are sent only once (actual bug I found writing the tests)
|
|
sendMsgs(p)
|
|
sendMsgs(p)
|
|
expectEmpty()
|
|
|
|
// Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
|
|
// Directory
|
|
state5 := &sharedPullerState{
|
|
folder: "folder",
|
|
file: protocol.FileInfo{
|
|
Name: "state5",
|
|
Version: v1,
|
|
Type: protocol.FileInfoTypeDirectory,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
// Symlink
|
|
state6 := &sharedPullerState{
|
|
folder: "folder",
|
|
file: protocol.FileInfo{
|
|
Name: "state6",
|
|
Version: v1,
|
|
Type: protocol.FileInfoTypeSymlink,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
// Some other directory
|
|
state7 := &sharedPullerState{
|
|
folder: "folderXXX",
|
|
file: protocol.FileInfo{
|
|
Name: "state7",
|
|
Version: v1,
|
|
Blocks: blocks,
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
// Less than 10 blocks
|
|
state8 := &sharedPullerState{
|
|
folder: "folder",
|
|
file: protocol.FileInfo{
|
|
Name: "state8",
|
|
Version: v1,
|
|
Blocks: blocks[:3],
|
|
},
|
|
mut: sync.NewRWMutex(),
|
|
available: []int32{1, 2, 3},
|
|
availableUpdated: time.Now(),
|
|
}
|
|
p.registry["folder"]["5"] = state5
|
|
p.registry["folder"]["6"] = state6
|
|
p.registry["folderXXX"]["7"] = state7
|
|
p.registry["folder"]["8"] = state8
|
|
|
|
sendMsgs(p)
|
|
|
|
expectEmpty()
|
|
|
|
// Device is no longer subscribed to a particular folder
|
|
delete(p.registry["folder"], "1") // Clean up first
|
|
delete(p.registry["folder2"], "2") // Clean up first
|
|
|
|
sendMsgs(p)
|
|
expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
|
|
expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)
|
|
|
|
expectEmpty()
|
|
|
|
p.registry["folder"]["1"] = state1
|
|
p.registry["folder2"]["2"] = state2
|
|
p.registry["folder"]["3"] = state3
|
|
p.registry["folder2"]["4"] = state4
|
|
|
|
sendMsgs(p)
|
|
|
|
expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
|
|
expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
|
|
expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
|
|
expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
|
|
expectEmpty()
|
|
|
|
p.temporaryIndexUnsubscribe(fc)
|
|
p.temporaryIndexSubscribe(fc, []string{"folder"})
|
|
|
|
sendMsgs(p)
|
|
|
|
// See progressemitter.go for explanation why this is commented out.
|
|
// Search for state.cleanup
|
|
//expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false)
|
|
//expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
|
|
|
|
expectEmpty()
|
|
|
|
// Cleanup when device no longer exists
|
|
p.temporaryIndexUnsubscribe(fc)
|
|
|
|
sendMsgs(p)
|
|
_, ok := p.sentDownloadStates[fc.ID()]
|
|
if ok {
|
|
t.Error("Should not be there")
|
|
}
|
|
}
|
|
|
|
func sendMsgs(p *ProgressEmitter) {
|
|
p.mut.Lock()
|
|
updates := p.computeProgressUpdates()
|
|
p.mut.Unlock()
|
|
ctx := context.Background()
|
|
for _, update := range updates {
|
|
update.send(ctx)
|
|
}
|
|
}
|