Merge pull request #1171 from syncthing/jobqueue

Add job queue (replaces #1060)
This commit is contained in:
Audrius Butkevicius 2015-01-02 15:18:50 +00:00
commit 1b066d6965
10 changed files with 456 additions and 40 deletions

View File

@ -149,6 +149,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
postRestMux.HandleFunc("/rest/shutdown", restPostShutdown) postRestMux.HandleFunc("/rest/shutdown", restPostShutdown)
postRestMux.HandleFunc("/rest/upgrade", restPostUpgrade) postRestMux.HandleFunc("/rest/upgrade", restPostUpgrade)
postRestMux.HandleFunc("/rest/scan", withModel(m, restPostScan)) postRestMux.HandleFunc("/rest/scan", withModel(m, restPostScan))
postRestMux.HandleFunc("/rest/bump", withModel(m, restPostBump))
// A handler that splits requests between the two above and disables // A handler that splits requests between the two above and disables
// caching // caching
@ -314,19 +315,12 @@ func restGetNeed(m *model.Model, w http.ResponseWriter, r *http.Request) {
var qs = r.URL.Query() var qs = r.URL.Query()
var folder = qs.Get("folder") var folder = qs.Get("folder")
files := m.NeedFolderFilesLimited(folder, 100) // max 100 files progress, queued, rest := m.NeedFolderFiles(folder, 100)
// Convert the struct to a more loose structure, and inject the size. // Convert the struct to a more loose structure, and inject the size.
output := make([]map[string]interface{}, 0, len(files)) output := map[string][]map[string]interface{}{
for _, file := range files { "progress": toNeedSlice(progress),
output = append(output, map[string]interface{}{ "queued": toNeedSlice(queued),
"Name": file.Name, "rest": toNeedSlice(rest),
"Flags": file.Flags,
"Modified": file.Modified,
"Version": file.Version,
"LocalVersion": file.LocalVersion,
"NumBlocks": file.NumBlocks,
"Size": protocol.BlocksToSize(file.NumBlocks),
})
} }
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
@ -650,6 +644,14 @@ func restPostScan(m *model.Model, w http.ResponseWriter, r *http.Request) {
} }
} }
func restPostBump(m *model.Model, w http.ResponseWriter, r *http.Request) {
qs := r.URL.Query()
folder := qs.Get("folder")
file := qs.Get("file")
m.BringToFront(folder, file)
restGetNeed(m, w, r)
}
func getQR(w http.ResponseWriter, r *http.Request) { func getQR(w http.ResponseWriter, r *http.Request) {
var qs = r.URL.Query() var qs = r.URL.Query()
var text = qs.Get("text") var text = qs.Get("text")
@ -775,3 +777,19 @@ func mimeTypeForFile(file string) string {
return mime.TypeByExtension(ext) return mime.TypeByExtension(ext)
} }
} }
func toNeedSlice(files []protocol.FileInfoTruncated) []map[string]interface{} {
output := make([]map[string]interface{}, len(files))
for i, file := range files {
output[i] = map[string]interface{}{
"Name": file.Name,
"Flags": file.Flags,
"Modified": file.Modified,
"Version": file.Version,
"LocalVersion": file.LocalVersion,
"NumBlocks": file.NumBlocks,
"Size": protocol.BlocksToSize(file.NumBlocks),
}
}
return output
}

View File

