lib/model: Make jobQueue.Jobs paginated (fixes #5754) (#5804)

* lib/model: Make jobQueue.Jobs paginated (fixes #5754)

* fix, no test yet

* add test
This commit is contained in:
Simon Frei 2019-06-27 20:25:38 +02:00 committed by Audrius Butkevicius
parent afde0727fe
commit 3c7e7e971d
5 changed files with 208 additions and 65 deletions

View File

@ -220,8 +220,8 @@ func (f *folder) SchedulePull() {
} }
} }
func (f *folder) Jobs() ([]string, []string) { func (f *folder) Jobs(_, _ int) ([]string, []string, int) {
return nil, nil return nil, nil, 0
} }
func (f *folder) Scan(subdirs []string) error { func (f *folder) Scan(subdirs []string) error {

View File

@ -1599,8 +1599,8 @@ func (f *sendReceiveFolder) BringToFront(filename string) {
f.queue.BringToFront(filename) f.queue.BringToFront(filename)
} }
func (f *sendReceiveFolder) Jobs() ([]string, []string) { func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
return f.queue.Jobs() return f.queue.Jobs(page, perpage)
} }
// dbUpdaterRoutine aggregates db updates and commits them in batches no // dbUpdaterRoutine aggregates db updates and commits them in batches no

View File

