syncthing/lib/model/progressemitter_test.go
2017-02-09 08:04:16 +01:00

445 lines
11 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"fmt"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
// timeout is the maximum time expectEvent and expectTimeout wait in Poll for
// an event to arrive on a subscription.
var timeout = 100 * time.Millisecond
// caller describes a call site as "file:line", using only the base name of
// the source file. A skip of 0 identifies the function that invoked caller,
// 1 identifies that function's own caller, and so on. When the runtime has no
// information for the requested frame the result is "unknown".
func caller(skip int) string {
	// The extra frame accounts for caller itself.
	if _, path, lineNo, found := runtime.Caller(skip + 1); found {
		return fmt.Sprintf("%s:%d", filepath.Base(path), lineNo)
	}
	return "unknown"
}
// expectEvent polls w once, waiting at most timeout, and fails the test
// unless the poll yields a DownloadProgress event whose payload has exactly
// size top-level entries. Every failure message carries the location of the
// function that called expectEvent.
func expectEvent(w *events.Subscription, t *testing.T, size int) {
	event, err := w.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err, "at", caller(1))
	}
	if event.Type != events.DownloadProgress {
		t.Fatal("Unexpected event:", event, "at", caller(1))
	}
	// Use the comma-ok form so that a payload of an unexpected type is
	// reported together with the call site instead of as a bare panic.
	data, ok := event.Data.(map[string]map[string]*pullerProgress)
	if !ok {
		t.Fatal("Unexpected event data type:", event, "at", caller(1))
	}
	if len(data) != size {
		t.Fatal("Unexpected event data size:", data, "at", caller(1))
	}
}
// expectTimeout polls w once, waiting at most timeout, and fails the test
// unless the poll ends with events.ErrTimeout. The failure message carries the
// location of the function that called expectTimeout.
func expectTimeout(w *events.Subscription, t *testing.T) {
	if _, pollErr := w.Poll(timeout); pollErr != events.ErrTimeout {
		t.Fatal("Unexpected non-Timeout error:", pollErr, "at", caller(1))
	}
}
// TestProgressEmitter runs the emitter returned by NewProgressEmitter in a
// goroutine, registers a single sharedPullerState with it, and checks that
// each change to that state is followed by exactly one DownloadProgress event
// and then by a timeout (no further events).
func TestProgressEmitter(t *testing.T) {
// NOTE(review): the subscription is never unsubscribed and the Serve
// goroutine started below is never stopped in this test - confirm whether
// cleanup is needed.
w := events.Default.Subscribe(events.DownloadProgress)
c := config.Wrap("/tmp/test", config.Configuration{})
c.SetOptions(config.OptionsConfiguration{
ProgressUpdateIntervalS: 0,
})
p := NewProgressEmitter(c)
go p.Serve()
// NOTE(review): interval is written after Serve has been started in another
// goroutine; if Serve reads it this may be a data race - confirm.
p.interval = 0
// Nothing has been registered yet, so no event is expected.
expectTimeout(w, t)
s := sharedPullerState{
updated: time.Now(),
mut: sync.NewRWMutex(),
}
// Registering the state produces one event with one top-level entry.
p.Register(&s)
expectEvent(w, t, 1)
expectTimeout(w, t)
// Each of the following state changes produces exactly one further event.
s.copyDone(protocol.BlockInfo{})
expectEvent(w, t, 1)
expectTimeout(w, t)
s.copiedFromOrigin()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullStarted()
expectEvent(w, t, 1)
expectTimeout(w, t)
s.pullDone(protocol.BlockInfo{})
expectEvent(w, t, 1)
expectTimeout(w, t)
// Deregistering produces one final event whose payload is empty.
p.Deregister(&s)
expectEvent(w, t, 0)
expectTimeout(w, t)
}
// TestSendDownloadProgressMessages calls sendDownloadProgressMessages by hand
// on an emitter whose registry is manipulated directly, and checks the
// download progress messages recorded on a fakeConnection that is subscribed
// to "folder" and "folder2" after each change.
func TestSendDownloadProgressMessages(t *testing.T) {
	c := config.Wrap("/tmp/test", config.Configuration{})
	c.SetOptions(config.OptionsConfiguration{
		ProgressUpdateIntervalS: 0,
		TempIndexMinBlocks:      10,
	})

	fc := &fakeConnection{}

	p := NewProgressEmitter(c)
	p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})

	// expect checks that fc holds a message for state.folder and that the
	// update at updateIdx within it has the given type, version and block
	// indexes. An updateIdx of -1 means "look the update up by file name".
	// When remove is true the whole message is dropped from fc afterwards.
	expect := func(updateIdx int, state *sharedPullerState, updateType protocol.FileDownloadProgressUpdateType, version protocol.Vector, blocks []int32, remove bool) {
		messageIdx := -1
		for i, msg := range fc.downloadProgressMessages {
			if msg.folder == state.folder {
				messageIdx = i
				break
			}
		}
		if messageIdx < 0 {
			// Fatal, not Error: indexing with -1 below would panic.
			t.Fatalf("Message for folder %s does not exist at %s", state.folder, caller(1))
		}

		msg := fc.downloadProgressMessages[messageIdx]

		// Don't know the index (it's random due to iterating maps)
		if updateIdx == -1 {
			for i, upd := range msg.updates {
				if upd.Name == state.file.Name {
					updateIdx = i
					break
				}
			}
		}

		if updateIdx == -1 {
			// Fatal, not Error: indexing with -1 below would panic.
			t.Fatalf("Could not find update for %s at %s", state.file.Name, caller(1))
		}

		if updateIdx > len(msg.updates)-1 {
			// Fatal, not Error: the index is out of range for msg.updates.
			t.Fatalf("Update at index %d does not exist at %s", updateIdx, caller(1))
		}

		update := msg.updates[updateIdx]

		if update.UpdateType != updateType {
			t.Errorf("Wrong update type at %s", caller(1))
		}

		if !update.Version.Equal(version) {
			t.Errorf("Wrong version at %s", caller(1))
		}

		if len(update.BlockIndexes) != len(blocks) {
			t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1))
		}
		for i := range update.BlockIndexes {
			if i >= len(blocks) {
				// The length mismatch was already reported above; stop before
				// reading past the end of the expected indexes.
				break
			}
			if update.BlockIndexes[i] != blocks[i] {
				t.Errorf("Index %d incorrect at %s", i, caller(1))
			}
		}

		if remove {
			fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...)
		}
	}

	// expectEmpty checks that no recorded messages are left on fc.
	expectEmpty := func() {
		if len(fc.downloadProgressMessages) > 0 {
			t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages)
		}
	}

	// tick returns a timestamp one second later than the previous one, so
	// that every call yields a strictly newer time without sleeping.
	now := time.Now()
	tick := func() time.Time {
		now = now.Add(time.Second)
		return now
	}

	if len(fc.downloadProgressMessages) != 0 {
		t.Error("Expected no requests")
	}

	v1 := (protocol.Vector{}).Update(0)
	v2 := (protocol.Vector{}).Update(1)

	// Requires more than 10 blocks to work.
	blocks := make([]protocol.BlockInfo, 11)

	state1 := &sharedPullerState{
		folder: "folder",
		file: protocol.FileInfo{
			Name:    "state1",
			Version: v1,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		availableUpdated: time.Now(),
	}
	p.registry["1"] = state1

	// Has no blocks, hence no message is sent
	p.sendDownloadProgressMessages()
	expectEmpty()

	// Returns update for puller with new extra blocks
	state1.available = []int32{1}
	p.sendDownloadProgressMessages()

	expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
	expectEmpty()

	// Does nothing if nothing changes
	p.sendDownloadProgressMessages()
	expectEmpty()

	// Does nothing if timestamp updated, but no new blocks (should never happen)
	state1.availableUpdated = tick()

	p.sendDownloadProgressMessages()
	expectEmpty()

	// Does not return an update if the blocks change but the date does not (should never happen)
	state1.available = []int32{1, 2}

	p.sendDownloadProgressMessages()
	expectEmpty()

	// If the date and blocks change, returns only the diff
	state1.availableUpdated = tick()

	p.sendDownloadProgressMessages()

	expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
	expectEmpty()

	// Returns forget and update if puller version has changed
	state1.file.Version = v2

	p.sendDownloadProgressMessages()

	expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
	expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
	expectEmpty()

	// Returns forget and append if sharedPullerState creation timer changes.
	state1.available = []int32{1}
	state1.availableUpdated = tick()
	state1.created = tick()

	p.sendDownloadProgressMessages()

	expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
	expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1}, true)
	expectEmpty()

	// Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available)
	state1.file.Version = v1
	state1.available = nil
	state1.availableUpdated = tick()

	p.sendDownloadProgressMessages()

	expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
	expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
	expectEmpty()

	// Updates for multiple files and folders can be combined
	state1.available = []int32{1, 2, 3}
	state1.availableUpdated = tick()

	state2 := &sharedPullerState{
		folder: "folder2",
		file: protocol.FileInfo{
			Name:    "state2",
			Version: v1,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	state3 := &sharedPullerState{
		folder: "folder",
		file: protocol.FileInfo{
			Name:    "state3",
			Version: v1,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	state4 := &sharedPullerState{
		folder: "folder2",
		file: protocol.FileInfo{
			Name:    "state4",
			Version: v1,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	p.registry["2"] = state2
	p.registry["3"] = state3
	p.registry["4"] = state4

	p.sendDownloadProgressMessages()

	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
	expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
	expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
	expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
	expectEmpty()

	// Returns forget if puller no longer exists, as well as updates if it has been updated.
	state1.available = []int32{1, 2, 3, 4, 5}
	state1.availableUpdated = tick()
	state2.available = []int32{1, 2, 3, 4, 5}
	state2.availableUpdated = tick()

	delete(p.registry, "3")
	delete(p.registry, "4")

	p.sendDownloadProgressMessages()

	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
	expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
	expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
	expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
	expectEmpty()

	// Deletions are sent only once (actual bug I found writing the tests)
	p.sendDownloadProgressMessages()
	p.sendDownloadProgressMessages()
	expectEmpty()

	// Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
	// Directory
	state5 := &sharedPullerState{
		folder: "folder",
		file: protocol.FileInfo{
			Name:    "state5",
			Version: v1,
			Type:    protocol.FileInfoTypeDirectory,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	// Symlink
	state6 := &sharedPullerState{
		folder: "folder",
		file: protocol.FileInfo{
			Name:    "state6",
			Version: v1,
			Type:    protocol.FileInfoTypeSymlink,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	// A folder the connection is not subscribed to
	state7 := &sharedPullerState{
		folder: "folderXXX",
		file: protocol.FileInfo{
			Name:    "state7",
			Version: v1,
			Blocks:  blocks,
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	// Less than 10 blocks
	state8 := &sharedPullerState{
		folder: "folder",
		file: protocol.FileInfo{
			Name:    "state8",
			Version: v1,
			Blocks:  blocks[:3],
		},
		mut:              sync.NewRWMutex(),
		available:        []int32{1, 2, 3},
		availableUpdated: time.Now(),
	}
	p.registry["5"] = state5
	p.registry["6"] = state6
	p.registry["7"] = state7
	p.registry["8"] = state8

	p.sendDownloadProgressMessages()

	expectEmpty()

	// Device is no longer subscribed to a particular folder
	delete(p.registry, "1") // Clean up first
	delete(p.registry, "2") // Clean up first

	p.sendDownloadProgressMessages()
	expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
	expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)

	expectEmpty()

	p.registry["1"] = state1
	p.registry["2"] = state2
	p.registry["3"] = state3
	p.registry["4"] = state4

	p.sendDownloadProgressMessages()

	expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
	expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
	expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
	expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
	expectEmpty()

	// Resubscribe to "folder" only, dropping "folder2".
	p.temporaryIndexUnsubscribe(fc)
	p.temporaryIndexSubscribe(fc, []string{"folder"})

	p.sendDownloadProgressMessages()

	// See progressemitter.go for explanation why this is commented out.
	// Search for state.cleanup
	//expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false)
	//expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)

	expectEmpty()

	// Cleanup when device no longer exists
	p.temporaryIndexUnsubscribe(fc)

	p.sendDownloadProgressMessages()
	_, ok := p.sentDownloadStates[fc.ID()]
	if ok {
		t.Error("Should not be there")
	}
}