lib/model: Sort outgoing index updates by LocalVersion

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3411
This commit is contained in:
Jakob Borg 2016-07-21 17:21:15 +00:00 committed by Audrius Butkevicius
parent e1a4f81e50
commit 8ab6b60778
3 changed files with 361 additions and 0 deletions

View File

@ -1194,6 +1194,9 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
maxLocalVer := int64(0)
var err error
sorter := NewIndexSorter()
defer sorter.Close()
fs.WithHave(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
f := fi.(protocol.FileInfo)
if f.LocalVersion <= minLocalVer {
@ -1209,6 +1212,11 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
return true
}
sorter.Append(f)
return true
})
sorter.Sorted(func(f protocol.FileInfo) bool {
if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
if initial {
if err = conn.Index(folder, batch); err != nil {

197
lib/model/sorter.go Normal file
View File

@ -0,0 +1,197 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"encoding/binary"
"io/ioutil"
"sort"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const (
maxBytesInMemory = 512 << 10
)
// The IndexSorter sorts FileInfos based on their LocalVersion. You use it
// by first Append()ing all entries to be sorted, then calling Sorted()
// which will iterate over all the items in correctly sorted order.
type IndexSorter interface {
Append(f protocol.FileInfo)
Sorted(fn func(f protocol.FileInfo) bool)
Close()
}
type internalIndexSorter interface {
IndexSorter
full() bool
copyTo(to IndexSorter)
}
// NewIndexSorter returns a new IndexSorter that will start out in memory
// for efficiency but switch to on disk storage once the amount of data
// becomes large.
func NewIndexSorter() IndexSorter {
return &autoSwitchingIndexSorter{
internalIndexSorter: newInMemoryIndexSorter(),
}
}
// An autoSwitchingSorter starts out as an inMemorySorter but becomes an
// onDiskSorter when the in memory sorter is full().
type autoSwitchingIndexSorter struct {
internalIndexSorter
}
func (s *autoSwitchingIndexSorter) Append(f protocol.FileInfo) {
if s.internalIndexSorter.full() {
// We spill before adding a file instead of after, to handle the
// case where we're over max size but won't add any more files, in
// which case we *don't* need to spill. An example of this would be
// an index containing just a single large file.
l.Debugf("sorter %p spills to disk", s)
next := newOnDiskIndexSorter()
s.internalIndexSorter.copyTo(next)
s.internalIndexSorter = next
}
s.internalIndexSorter.Append(f)
}
// An inMemoryIndexSorter is simply a slice of FileInfos. The full() method
// returns true when the number of files exceeds maxFiles or the total
// number of blocks exceeds maxBlocks.
type inMemoryIndexSorter struct {
files []protocol.FileInfo
bytes int
maxBytes int
}
func newInMemoryIndexSorter() *inMemoryIndexSorter {
return &inMemoryIndexSorter{
maxBytes: maxBytesInMemory,
}
}
func (s *inMemoryIndexSorter) Append(f protocol.FileInfo) {
s.files = append(s.files, f)
s.bytes += f.ProtoSize()
}
func (s *inMemoryIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
sort.Sort(byLocalVersion(s.files))
for _, f := range s.files {
if !fn(f) {
break
}
}
}
func (s *inMemoryIndexSorter) Close() {
}
func (s *inMemoryIndexSorter) full() bool {
return s.bytes >= s.maxBytes
}
func (s *inMemoryIndexSorter) copyTo(dst IndexSorter) {
for _, f := range s.files {
dst.Append(f)
}
}
// byLocalVersion sorts FileInfos by LocalVersion
type byLocalVersion []protocol.FileInfo
func (l byLocalVersion) Len() int {
return len(l)
}
func (l byLocalVersion) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}
func (l byLocalVersion) Less(a, b int) bool {
return l[a].LocalVersion < l[b].LocalVersion
}
// An onDiskIndexSorter is backed by a LevelDB database in the temporary
// directory. It relies on the fact that iterating over the database is done
// in key order and uses the LocalVersion as key. When done with an
// onDiskIndexSorter you must call Close() to remove the temporary database.
type onDiskIndexSorter struct {
db *leveldb.DB
dir string
}
func newOnDiskIndexSorter() *onDiskIndexSorter {
// Set options to minimize resource usage.
opts := &opt.Options{
OpenFilesCacheCapacity: 10,
WriteBuffer: 512 << 10,
}
// Use a temporary database directory.
tmp, err := ioutil.TempDir("", "syncthing-db.")
if err != nil {
panic("creating temporary directory: " + err.Error())
}
db, err := leveldb.OpenFile(tmp, opts)
if err != nil {
panic("creating temporary database: " + err.Error())
}
s := &onDiskIndexSorter{
db: db,
dir: tmp,
}
l.Debugf("onDiskIndexSorter %p created at %s", s, tmp)
return s
}
func (s *onDiskIndexSorter) Append(f protocol.FileInfo) {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key[:], uint64(f.LocalVersion))
data, err := f.Marshal()
if err != nil {
panic("bug: marshalling FileInfo should never fail: " + err.Error())
}
err = s.db.Put(key, data, nil)
if err != nil {
panic("writing to temporary database: " + err.Error())
}
}
func (s *onDiskIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
it := s.db.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
var f protocol.FileInfo
if err := f.Unmarshal(it.Value()); err != nil {
panic("unmarshal failed: " + err.Error())
}
if !fn(f) {
break
}
}
}
func (s *onDiskIndexSorter) Close() {
l.Debugf("onDiskIndexSorter %p closes", s)
s.db.Close()
osutil.RemoveAll(s.dir)
}
func (s *onDiskIndexSorter) full() bool {
return false
}
func (s *onDiskIndexSorter) copyTo(dst IndexSorter) {
// Just wrap Sorted() if we need to support this in the future.
panic("unsupported")
}

156
lib/model/sorter_test.go Normal file
View File

@ -0,0 +1,156 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package model
import (
"fmt"
"os"
"testing"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
)
func TestInMemoryIndexSorter(t *testing.T) {
// An inMemorySorter should be able to absorb a few files in unsorted
// order, and return them sorted.
s := newInMemoryIndexSorter()
addFiles(50, s)
verifySorted(t, s, 50)
verifyBreak(t, s, 50)
s.Close()
}
func TestOnDiskIndexSorter(t *testing.T) {
// An onDiskSorter should be able to absorb a few files in unsorted
// order, and return them sorted.
s := newOnDiskIndexSorter()
addFiles(50, s)
verifySorted(t, s, 50)
verifyBreak(t, s, 50)
// The temporary database should exist on disk. When Close()d, it should
// be removed.
info, err := os.Stat(s.dir)
if err != nil {
t.Fatal("temp database should exist on disk:", err)
}
if !info.IsDir() {
t.Fatal("temp database should be a directory")
}
s.Close()
_, err = os.Stat(s.dir)
if !os.IsNotExist(err) {
t.Fatal("temp database should have been removed")
}
}
func TestIndexSorter(t *testing.T) {
// An default IndexSorter should be able to absorb files, have them in
// memory, and at some point switch to an on disk database.
s := NewIndexSorter()
defer s.Close()
// We should start out as an in memory store.
nFiles := 1
addFiles(1, s)
verifySorted(t, s, nFiles)
as := s.(*autoSwitchingIndexSorter)
if _, ok := as.internalIndexSorter.(*inMemoryIndexSorter); !ok {
t.Fatalf("the sorter should be in memory after only one file")
}
// At some point, for sure with less than maxBytesInMemory files, we
// should switch over to an on disk sorter.
for i := 0; i < maxBytesInMemory; i++ {
addFiles(1, s)
nFiles++
if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); ok {
break
}
}
if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); !ok {
t.Fatalf("the sorter should be on disk after %d files", nFiles)
}
verifySorted(t, s, nFiles)
// For test coverage, as some methods are called on the onDiskSorter
// only after switching to it.
addFiles(1, s)
verifySorted(t, s, nFiles+1)
}
// addFiles adds files with random LocalVersion to the Sorter.
func addFiles(n int, s IndexSorter) {
for i := 0; i < n; i++ {
rnd := rand.Int63()
f := protocol.FileInfo{
Name: fmt.Sprintf("file-%d", rnd),
Size: rand.Int63(),
Permissions: uint32(rand.Intn(0777)),
Modified: rand.Int63(),
LocalVersion: rnd,
Version: protocol.Vector{Counters: []protocol.Counter{{ID: 42, Value: uint64(rand.Int63())}}},
Blocks: []protocol.BlockInfo{{
Size: int32(rand.Intn(128 << 10)),
Hash: []byte(rand.String(32)),
}},
}
s.Append(f)
}
}
// verifySorted checks that the files are returned sorted by LocalVersion.
func verifySorted(t *testing.T, s IndexSorter, expected int) {
prevLocalVer := int64(-1)
seen := 0
s.Sorted(func(f protocol.FileInfo) bool {
if f.LocalVersion <= prevLocalVer {
t.Fatalf("Unsorted LocalVer, %d <= %d", f.LocalVersion, prevLocalVer)
}
prevLocalVer = f.LocalVersion
seen++
return true
})
if seen != expected {
t.Fatalf("expected %d files returned, got %d", expected, seen)
}
}
// verifyBreak checks that the Sorter stops iteration once we return false.
func verifyBreak(t *testing.T, s IndexSorter, expected int) {
prevLocalVer := int64(-1)
seen := 0
s.Sorted(func(f protocol.FileInfo) bool {
if f.LocalVersion <= prevLocalVer {
t.Fatalf("Unsorted LocalVer, %d <= %d", f.LocalVersion, prevLocalVer)
}
if len(f.Blocks) != 1 {
t.Fatalf("incorrect number of blocks %d != 1", len(f.Blocks))
}
if len(f.Version.Counters) != 1 {
t.Fatalf("incorrect number of version counters %d != 1", len(f.Version.Counters))
}
prevLocalVer = f.LocalVersion
seen++
return seen < expected/2
})
if seen != expected/2 {
t.Fatalf("expected %d files iterated over, got %d", expected, seen)
}
}