@ -47,8 +47,8 @@ type service interface {
Override() Override()
Revert() Revert()
DelayScan(d time.Duration) DelayScan(d time.Duration)
SchedulePull() // something relevant changed, we should try a pull SchedulePull() // something relevant changed, we should try a pull
Jobs() ([]string, []string) // In progress, Queued Jobs(page, perpage int) ([]string, []string, int) // In progress, Queued, skipped
Scan(subs []string) error Scan(subs []string) error
Serve() Serve()
Stop() Stop()
@ -815,8 +815,7 @@ func (m *model) NeedSize(folder string) db.Counts {
} }
// NeedFolderFiles returns paginated list of currently needed files in // NeedFolderFiles returns paginated list of currently needed files in
// progress, queued, and to be queued on next puller iteration, as well as the // progress, queued, and to be queued on next puller iteration.
// total number of files currently needed.
func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated) { func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated) {
m.fmut.RLock() m.fmut.RLock()
rf, rfOk := m.folderFiles[folder] rf, rfOk := m.folderFiles[folder]
@ -835,15 +834,7 @@ func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfo
get := perpage get := perpage
if runnerOk { if runnerOk {
allProgressNames, allQueuedNames := runner.Jobs() progressNames, queuedNames, skipped := runner.Jobs(page, perpage)
var progressNames, queuedNames []string
progressNames, skip, get = getChunk(allProgressNames, skip, get)
queuedNames, skip, get = getChunk(allQueuedNames, skip, get)
progress = make([]db.FileInfoTruncated, len(progressNames))
queued = make([]db.FileInfoTruncated, len(queuedNames))
seen = make(map[string]struct{}, len(progressNames)+len(queuedNames))
for i, name := range progressNames { for i, name := range progressNames {
if f, ok := rf.GetGlobalTruncated(name); ok { if f, ok := rf.GetGlobalTruncated(name); ok {
@ -858,6 +849,12 @@ func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfo
seen[name] = struct{}{} seen[name] = struct{}{}
} }
} }
get -= len(seen)
if get == 0 {
return progress, queued, nil
}
skip -= skipped
} }
rest = make([]db.FileInfoTruncated, 0, perpage) rest = make([]db.FileInfoTruncated, 0, perpage)
@ -2613,20 +2610,6 @@ func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} {
return m return m
} }
// Skips `skip` elements and retrieves up to `get` elements from a given slice.
// Returns the resulting slice, plus how much elements are left to skip or
// copy to satisfy the values which were provided, given the slice is not
// big enough.
func getChunk(data []string, skip, get int) ([]string, int, int) {
l := len(data)
if l <= skip {
return []string{}, skip - l, get
} else if l < skip+get {
return data[skip:l], 0, get - (l - skip)
}
return data[skip : skip+get], 0, 0
}
func readOffsetIntoBuf(fs fs.Filesystem, file string, offset int64, buf []byte) error { func readOffsetIntoBuf(fs fs.Filesystem, file string, offset int64, buf []byte) error {
fd, err := fs.Open(file) fd, err := fs.Open(file)
if err != nil { if err != nil {

View File

@ -84,19 +84,46 @@ func (q *jobQueue) Done(file string) {
} }
} }
func (q *jobQueue) Jobs() ([]string, []string) { // Jobs returns a paginated list of file currently being pulled and files queued
// to be pulled. It also returns how many items were skipped.
func (q *jobQueue) Jobs(page, perpage int) ([]string, []string, int) {
q.mut.Lock() q.mut.Lock()
defer q.mut.Unlock() defer q.mut.Unlock()
progress := make([]string, len(q.progress)) toSkip := (page - 1) * perpage
copy(progress, q.progress) plen := len(q.progress)
qlen := len(q.queued)
queued := make([]string, len(q.queued)) if tot := plen + qlen; tot <= toSkip {
for i := range q.queued { return nil, nil, tot
queued[i] = q.queued[i].name
} }
return progress, queued if plen >= toSkip+perpage {
progress := make([]string, perpage)
copy(progress, q.progress[toSkip:toSkip+perpage])
return progress, nil, toSkip
}
var progress []string
if plen > toSkip {
progress = make([]string, plen-toSkip)
copy(progress, q.progress[toSkip:plen])
toSkip = 0
} else {
toSkip -= plen
}
var queued []string
if qlen-toSkip < perpage-len(progress) {
queued = make([]string, qlen-toSkip)
} else {
queued = make([]string, perpage-len(progress))
}
for i := range queued {
queued[i] = q.queued[i+toSkip].name
}
return progress, queued, (page - 1) * perpage
} }
func (q *jobQueue) Shuffle() { func (q *jobQueue) Shuffle() {

View File

@ -22,9 +22,9 @@ func TestJobQueue(t *testing.T) {
q.Push("f3", 0, time.Time{}) q.Push("f3", 0, time.Time{})
q.Push("f4", 0, time.Time{}) q.Push("f4", 0, time.Time{})
progress, queued := q.Jobs() progress, queued, _ := q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 4 { if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length") t.Fatal("Wrong length", len(progress), len(queued))
} }
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
@ -32,7 +32,7 @@ func TestJobQueue(t *testing.T) {
if !ok || n != fmt.Sprintf("f%d", i) { if !ok || n != fmt.Sprintf("f%d", i) {
t.Fatal("Wrong element") t.Fatal("Wrong element")
} }
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 1 || len(queued) != 3 { if len(progress) != 1 || len(queued) != 3 {
t.Log(progress) t.Log(progress)
t.Log(queued) t.Log(queued)
@ -40,19 +40,19 @@ func TestJobQueue(t *testing.T) {
} }
q.Done(n) q.Done(n)
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 3 { if len(progress) != 0 || len(queued) != 3 {
t.Fatal("Wrong length", len(progress), len(queued)) t.Fatal("Wrong length", len(progress), len(queued))
} }
q.Push(n, 0, time.Time{}) q.Push(n, 0, time.Time{})
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 4 { if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Done("f5") // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 4 { if len(progress) != 0 || len(queued) != 4 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -63,7 +63,7 @@ func TestJobQueue(t *testing.T) {
} }
for i := 4; i > 0; i-- { for i := 4; i > 0; i-- {
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 4-i || len(queued) != i { if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -71,7 +71,7 @@ func TestJobQueue(t *testing.T) {
s := fmt.Sprintf("f%d", i) s := fmt.Sprintf("f%d", i)
q.BringToFront(s) q.BringToFront(s)
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 4-i || len(queued) != i { if len(progress) != 4-i || len(queued) != i {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -80,13 +80,13 @@ func TestJobQueue(t *testing.T) {
if !ok || n != s { if !ok || n != s {
t.Fatal("Wrong element") t.Fatal("Wrong element")
} }
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 5-i || len(queued) != i-1 { if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.Done("f5") // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 5-i || len(queued) != i-1 { if len(progress) != 5-i || len(queued) != i-1 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -108,13 +108,13 @@ func TestJobQueue(t *testing.T) {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 0 { if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
q.BringToFront("") q.BringToFront("")
q.Done("f5") // Does not exist q.Done("f5") // Does not exist
progress, queued = q.Jobs() progress, queued, _ = q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 0 { if len(progress) != 0 || len(queued) != 0 {
t.Fatal("Wrong length") t.Fatal("Wrong length")
} }
@ -127,35 +127,35 @@ func TestBringToFront(t *testing.T) {
q.Push("f3", 0, time.Time{}) q.Push("f3", 0, time.Time{})
q.Push("f4", 0, time.Time{}) q.Push("f4", 0, time.Time{})
_, queued := q.Jobs() _, queued, _ := q.Jobs(1, 100)
if diff, equal := messagediff.PrettyDiff([]string{"f1", "f2", "f3", "f4"}, queued); !equal { if diff, equal := messagediff.PrettyDiff([]string{"f1", "f2", "f3", "f4"}, queued); !equal {
t.Errorf("Order does not match. Diff:\n%s", diff) t.Errorf("Order does not match. Diff:\n%s", diff)
} }
q.BringToFront("f1") // corner case: does nothing q.BringToFront("f1") // corner case: does nothing
_, queued = q.Jobs() _, queued, _ = q.Jobs(1, 100)
if diff, equal := messagediff.PrettyDiff([]string{"f1", "f2", "f3", "f4"}, queued); !equal { if diff, equal := messagediff.PrettyDiff([]string{"f1", "f2", "f3", "f4"}, queued); !equal {
t.Errorf("Order does not match. Diff:\n%s", diff) t.Errorf("Order does not match. Diff:\n%s", diff)
} }
q.BringToFront("f3") q.BringToFront("f3")
_, queued = q.Jobs() _, queued, _ = q.Jobs(1, 100)
if diff, equal := messagediff.PrettyDiff([]string{"f3", "f1", "f2", "f4"}, queued); !equal { if diff, equal := messagediff.PrettyDiff([]string{"f3", "f1", "f2", "f4"}, queued); !equal {
t.Errorf("Order does not match. Diff:\n%s", diff) t.Errorf("Order does not match. Diff:\n%s", diff)
} }
q.BringToFront("f2") q.BringToFront("f2")
_, queued = q.Jobs() _, queued, _ = q.Jobs(1, 100)
if diff, equal := messagediff.PrettyDiff([]string{"f2", "f3", "f1", "f4"}, queued); !equal { if diff, equal := messagediff.PrettyDiff([]string{"f2", "f3", "f1", "f4"}, queued); !equal {
t.Errorf("Order does not match. Diff:\n%s", diff) t.Errorf("Order does not match. Diff:\n%s", diff)
} }
q.BringToFront("f4") // corner case: last element q.BringToFront("f4") // corner case: last element
_, queued = q.Jobs() _, queued, _ = q.Jobs(1, 100)
if diff, equal := messagediff.PrettyDiff([]string{"f4", "f2", "f3", "f1"}, queued); !equal { if diff, equal := messagediff.PrettyDiff([]string{"f4", "f2", "f3", "f1"}, queued); !equal {
t.Errorf("Order does not match. Diff:\n%s", diff) t.Errorf("Order does not match. Diff:\n%s", diff)
} }
@ -171,9 +171,9 @@ func TestShuffle(t *testing.T) {
// This test will fail once in eight million times (1 / (4!)^5) :) // This test will fail once in eight million times (1 / (4!)^5) :)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
q.Shuffle() q.Shuffle()
_, queued := q.Jobs() _, queued, _ := q.Jobs(1, 100)
if l := len(queued); l != 4 { if l := len(queued); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l) t.Fatalf("Weird length %d returned from jobs(1, 100)", l)
} }
t.Logf("%v", queued) t.Logf("%v", queued)
@ -195,9 +195,9 @@ func TestSortBySize(t *testing.T) {
q.SortSmallestFirst() q.SortSmallestFirst()
_, actual := q.Jobs() _, actual, _ := q.Jobs(1, 100)
if l := len(actual); l != 4 { if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l) t.Fatalf("Weird length %d returned from jobs(1, 100)", l)
} }
expected := []string{"f4", "f1", "f3", "f2"} expected := []string{"f4", "f1", "f3", "f2"}
@ -207,9 +207,9 @@ func TestSortBySize(t *testing.T) {
q.SortLargestFirst() q.SortLargestFirst()
_, actual = q.Jobs() _, actual, _ = q.Jobs(1, 100)
if l := len(actual); l != 4 { if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l) t.Fatalf("Weird length %d returned from jobs(1, 100)", l)
} }
expected = []string{"f2", "f3", "f1", "f4"} expected = []string{"f2", "f3", "f1", "f4"}
@ -227,9 +227,9 @@ func TestSortByAge(t *testing.T) {
q.SortOldestFirst() q.SortOldestFirst()
_, actual := q.Jobs() _, actual, _ := q.Jobs(1, 100)
if l := len(actual); l != 4 { if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l) t.Fatalf("Weird length %d returned from jobs(1, 100)", l)
} }
expected := []string{"f4", "f1", "f3", "f2"} expected := []string{"f4", "f1", "f3", "f2"}
@ -239,9 +239,9 @@ func TestSortByAge(t *testing.T) {
q.SortNewestFirst() q.SortNewestFirst()
_, actual = q.Jobs() _, actual, _ = q.Jobs(1, 100)
if l := len(actual); l != 4 { if l := len(actual); l != 4 {
t.Fatalf("Weird length %d returned from Jobs()", l) t.Fatalf("Weird length %d returned from jobs(1, 100)", l)
} }
expected = []string{"f2", "f3", "f1", "f4"} expected = []string{"f2", "f3", "f1", "f4"}
@ -280,3 +280,136 @@ func BenchmarkJobQueuePushPopDone10k(b *testing.B) {
} }
} }
func TestQueuePagination(t *testing.T) {
q := newJobQueue()
// Ten random actions
names := make([]string, 10)
for i := 0; i < 10; i++ {
names[i] = fmt.Sprint("f", i)
q.Push(names[i], 0, time.Time{})
}
progress, queued, skip := q.Jobs(1, 100)
if len(progress) != 0 || len(queued) != 10 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
progress, queued, skip = q.Jobs(1, 5)
if len(progress) != 0 || len(queued) != 5 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(queued, names[:5]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[:5])
}
progress, queued, skip = q.Jobs(2, 5)
if len(progress) != 0 || len(queued) != 5 || skip != 5 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(queued, names[5:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[5:])
}
progress, queued, skip = q.Jobs(2, 7)
if len(progress) != 0 || len(queued) != 3 || skip != 7 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(queued, names[7:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[7:])
}
progress, queued, skip = q.Jobs(3, 5)
if len(progress) != 0 || len(queued) != 0 || skip != 10 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
n, ok := q.Pop()
if !ok || n != names[0] {
t.Fatal("Wrong element")
}
progress, queued, skip = q.Jobs(1, 100)
if len(progress) != 1 || len(queued) != 9 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
progress, queued, skip = q.Jobs(1, 5)
if len(progress) != 1 || len(queued) != 4 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(progress, names[:1]) {
t.Errorf("Wrong elements in progress, got %v, expected %v", progress, names[:1])
} else if !equalStrings(queued, names[1:5]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[1:5])
}
progress, queued, skip = q.Jobs(2, 5)
if len(progress) != 0 || len(queued) != 5 || skip != 5 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(queued, names[5:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[5:])
}
progress, queued, skip = q.Jobs(2, 7)
if len(progress) != 0 || len(queued) != 3 || skip != 7 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(queued, names[7:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[7:])
}
progress, queued, skip = q.Jobs(3, 5)
if len(progress) != 0 || len(queued) != 0 || skip != 10 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
for i := 1; i < 8; i++ {
n, ok := q.Pop()
if !ok || n != names[i] {
t.Fatal("Wrong element")
}
}
progress, queued, skip = q.Jobs(1, 100)
if len(progress) != 8 || len(queued) != 2 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
progress, queued, skip = q.Jobs(1, 5)
if len(progress) != 5 || len(queued) != 0 || skip != 0 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(progress, names[:5]) {
t.Errorf("Wrong elements in progress, got %v, expected %v", progress, names[:5])
}
progress, queued, skip = q.Jobs(2, 5)
if len(progress) != 3 || len(queued) != 2 || skip != 5 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(progress, names[5:8]) {
t.Errorf("Wrong elements in progress, got %v, expected %v", progress, names[5:8])
} else if !equalStrings(queued, names[8:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[8:])
}
progress, queued, skip = q.Jobs(2, 7)
if len(progress) != 1 || len(queued) != 2 || skip != 7 {
t.Error("Wrong length", len(progress), len(queued), 0)
} else if !equalStrings(progress, names[7:8]) {
t.Errorf("Wrong elements in progress, got %v, expected %v", progress, names[7:8])
} else if !equalStrings(queued, names[8:]) {
t.Errorf("Wrong elements in queued, got %v, expected %v", queued, names[8:])
}
progress, queued, skip = q.Jobs(3, 5)
if len(progress) != 0 || len(queued) != 0 || skip != 10 {
t.Error("Wrong length", len(progress), len(queued), 0)
}
}
func equalStrings(first, second []string) bool {
if len(first) != len(second) {
return false
}
for i := range first {
if first[i] != second[i] {
return false
}
}
return true
}