@ -801,21 +801,37 @@
<hr/> <hr/>
<table class="table table-striped table-condensed"> <table class="table table-striped table-condensed">
<tr ng-repeat="f in needed" ng-init="a = needAction(f)"> <tr ng-repeat="f in needed.progress" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td> <td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.Name}}">{{f.Name | basename}}</td> <td title="{{f.Name}}">{{f.Name | basename}}</td>
<td> <td ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.Name]">
<span ng-if="a == 'sync' && progress[neededFolder] && progress[neededFolder][f.Name]"> <div class="progress">
<div class="progress"> <div class="progress-bar progress-bar-success" style="width: {{progress[neededFolder][f.Name].Reused}}%"></div>
<div class="progress-bar progress-bar-success" style="width: {{progress[neededFolder][f.Name].Reused}}%"></div> <div class="progress-bar" style="width: {{progress[neededFolder][f.Name].CopiedFromOrigin}}%"></div>
<div class="progress-bar" style="width: {{progress[neededFolder][f.Name].CopiedFromOrigin}}%"></div> <div class="progress-bar progress-bar-info" style="width: {{progress[neededFolder][f.Name].CopiedFromElsewhere}}%"></div>
<div class="progress-bar progress-bar-info" style="width: {{progress[neededFolder][f.Name].CopiedFromElsewhere}}%"></div> <div class="progress-bar progress-bar-warning" style="width: {{progress[neededFolder][f.Name].Pulled}}%"></div>
<div class="progress-bar progress-bar-warning" style="width: {{progress[neededFolder][f.Name].Pulled}}%"></div> <div class="progress-bar progress-bar-danger progress-bar-striped active" style="width: {{progress[neededFolder][f.Name].Pulling}}%"></div>
<div class="progress-bar progress-bar-danger progress-bar-striped active" style="width: {{progress[neededFolder][f.Name].Pulling}}%"></div> <span class="show frontal">
<span class="show frontal">{{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B</span> {{progress[neededFolder][f.Name].BytesDone | binary}}B / {{progress[neededFolder][f.Name].BytesTotal | binary}}B
</div> </span>
</span> </div>
</td> </td>
<td class="text-right small-data" ng-if="a != 'sync' || !progress[neededFolder] || !progress[neededFolder][f.Name]">
<span ng-if="f.Size > 0">{{f.Size | binary}}B</span>
</td>
</tr>
<tr ng-repeat="f in needed.queued" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.Name}}">{{f.Name | basename}}</td>
<td class="text-right small-data">
<span ng-if="$index != 0" class="glyphicon glyphicon-chevron-up" ng-click="bumpFile(neededFolder, f.Name)"></span>
<span ng-if="f.Size > 0">{{f.Size | binary}}B</span>
</td>
</tr>
<tr ng-repeat="f in needed.rest" ng-init="a = needAction(f)">
<td class="small-data"><span class="glyphicon glyphicon-{{needIcons[a]}}"></span> {{needActions[a]}}</td>
<td title="{{f.Name}}">{{f.Name | basename}}</td>
<td class="text-right small-data"><span ng-if="f.Size > 0">{{f.Size | binary}}B</span></td>
</tr> </tr>
</table> </table>

View File

@ -1056,6 +1056,15 @@ angular.module('syncthing.core')
$http.post(urlbase + "/scan?folder=" + encodeURIComponent(folder)); $http.post(urlbase + "/scan?folder=" + encodeURIComponent(folder));
}; };
$scope.bumpFile = function (folder, file) {
$http.post(urlbase + "/bump?folder=" + encodeURIComponent(folder) + "&file=" + encodeURIComponent(file)).success(function (data) {
if ($scope.neededFolder == folder) {
console.log("bumpFile", folder, data);
$scope.needed = data;
}
});
};
// pseudo main. called on all definitions assigned // pseudo main. called on all definitions assigned
initController(); initController();
}); });

File diff suppressed because one or more lines are too long

View File

@ -79,6 +79,8 @@ const (
type service interface { type service interface {
Serve() Serve()
Stop() Stop()
Jobs() ([]string, []string) // In progress, Queued
BringToFront(string)
} }
type Model struct { type Model struct {
@ -189,6 +191,7 @@ func (m *Model) StartFolderRW(folder string) {
copiers: cfg.Copiers, copiers: cfg.Copiers,
pullers: cfg.Pullers, pullers: cfg.Pullers,
finishers: cfg.Finishers, finishers: cfg.Finishers,
queue: newJobQueue(),
} }
m.folderRunners[folder] = p m.folderRunners[folder] = p
m.fmut.Unlock() m.fmut.Unlock()
@ -416,22 +419,50 @@ func (m *Model) NeedSize(folder string) (files int, bytes int64) {
return return
} }
// NeedFiles returns the list of currently needed files, stopping at maxFiles // NeedFiles returns the list of currently needed files in progress, queued,
// files. Limit <= 0 is ignored. // and to be queued on next puller iteration. Also takes a soft cap which is
func (m *Model) NeedFolderFilesLimited(folder string, maxFiles int) []protocol.FileInfoTruncated { // only respected when adding files from the model rather than the runner queue.
func (m *Model) NeedFolderFiles(folder string, max int) ([]protocol.FileInfoTruncated, []protocol.FileInfoTruncated, []protocol.FileInfoTruncated) {
defer m.leveldbPanicWorkaround() defer m.leveldbPanicWorkaround()
m.fmut.RLock() m.fmut.RLock()
defer m.fmut.RUnlock() defer m.fmut.RUnlock()
if rf, ok := m.folderFiles[folder]; ok { if rf, ok := m.folderFiles[folder]; ok {
fs := make([]protocol.FileInfoTruncated, 0, maxFiles) var progress, queued, rest []protocol.FileInfoTruncated
rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool { var seen map[string]bool
fs = append(fs, f.(protocol.FileInfoTruncated))
return maxFiles <= 0 || len(fs) < maxFiles runner, ok := m.folderRunners[folder]
}) if ok {
return fs progressNames, queuedNames := runner.Jobs()
progress = make([]protocol.FileInfoTruncated, len(progressNames))
queued = make([]protocol.FileInfoTruncated, len(queuedNames))
seen = make(map[string]bool, len(progressNames)+len(queuedNames))
for i, name := range progressNames {
progress[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
seen[name] = true
}
for i, name := range queuedNames {
queued[i] = rf.GetGlobal(name).ToTruncated() /// XXX: Should implement GetGlobalTruncated directly
seen[name] = true
}
}
left := max - len(progress) - len(queued)
if max < 1 || left > 0 {
rf.WithNeedTruncated(protocol.LocalDeviceID, func(f protocol.FileIntf) bool {
left--
ft := f.(protocol.FileInfoTruncated)
if !seen[ft.Name] {
rest = append(rest, ft)
}
return max < 1 || left > 0
})
}
return progress, queued, rest
} }
return nil return nil, nil, nil
} }
// Index is called when a new device is connected and we receive their full index. // Index is called when a new device is connected and we receive their full index.
@ -1336,7 +1367,7 @@ func (m *Model) RemoteLocalVersion(folder string) uint64 {
return ver return ver
} }
func (m *Model) availability(folder string, file string) []protocol.DeviceID { func (m *Model) availability(folder, file string) []protocol.DeviceID {
// Acquire this lock first, as the value returned from foldersFiles can // Acquire this lock first, as the value returned from foldersFiles can
// gen heavily modified on Close() // gen heavily modified on Close()
m.pmut.RLock() m.pmut.RLock()
@ -1359,6 +1390,17 @@ func (m *Model) availability(folder string, file string) []protocol.DeviceID {
return availableDevices return availableDevices
} }
// Bump the given files priority in the job queue
func (m *Model) BringToFront(folder, file string) {
m.pmut.RLock()
defer m.pmut.RUnlock()
runner, ok := m.folderRunners[folder]
if ok {
runner.BringToFront(file)
}
}
func (m *Model) String() string { func (m *Model) String() string {
return fmt.Sprintf("model@%p", m) return fmt.Sprintf("model@%p", m)
} }

View File

@ -78,6 +78,7 @@ type Puller struct {
copiers int copiers int
pullers int pullers int
finishers int finishers int
queue *jobQueue
} }
// Serve will run scans and pulls. It will return when Stop()ed or on a // Serve will run scans and pulls. It will return when Stop()ed or on a
@ -337,15 +338,23 @@ func (p *Puller) pullerIteration(checksum bool, ignores *ignore.Matcher) int {
p.handleDir(file) p.handleDir(file)
default: default:
// A new or changed file or symlink. This is the only case where we // A new or changed file or symlink. This is the only case where we
// do stuff in the background; the other three are done // do stuff concurrently in the background
// synchronously. p.queue.Push(file.Name)
p.handleFile(file, copyChan, finisherChan)
} }
changed++ changed++
return true return true
}) })
for {
fileName, ok := p.queue.Pop()
if !ok {
break
}
f := p.model.CurrentGlobalFile(p.folder, fileName)
p.handleFile(f, copyChan, finisherChan)
}
// Signal copy and puller routines that we are done with the in data for // Signal copy and puller routines that we are done with the in data for
// this iteration. Wait for them to finish. // this iteration. Wait for them to finish.
close(copyChan) close(copyChan)
@ -483,6 +492,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
if debug { if debug {
l.Debugln(p, "taking shortcut on", file.Name) l.Debugln(p, "taking shortcut on", file.Name)
} }
p.queue.Done(file.Name)
if file.IsSymlink() { if file.IsSymlink() {
p.shortcutSymlink(curFile, file) p.shortcutSymlink(curFile, file)
} else { } else {
@ -850,6 +860,7 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
continue continue
} }
p.queue.Done(state.file.Name)
p.performFinish(state) p.performFinish(state)
p.model.receivedFile(p.folder, state.file.Name) p.model.receivedFile(p.folder, state.file.Name)
if p.progressEmitter != nil { if p.progressEmitter != nil {
@ -859,6 +870,15 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
} }
} }
// Moves the given filename to the front of the job queue
func (p *Puller) BringToFront(filename string) {
p.queue.BringToFront(filename)
}
func (p *Puller) Jobs() ([]string, []string) {
return p.queue.Jobs()
}
func invalidateFolder(cfg *config.Configuration, folderID string, err error) { func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range cfg.Folders { for i := range cfg.Folders {
folder := &cfg.Folders[i] folder := &cfg.Folders[i]

94
internal/model/queue.go Normal file
View File

@ -0,0 +1,94 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package model
import "sync"
type jobQueue struct {
progress []string
queued []string
mut sync.Mutex
}
func newJobQueue() *jobQueue {
return &jobQueue{}
}
func (q *jobQueue) Push(file string) {
q.mut.Lock()
q.queued = append(q.queued, file)
q.mut.Unlock()
}
func (q *jobQueue) Pop() (string, bool) {
q.mut.Lock()
defer q.mut.Unlock()
if len(q.queued) == 0 {
return "", false
}
var f string
f = q.queued[0]
q.queued = q.queued[1:]
q.progress = append(q.progress, f)
return f, true
}
func (q *jobQueue) BringToFront(filename string) {
q.mut.Lock()
defer q.mut.Unlock()
for i, cur := range q.queued {
if cur == filename {
if i > 0 {
// Shift the elements before the selected element one step to
// the right, overwriting the selected element
copy(q.queued[1:i+1], q.queued[0:])
// Put the selected element at the front
q.queued[0] = cur
}
return
}
}
}
func (q *jobQueue) Done(file string) {
q.mut.Lock()
defer q.mut.Unlock()
for i := range q.progress {
if q.progress[i] == file {
copy(q.progress[i:], q.progress[i+1:])
q.progress = q.progress[:len(q.progress)-1]
return
}
}
}
func (q *jobQueue) Jobs() ([]string, []string) {
q.mut.Lock()
defer q.mut.Unlock()
progress := make([]string, len(q.progress))
copy(progress, q.progress)
queued := make([]string, len(q.queued))
copy(queued, q.queued)
return progress, queued
}

View File

@ -0,0 +1,200 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package model
import (
"fmt"
"reflect"
"testing"
)
func TestJobQueue(t *testing.T) {
// Some random actions
q := newJobQueue()
q.Push("f1")
q.Push("f2")
q.Push("f3")
q.Push("f4")
progress, queued := q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
for i := 1; i < 5; i++ {
n, ok := q.Pop()
if !ok || n != fmt.Sprintf("f%d", i) {
t.Fatal("Wrong element")
}
progress, queued = q.Jobs()
if len(progress) != 1 || len(queued) != 3 {
t.Log(progress)
t.Log(queued)
t.Fatal("Wrong length")
}
q.Done(n)
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 3 {
t.Fatal("Wrong length", len(progress), len(queued))
}
q.Push(n)
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length")
}
}
if len(q.progress) > 0 || len(q.queued) != 4 {
t.Fatal("Wrong length")
}
for i := 4; i > 0; i-- {
progress, queued = q.Jobs()
if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length")
}
s := fmt.Sprintf("f%d", i)
q.BringToFront(s)
progress, queued = q.Jobs()
if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length")
}
n, ok := q.Pop()
if !ok || n != s {
t.Fatal("Wrong element")
}
progress, queued = q.Jobs()
if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length")
}
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length")
}
}
_, ok := q.Pop()
if len(q.progress) != 4 || ok {
t.Fatal("Wrong length")
}
q.Done("f1")
q.Done("f2")
q.Done("f3")
q.Done("f4")
q.Done("f5") // Does not exist
_, ok = q.Pop()
if len(q.progress) != 0 || ok {
t.Fatal("Wrong length")
}
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length")
}
q.BringToFront("")
q.Done("f5") // Does not exist
progress, queued = q.Jobs()
if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length")
}
}
func TestBringToFront(t *testing.T) {
q := newJobQueue()
q.Push("f1")
q.Push("f2")
q.Push("f3")
q.Push("f4")
_, queued := q.Jobs()
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
t.Errorf("Incorrect order %v at start", queued)
}
q.BringToFront("f1") // corner case: does nothing
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f1", "f2", "f3", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f3")
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f3", "f1", "f2", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f2")
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f2", "f3", "f1", "f4"}) {
t.Errorf("Incorrect order %v", queued)
}
q.BringToFront("f4") // corner case: last element
_, queued = q.Jobs()
if !reflect.DeepEqual(queued, []string{"f4", "f2", "f3", "f1"}) {
t.Errorf("Incorrect order %v", queued)
}
}
func BenchmarkJobQueueBump(b *testing.B) {
files := genFiles(b.N)
q := newJobQueue()
for _, f := range files {
q.Push(f.Name)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.BringToFront(files[i].Name)
}
}
func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
files := genFiles(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
q := newJobQueue()
for _, f := range files {
q.Push(f.Name)
}
for range files {
n, _ := q.Pop()
q.Done(n)
}
}
}

View File

@ -75,3 +75,9 @@ func (s *Scanner) Stop() {
func (s *Scanner) String() string { func (s *Scanner) String() string {
return fmt.Sprintf("scanner/%s@%p", s.folder, s) return fmt.Sprintf("scanner/%s@%p", s.folder, s)
} }
func (s *Scanner) BringToFront(string) {}
func (s *Scanner) Jobs() ([]string, []string) {
return nil, nil
}

View File

@ -69,6 +69,17 @@ func (f FileInfo) HasPermissionBits() bool {
return f.Flags&FlagNoPermBits == 0 return f.Flags&FlagNoPermBits == 0
} }
func (f FileInfo) ToTruncated() FileInfoTruncated {
return FileInfoTruncated{
Name: f.Name,
Flags: f.Flags,
Modified: f.Modified,
Version: f.Version,
LocalVersion: f.LocalVersion,
NumBlocks: uint32(len(f.Blocks)),
}
}
// Used for unmarshalling a FileInfo structure but skipping the actual block list // Used for unmarshalling a FileInfo structure but skipping the actual block list
type FileInfoTruncated struct { type FileInfoTruncated struct {
Name string // max:8192 Name string // max:8192