mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-10 15:20:56 +00:00
lib/db, lib/model: Add sequence->deviceKey to db for sending indexes (#4906)
Instead of walking and unmarshalling the entire db and sorting the resulting file infos by sequence, add store device keys by sequence number in the database. Thus only the required file infos need be unmarshalled and are already sorted by index.
This commit is contained in:
parent
2c18640386
commit
a548014755
@ -11,11 +11,9 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
const dbVersion = 1
|
||||
const dbVersion = 2
|
||||
|
||||
const (
|
||||
KeyTypeDevice = iota
|
||||
@ -29,6 +27,7 @@ const (
|
||||
KeyTypeIndexID
|
||||
KeyTypeFolderMeta
|
||||
KeyTypeMiscData
|
||||
KeyTypeSequence
|
||||
)
|
||||
|
||||
func (l VersionList) String() string {
|
||||
@ -60,28 +59,5 @@ func (l fileList) Less(a, b int) bool {
|
||||
return l[a].Name < l[b].Name
|
||||
}
|
||||
|
||||
type dbReader interface {
|
||||
Get([]byte, *opt.ReadOptions) ([]byte, error)
|
||||
}
|
||||
|
||||
// Flush batches to disk when they contain this many records.
|
||||
const batchFlushSize = 64
|
||||
|
||||
func getFile(db dbReader, key []byte) (protocol.FileInfo, bool) {
|
||||
bs, err := db.Get(key, nil)
|
||||
if err == leveldb.ErrNotFound {
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
if err != nil {
|
||||
l.Debugln("surprise error:", err)
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
|
||||
var f protocol.FileInfo
|
||||
err = f.Unmarshal(bs)
|
||||
if err != nil {
|
||||
l.Debugln("unmarshal error:", err)
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
return f, true
|
||||
}
|
||||
|
@ -35,10 +35,13 @@ type Instance struct {
|
||||
}
|
||||
|
||||
const (
|
||||
keyPrefixLen = 1
|
||||
keyFolderLen = 4 // indexed
|
||||
keyDeviceLen = 4 // indexed
|
||||
keyHashLen = 32
|
||||
keyPrefixLen = 1
|
||||
keyFolderLen = 4 // indexed
|
||||
keyDeviceLen = 4 // indexed
|
||||
keySequenceLen = 8
|
||||
keyHashLen = 32
|
||||
|
||||
maxInt64 int64 = 1<<63 - 1
|
||||
)
|
||||
|
||||
func Open(file string) (*Instance, error) {
|
||||
@ -88,13 +91,21 @@ func (db *Instance) UpdateSchema() {
|
||||
miscDB := NewNamespacedKV(db, string(KeyTypeMiscData))
|
||||
prevVersion, _ := miscDB.Int64("dbVersion")
|
||||
|
||||
if prevVersion >= dbVersion {
|
||||
return
|
||||
}
|
||||
|
||||
l.Infof("Updating database schema version from %v to %v...", prevVersion, dbVersion)
|
||||
|
||||
if prevVersion == 0 {
|
||||
db.updateSchema0to1()
|
||||
}
|
||||
|
||||
if prevVersion != dbVersion {
|
||||
miscDB.PutInt64("dbVersion", dbVersion)
|
||||
if prevVersion <= 1 {
|
||||
db.updateSchema1to2()
|
||||
}
|
||||
|
||||
l.Infof("Finished updating database schema version from %v to %v", prevVersion, dbVersion)
|
||||
miscDB.PutInt64("dbVersion", dbVersion)
|
||||
}
|
||||
|
||||
// Committed returns the number of items committed to the database since startup
|
||||
@ -112,9 +123,10 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m
|
||||
defer t.close()
|
||||
|
||||
var fk []byte
|
||||
var gk []byte
|
||||
for _, f := range fs {
|
||||
name := []byte(f.Name)
|
||||
fk = db.deviceKeyInto(fk[:cap(fk)], folder, device, name)
|
||||
fk = db.deviceKeyInto(fk, folder, device, name)
|
||||
|
||||
// Get and unmarshal the file entry. If it doesn't exist or can't be
|
||||
// unmarshalled we'll add it as a new entry.
|
||||
@ -135,8 +147,10 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m
|
||||
}
|
||||
meta.addFile(devID, f)
|
||||
|
||||
t.insertFile(folder, device, f)
|
||||
t.updateGlobal(folder, device, f, meta)
|
||||
t.insertFile(fk, folder, device, f)
|
||||
|
||||
gk = db.globalKeyInto(gk, folder, name)
|
||||
t.updateGlobal(gk, folder, device, f, meta)
|
||||
|
||||
// Write out and reuse the batch every few records, to avoid the batch
|
||||
// growing too large and thus allocating unnecessarily much memory.
|
||||
@ -144,6 +158,33 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
|
||||
t := db.newReadWriteTransaction()
|
||||
defer t.close()
|
||||
|
||||
var sk []byte
|
||||
var dk []byte
|
||||
for _, f := range fs {
|
||||
sk = db.sequenceKeyInto(sk, folder, f.Sequence)
|
||||
dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.Name))
|
||||
t.Put(sk, dk)
|
||||
l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
|
||||
t.checkFlush()
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
|
||||
t := db.newReadWriteTransaction()
|
||||
defer t.close()
|
||||
|
||||
var sk []byte
|
||||
for _, f := range fs {
|
||||
t.Delete(db.sequenceKeyInto(sk, folder, f.Sequence))
|
||||
l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
|
||||
t.checkFlush()
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
|
||||
t := db.newReadOnlyTransaction()
|
||||
defer t.close()
|
||||
@ -171,7 +212,26 @@ func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn It
|
||||
l.Debugln("unmarshal error:", err)
|
||||
continue
|
||||
}
|
||||
if cont := fn(f); !cont {
|
||||
if !fn(f) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
|
||||
t := db.newReadOnlyTransaction()
|
||||
defer t.close()
|
||||
|
||||
dbi := t.NewIterator(&util.Range{Start: db.sequenceKey(folder, startSeq), Limit: db.sequenceKey(folder, maxInt64)}, nil)
|
||||
defer dbi.Release()
|
||||
|
||||
for dbi.Next() {
|
||||
f, ok := db.getFile(dbi.Value())
|
||||
if !ok {
|
||||
l.Debugln("missing file for sequence number", db.sequenceKeySequence(dbi.Key()))
|
||||
continue
|
||||
}
|
||||
if !fn(f) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -184,6 +244,8 @@ func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte,
|
||||
dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, nil, nil)[:keyPrefixLen+keyFolderLen]), nil)
|
||||
defer dbi.Release()
|
||||
|
||||
var gk []byte
|
||||
|
||||
for dbi.Next() {
|
||||
device := db.deviceKeyDevice(dbi.Key())
|
||||
var f FileInfoTruncated
|
||||
@ -200,20 +262,37 @@ func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte,
|
||||
switch f.Name {
|
||||
case "", ".", "..", "/": // A few obviously invalid filenames
|
||||
l.Infof("Dropping invalid filename %q from database", f.Name)
|
||||
t.removeFromGlobal(folder, device, nil, nil)
|
||||
name := []byte(f.Name)
|
||||
gk = db.globalKeyInto(gk, folder, name)
|
||||
t.removeFromGlobal(gk, folder, device, name, nil)
|
||||
t.Delete(dbi.Key())
|
||||
t.checkFlush()
|
||||
continue
|
||||
}
|
||||
|
||||
if cont := fn(device, f); !cont {
|
||||
if !fn(device, f) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Instance) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
|
||||
return getFile(db, db.deviceKey(folder, device, file))
|
||||
func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
|
||||
bs, err := db.Get(key, nil)
|
||||
if err == leveldb.ErrNotFound {
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
if err != nil {
|
||||
l.Debugln("surprise error:", err)
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
|
||||
var f protocol.FileInfo
|
||||
err = f.Unmarshal(bs)
|
||||
if err != nil {
|
||||
l.Debugln("unmarshal error:", err)
|
||||
return protocol.FileInfo{}, false
|
||||
}
|
||||
return f, true
|
||||
}
|
||||
|
||||
func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
|
||||
@ -286,7 +365,7 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
|
||||
return
|
||||
}
|
||||
|
||||
fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[0].Device, name)
|
||||
fk = db.deviceKeyInto(fk, folder, vl.Versions[0].Device, name)
|
||||
bs, err := t.Get(fk, nil)
|
||||
if err != nil {
|
||||
l.Debugln("surprise error:", err)
|
||||
@ -299,7 +378,7 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
|
||||
continue
|
||||
}
|
||||
|
||||
if cont := fn(f); !cont {
|
||||
if !fn(f) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -392,7 +471,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
|
||||
continue
|
||||
}
|
||||
|
||||
fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.Versions[i].Device, name)
|
||||
fk = db.deviceKeyInto(fk, folder, vl.Versions[i].Device, name)
|
||||
bs, err := t.Get(fk, nil)
|
||||
if err != nil {
|
||||
l.Debugln("surprise error:", err)
|
||||
@ -412,7 +491,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
|
||||
|
||||
l.Debugf("need folder=%q device=%v name=%q need=%v have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, protocol.DeviceIDFromBytes(device), name, need, have, haveFileVersion.Invalid, haveFileVersion.Version, needVersion, needDevice)
|
||||
|
||||
if cont := fn(gf); !cont {
|
||||
if !fn(gf) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -478,10 +557,13 @@ func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracke
|
||||
dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, nil)), nil)
|
||||
defer dbi.Release()
|
||||
|
||||
var gk []byte
|
||||
|
||||
for dbi.Next() {
|
||||
key := dbi.Key()
|
||||
name := db.deviceKeyName(key)
|
||||
t.removeFromGlobal(folder, device, name, meta)
|
||||
gk = db.globalKeyInto(gk, folder, name)
|
||||
t.removeFromGlobal(gk, folder, device, name, meta)
|
||||
t.Delete(key)
|
||||
t.checkFlush()
|
||||
}
|
||||
@ -512,8 +594,7 @@ func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
|
||||
name := db.globalKeyName(gk)
|
||||
var newVL VersionList
|
||||
for i, version := range vl.Versions {
|
||||
fk = db.deviceKeyInto(fk[:cap(fk)], folder, version.Device, name)
|
||||
|
||||
fk = db.deviceKeyInto(fk, folder, version.Device, name)
|
||||
_, err := t.Get(fk, nil)
|
||||
if err == leveldb.ErrNotFound {
|
||||
continue
|
||||
@ -525,7 +606,7 @@ func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
|
||||
newVL.Versions = append(newVL.Versions, version)
|
||||
|
||||
if i == 0 {
|
||||
if fi, ok := t.getFile(folder, version.Device, name); ok {
|
||||
if fi, ok := db.getFile(fk); ok {
|
||||
meta.addFile(globalDeviceID, fi)
|
||||
}
|
||||
}
|
||||
@ -550,18 +631,20 @@ func (db *Instance) updateSchema0to1() {
|
||||
changedFolders := make(map[string]struct{})
|
||||
ignAdded := 0
|
||||
meta := newMetadataTracker() // dummy metadata tracker
|
||||
var gk []byte
|
||||
|
||||
for dbi.Next() {
|
||||
folder := db.deviceKeyFolder(dbi.Key())
|
||||
device := db.deviceKeyDevice(dbi.Key())
|
||||
name := string(db.deviceKeyName(dbi.Key()))
|
||||
name := db.deviceKeyName(dbi.Key())
|
||||
|
||||
// Remove files with absolute path (see #4799)
|
||||
if strings.HasPrefix(name, "/") {
|
||||
if strings.HasPrefix(string(name), "/") {
|
||||
if _, ok := changedFolders[string(folder)]; !ok {
|
||||
changedFolders[string(folder)] = struct{}{}
|
||||
}
|
||||
t.removeFromGlobal(folder, device, nil, nil)
|
||||
gk = db.globalKeyInto(gk, folder, name)
|
||||
t.removeFromGlobal(gk, folder, device, nil, nil)
|
||||
t.Delete(dbi.Key())
|
||||
t.checkFlush()
|
||||
continue
|
||||
@ -590,7 +673,8 @@ func (db *Instance) updateSchema0to1() {
|
||||
|
||||
// Add invalid files to global list
|
||||
if f.Invalid {
|
||||
if t.updateGlobal(folder, device, f, meta) {
|
||||
gk = db.globalKeyInto(gk, folder, name)
|
||||
if t.updateGlobal(gk, folder, device, f, meta) {
|
||||
if _, ok := changedFolders[string(folder)]; !ok {
|
||||
changedFolders[string(folder)] = struct{}{}
|
||||
}
|
||||
@ -606,6 +690,25 @@ func (db *Instance) updateSchema0to1() {
|
||||
l.Infof("Updated symlink type for %d index entries and added %d invalid files to global list", symlinkConv, ignAdded)
|
||||
}
|
||||
|
||||
func (db *Instance) updateSchema1to2() {
|
||||
t := db.newReadWriteTransaction()
|
||||
defer t.close()
|
||||
|
||||
var sk []byte
|
||||
var dk []byte
|
||||
|
||||
for _, folderStr := range db.ListFolders() {
|
||||
folder := []byte(folderStr)
|
||||
db.withHave(folder, protocol.LocalDeviceID[:], nil, true, func(f FileIntf) bool {
|
||||
sk = db.sequenceKeyInto(sk, folder, f.SequenceNo())
|
||||
dk = db.deviceKeyInto(dk, folder, protocol.LocalDeviceID[:], []byte(f.FileName()))
|
||||
t.Put(sk, dk)
|
||||
t.checkFlush()
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// deviceKey returns a byte slice encoding the following information:
|
||||
// keyTypeDevice (1 byte)
|
||||
// folder (4 bytes)
|
||||
@ -615,16 +718,14 @@ func (db *Instance) deviceKey(folder, device, file []byte) []byte {
|
||||
return db.deviceKeyInto(nil, folder, device, file)
|
||||
}
|
||||
|
||||
func (db *Instance) deviceKeyInto(k []byte, folder, device, file []byte) []byte {
|
||||
func (db *Instance) deviceKeyInto(k, folder, device, file []byte) []byte {
|
||||
reqLen := keyPrefixLen + keyFolderLen + keyDeviceLen + len(file)
|
||||
if len(k) < reqLen {
|
||||
k = make([]byte, reqLen)
|
||||
}
|
||||
k = resize(k, reqLen)
|
||||
k[0] = KeyTypeDevice
|
||||
binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
|
||||
binary.BigEndian.PutUint32(k[keyPrefixLen+keyFolderLen:], db.deviceIdx.ID(device))
|
||||
copy(k[keyPrefixLen+keyFolderLen+keyDeviceLen:], file)
|
||||
return k[:reqLen]
|
||||
return k
|
||||
}
|
||||
|
||||
// deviceKeyName returns the device ID from the key
|
||||
@ -655,11 +756,16 @@ func (db *Instance) deviceKeyDevice(key []byte) []byte {
|
||||
// folder (4 bytes)
|
||||
// name (variable size)
|
||||
func (db *Instance) globalKey(folder, file []byte) []byte {
|
||||
k := make([]byte, keyPrefixLen+keyFolderLen+len(file))
|
||||
k[0] = KeyTypeGlobal
|
||||
binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
|
||||
copy(k[keyPrefixLen+keyFolderLen:], file)
|
||||
return k
|
||||
return db.globalKeyInto(nil, folder, file)
|
||||
}
|
||||
|
||||
func (db *Instance) globalKeyInto(gk, folder, file []byte) []byte {
|
||||
reqLen := keyPrefixLen + keyFolderLen + len(file)
|
||||
gk = resize(gk, reqLen)
|
||||
gk[0] = KeyTypeGlobal
|
||||
binary.BigEndian.PutUint32(gk[keyPrefixLen:], db.folderIdx.ID(folder))
|
||||
copy(gk[keyPrefixLen+keyFolderLen:], file)
|
||||
return gk[:reqLen]
|
||||
}
|
||||
|
||||
// globalKeyName returns the filename from the key
|
||||
@ -672,6 +778,28 @@ func (db *Instance) globalKeyFolder(key []byte) ([]byte, bool) {
|
||||
return db.folderIdx.Val(binary.BigEndian.Uint32(key[keyPrefixLen:]))
|
||||
}
|
||||
|
||||
// sequenceKey returns a byte slice encoding the following information:
|
||||
// KeyTypeSequence (1 byte)
|
||||
// folder (4 bytes)
|
||||
// sequence number (8 bytes)
|
||||
func (db *Instance) sequenceKey(folder []byte, seq int64) []byte {
|
||||
return db.sequenceKeyInto(nil, folder, seq)
|
||||
}
|
||||
|
||||
func (db *Instance) sequenceKeyInto(k []byte, folder []byte, seq int64) []byte {
|
||||
reqLen := keyPrefixLen + keyFolderLen + keySequenceLen
|
||||
k = resize(k, reqLen)
|
||||
k[0] = KeyTypeSequence
|
||||
binary.BigEndian.PutUint32(k[keyPrefixLen:], db.folderIdx.ID(folder))
|
||||
binary.BigEndian.PutUint64(k[keyPrefixLen+keyFolderLen:], uint64(seq))
|
||||
return k[:reqLen]
|
||||
}
|
||||
|
||||
// sequenceKeySequence returns the sequence number from the key
|
||||
func (db *Instance) sequenceKeySequence(key []byte) int64 {
|
||||
return int64(binary.BigEndian.Uint64(key[keyPrefixLen+keyFolderLen:]))
|
||||
}
|
||||
|
||||
func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
|
||||
key := db.indexIDKey(device, folder)
|
||||
cur, err := db.Get(key, nil)
|
||||
@ -887,3 +1015,11 @@ func (i *smallIndex) Val(id uint32) ([]byte, bool) {
|
||||
|
||||
return []byte(val), true
|
||||
}
|
||||
|
||||
// resize returns a byte array of length reqLen, reusing k if possible
|
||||
func resize(k []byte, reqLen int) []byte {
|
||||
if cap(k) < reqLen {
|
||||
return make([]byte, reqLen)
|
||||
}
|
||||
return k[:reqLen]
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ func (t readOnlyTransaction) close() {
|
||||
}
|
||||
|
||||
func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
|
||||
return getFile(t, t.db.deviceKey(folder, device, file))
|
||||
return t.db.getFile(t.db.deviceKey(folder, device, file))
|
||||
}
|
||||
|
||||
// A readWriteTransaction is a readOnlyTransaction plus a batch for writes.
|
||||
@ -74,21 +74,18 @@ func (t readWriteTransaction) flush() {
|
||||
atomic.AddInt64(&t.db.committed, int64(t.Batch.Len()))
|
||||
}
|
||||
|
||||
func (t readWriteTransaction) insertFile(folder, device []byte, file protocol.FileInfo) {
|
||||
func (t readWriteTransaction) insertFile(fk, folder, device []byte, file protocol.FileInfo) {
|
||||
l.Debugf("insert; folder=%q device=%v %v", folder, protocol.DeviceIDFromBytes(device), file)
|
||||
|
||||
name := []byte(file.Name)
|
||||
nk := t.db.deviceKey(folder, device, name)
|
||||
t.Put(nk, mustMarshal(&file))
|
||||
t.Put(fk, mustMarshal(&file))
|
||||
}
|
||||
|
||||
// updateGlobal adds this device+version to the version list for the given
|
||||
// file. If the device is already present in the list, the version is updated.
|
||||
// If the file does not have an entry in the global list, it is created.
|
||||
func (t readWriteTransaction) updateGlobal(folder, device []byte, file protocol.FileInfo, meta *metadataTracker) bool {
|
||||
func (t readWriteTransaction) updateGlobal(gk, folder, device []byte, file protocol.FileInfo, meta *metadataTracker) bool {
|
||||
l.Debugf("update global; folder=%q device=%v file=%q version=%v invalid=%v", folder, protocol.DeviceIDFromBytes(device), file.Name, file.Version, file.Invalid)
|
||||
name := []byte(file.Name)
|
||||
gk := t.db.globalKey(folder, name)
|
||||
svl, _ := t.Get(gk, nil) // skip error, we check len(svl) != 0 later
|
||||
|
||||
var fl VersionList
|
||||
@ -150,8 +147,7 @@ insert:
|
||||
// to determine the winner.)
|
||||
//
|
||||
// A surprise missing file entry here is counted as a win for us.
|
||||
of, ok := t.getFile(folder, fl.Versions[i].Device, name)
|
||||
if !ok || file.WinsConflict(of) {
|
||||
if of, ok := t.getFile(folder, fl.Versions[i].Device, name); !ok || file.WinsConflict(of) {
|
||||
fl.Versions = insertVersion(fl.Versions, i, nv)
|
||||
insertedAt = i
|
||||
break insert
|
||||
@ -193,10 +189,9 @@ insert:
|
||||
// removeFromGlobal removes the device from the global version list for the
|
||||
// given file. If the version list is empty after this, the file entry is
|
||||
// removed entirely.
|
||||
func (t readWriteTransaction) removeFromGlobal(folder, device, file []byte, meta *metadataTracker) {
|
||||
func (t readWriteTransaction) removeFromGlobal(gk, folder, device, file []byte, meta *metadataTracker) {
|
||||
l.Debugf("remove from global; folder=%q device=%v file=%q", folder, protocol.DeviceIDFromBytes(device), file)
|
||||
|
||||
gk := t.db.globalKey(folder, file)
|
||||
svl, err := t.Get(gk, nil)
|
||||
if err != nil {
|
||||
// We might be called to "remove" a global version that doesn't exist
|
||||
|
@ -141,8 +141,11 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
// filter slice according to https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
|
||||
oldFs := fs
|
||||
fs = fs[:0]
|
||||
var dk []byte
|
||||
folder := []byte(s.folder)
|
||||
for _, nf := range oldFs {
|
||||
ef, ok := s.db.getFile([]byte(s.folder), device[:], []byte(nf.Name))
|
||||
dk = s.db.deviceKeyInto(dk, folder, device[:], []byte(osutil.NormalizedFilename(nf.Name)))
|
||||
ef, ok := s.db.getFile(dk)
|
||||
if ok && ef.Version.Equal(nf.Version) && ef.Invalid == nf.Invalid {
|
||||
continue
|
||||
}
|
||||
@ -157,6 +160,8 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
|
||||
}
|
||||
s.blockmap.Discard(discards)
|
||||
s.blockmap.Update(updates)
|
||||
s.db.removeSequences(folder, discards)
|
||||
s.db.addSequences(folder, updates)
|
||||
}
|
||||
|
||||
s.db.updateFiles([]byte(s.folder), device[:], fs, s.meta)
|
||||
@ -183,6 +188,11 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) {
|
||||
s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn))
|
||||
}
|
||||
|
||||
func (s *FileSet) WithHaveSequence(startSeq int64, fn Iterator) {
|
||||
l.Debugf("%s WithHaveSequence(%v)", s.folder, startSeq)
|
||||
s.db.withHaveSequence([]byte(s.folder), startSeq, nativeFileIterator(fn))
|
||||
}
|
||||
|
||||
func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) {
|
||||
l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device)
|
||||
s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn))
|
||||
@ -203,7 +213,7 @@ func (s *FileSet) WithPrefixedGlobalTruncated(prefix string, fn Iterator) {
|
||||
}
|
||||
|
||||
func (s *FileSet) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
|
||||
f, ok := s.db.getFile([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))
|
||||
f, ok := s.db.getFile(s.db.deviceKey([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file))))
|
||||
f.Name = osutil.NativeFilename(f.Name)
|
||||
return f, ok
|
||||
}
|
||||
|
@ -865,6 +865,33 @@ func TestIssue4701(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithHaveSequence(t *testing.T) {
|
||||
ldb := db.OpenMemory()
|
||||
|
||||
folder := "test)"
|
||||
s := db.NewFileSet(folder, fs.NewFilesystem(fs.FilesystemTypeBasic, "."), ldb)
|
||||
|
||||
// The files must not be in alphabetical order
|
||||
localHave := fileList{
|
||||
protocol.FileInfo{Name: "e", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1003}}}, Invalid: true},
|
||||
protocol.FileInfo{Name: "b", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1001}}}, Blocks: genBlocks(2)},
|
||||
protocol.FileInfo{Name: "d", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1003}}}, Blocks: genBlocks(7)},
|
||||
protocol.FileInfo{Name: "a", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1000}}}, Blocks: genBlocks(1)},
|
||||
protocol.FileInfo{Name: "c", Version: protocol.Vector{Counters: []protocol.Counter{{ID: myID, Value: 1002}}}, Blocks: genBlocks(5), Invalid: true},
|
||||
}
|
||||
|
||||
replace(s, protocol.LocalDeviceID, localHave)
|
||||
|
||||
i := 2
|
||||
s.WithHaveSequence(int64(i), func(fi db.FileIntf) bool {
|
||||
if f := fi.(protocol.FileInfo); !f.IsEquivalent(localHave[i-1], false, false) {
|
||||
t.Fatalf("Got %v\nExpected %v", f, localHave[i-1])
|
||||
}
|
||||
i++
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func replace(fs *db.FileSet, device protocol.DeviceID, files []protocol.FileInfo) {
|
||||
fs.Drop(device)
|
||||
fs.Update(device, files)
|
||||
|
@ -1639,14 +1639,15 @@ func (m *Model) receivedFile(folder string, file protocol.FileInfo) {
|
||||
m.folderStatRef(folder).ReceivedFile(file.Name, file.IsDeleted())
|
||||
}
|
||||
|
||||
func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, startSequence int64, dbLocation string, dropSymlinks bool) {
|
||||
func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, prevSequence int64, dbLocation string, dropSymlinks bool) {
|
||||
deviceID := conn.ID()
|
||||
var err error
|
||||
|
||||
l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, startSequence)
|
||||
l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, prevSequence)
|
||||
defer l.Debugf("Exiting sendIndexes for %s to %s at %s: %v", folder, deviceID, conn, err)
|
||||
|
||||
minSequence, err := sendIndexTo(startSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks)
|
||||
// We need to send one index, regardless of whether there is something to send or not
|
||||
prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks)
|
||||
|
||||
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
||||
// DeviceDisconnected (it might be us who disconnected, so we should
|
||||
@ -1664,12 +1665,12 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore
|
||||
// currently in the database, wait for the local index to update. The
|
||||
// local index may update for other folders than the one we are
|
||||
// sending for.
|
||||
if fs.Sequence(protocol.LocalDeviceID) <= minSequence {
|
||||
if fs.Sequence(protocol.LocalDeviceID) <= prevSequence {
|
||||
sub.Poll(time.Minute)
|
||||
continue
|
||||
}
|
||||
|
||||
minSequence, err = sendIndexTo(minSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks)
|
||||
prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, ignores, dbLocation, dropSymlinks)
|
||||
|
||||
// Wait a short amount of time before entering the next loop. If there
|
||||
// are continuous changes happening to the local index, this gives us
|
||||
@ -1678,43 +1679,20 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore
|
||||
}
|
||||
}
|
||||
|
||||
func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) {
|
||||
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
||||
// returns the highest sent sequence number.
|
||||
func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) {
|
||||
deviceID := conn.ID()
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
initial := minSequence == 0
|
||||
maxSequence := minSequence
|
||||
initial := prevSequence == 0
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
debugMsg := func(t string) string {
|
||||
return fmt.Sprintf("Sending indexes for %s to %s at %s: %d files (<%d bytes) (%s)", folder, deviceID, conn, len(batch), batchSizeBytes, t)
|
||||
}
|
||||
|
||||
sorter := NewIndexSorter(dbLocation)
|
||||
defer sorter.Close()
|
||||
|
||||
fs.WithHave(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
|
||||
f := fi.(protocol.FileInfo)
|
||||
if f.Sequence <= minSequence {
|
||||
return true
|
||||
}
|
||||
|
||||
if f.Sequence > maxSequence {
|
||||
maxSequence = f.Sequence
|
||||
}
|
||||
|
||||
if dropSymlinks && f.IsSymlink() {
|
||||
// Do not send index entries with symlinks to clients that can't
|
||||
// handle it. Fixes issue #3802. Once both sides are upgraded, a
|
||||
// rescan (i.e., change) of the symlink is required for it to
|
||||
// sync again, due to delta indexes.
|
||||
return true
|
||||
}
|
||||
|
||||
sorter.Append(f)
|
||||
return true
|
||||
})
|
||||
|
||||
sorter.Sorted(func(f protocol.FileInfo) bool {
|
||||
fs.WithHaveSequence(prevSequence+1, func(fi db.FileIntf) bool {
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
if initial {
|
||||
if err = conn.Index(folder, batch); err != nil {
|
||||
@ -1733,24 +1711,43 @@ func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs
|
||||
batchSizeBytes = 0
|
||||
}
|
||||
|
||||
f = fi.(protocol.FileInfo)
|
||||
|
||||
if dropSymlinks && f.IsSymlink() {
|
||||
// Do not send index entries with symlinks to clients that can't
|
||||
// handle it. Fixes issue #3802. Once both sides are upgraded, a
|
||||
// rescan (i.e., change) of the symlink is required for it to
|
||||
// sync again, due to delta indexes.
|
||||
return true
|
||||
}
|
||||
|
||||
batch = append(batch, f)
|
||||
batchSizeBytes += f.ProtoSize()
|
||||
return true
|
||||
})
|
||||
|
||||
if initial && err == nil {
|
||||
if err != nil {
|
||||
return prevSequence, err
|
||||
}
|
||||
|
||||
if initial {
|
||||
err = conn.Index(folder, batch)
|
||||
if err == nil {
|
||||
l.Debugln(debugMsg("small initial index"))
|
||||
}
|
||||
} else if len(batch) > 0 && err == nil {
|
||||
} else if len(batch) > 0 {
|
||||
err = conn.IndexUpdate(folder, batch)
|
||||
if err == nil {
|
||||
l.Debugln(debugMsg("last batch"))
|
||||
}
|
||||
}
|
||||
|
||||
return maxSequence, err
|
||||
// True if there was nothing to be sent
|
||||
if f.Sequence == 0 {
|
||||
return prevSequence, err
|
||||
}
|
||||
|
||||
return f.Sequence, err
|
||||
}
|
||||
|
||||
func (m *Model) updateLocalsFromScanning(folder string, fs []protocol.FileInfo) {
|
||||
|
@ -1,199 +0,0 @@
|
||||
// Copyright (C) 2016 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
const (
|
||||
maxBytesInMemory = 512 << 10
|
||||
)
|
||||
|
||||
// The IndexSorter sorts FileInfos based on their Sequence. You use it
|
||||
// by first Append()ing all entries to be sorted, then calling Sorted()
|
||||
// which will iterate over all the items in correctly sorted order.
|
||||
type IndexSorter interface {
|
||||
Append(f protocol.FileInfo)
|
||||
Sorted(fn func(f protocol.FileInfo) bool)
|
||||
Close()
|
||||
}
|
||||
|
||||
type internalIndexSorter interface {
|
||||
IndexSorter
|
||||
full() bool
|
||||
copyTo(to IndexSorter)
|
||||
}
|
||||
|
||||
// NewIndexSorter returns a new IndexSorter that will start out in memory
|
||||
// for efficiency but switch to on disk storage once the amount of data
|
||||
// becomes large.
|
||||
func NewIndexSorter(location string) IndexSorter {
|
||||
return &autoSwitchingIndexSorter{
|
||||
internalIndexSorter: newInMemoryIndexSorter(),
|
||||
location: location,
|
||||
}
|
||||
}
|
||||
|
||||
// An autoSwitchingSorter starts out as an inMemorySorter but becomes an
|
||||
// onDiskSorter when the in memory sorter is full().
|
||||
type autoSwitchingIndexSorter struct {
|
||||
internalIndexSorter
|
||||
location string
|
||||
}
|
||||
|
||||
func (s *autoSwitchingIndexSorter) Append(f protocol.FileInfo) {
|
||||
if s.internalIndexSorter.full() {
|
||||
// We spill before adding a file instead of after, to handle the
|
||||
// case where we're over max size but won't add any more files, in
|
||||
// which case we *don't* need to spill. An example of this would be
|
||||
// an index containing just a single large file.
|
||||
l.Debugf("sorter %p spills to disk", s)
|
||||
next := newOnDiskIndexSorter(s.location)
|
||||
s.internalIndexSorter.copyTo(next)
|
||||
s.internalIndexSorter = next
|
||||
}
|
||||
s.internalIndexSorter.Append(f)
|
||||
}
|
||||
|
||||
// An inMemoryIndexSorter is simply a slice of FileInfos. The full() method
|
||||
// returns true when the number of files exceeds maxFiles or the total
|
||||
// number of blocks exceeds maxBlocks.
|
||||
type inMemoryIndexSorter struct {
|
||||
files []protocol.FileInfo
|
||||
bytes int
|
||||
maxBytes int
|
||||
}
|
||||
|
||||
func newInMemoryIndexSorter() *inMemoryIndexSorter {
|
||||
return &inMemoryIndexSorter{
|
||||
maxBytes: maxBytesInMemory,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *inMemoryIndexSorter) Append(f protocol.FileInfo) {
|
||||
s.files = append(s.files, f)
|
||||
s.bytes += f.ProtoSize()
|
||||
}
|
||||
|
||||
func (s *inMemoryIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
|
||||
sort.Sort(bySequence(s.files))
|
||||
for _, f := range s.files {
|
||||
if !fn(f) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *inMemoryIndexSorter) Close() {
|
||||
}
|
||||
|
||||
func (s *inMemoryIndexSorter) full() bool {
|
||||
return s.bytes >= s.maxBytes
|
||||
}
|
||||
|
||||
func (s *inMemoryIndexSorter) copyTo(dst IndexSorter) {
|
||||
for _, f := range s.files {
|
||||
dst.Append(f)
|
||||
}
|
||||
}
|
||||
|
||||
// bySequence sorts FileInfos by Sequence
|
||||
type bySequence []protocol.FileInfo
|
||||
|
||||
func (l bySequence) Len() int {
|
||||
return len(l)
|
||||
}
|
||||
func (l bySequence) Swap(a, b int) {
|
||||
l[a], l[b] = l[b], l[a]
|
||||
}
|
||||
func (l bySequence) Less(a, b int) bool {
|
||||
return l[a].Sequence < l[b].Sequence
|
||||
}
|
||||
|
||||
// An onDiskIndexSorter is backed by a LevelDB database in the temporary
|
||||
// directory. It relies on the fact that iterating over the database is done
|
||||
// in key order and uses the Sequence as key. When done with an
|
||||
// onDiskIndexSorter you must call Close() to remove the temporary database.
|
||||
type onDiskIndexSorter struct {
|
||||
db *leveldb.DB
|
||||
dir string
|
||||
}
|
||||
|
||||
func newOnDiskIndexSorter(location string) *onDiskIndexSorter {
|
||||
// Set options to minimize resource usage.
|
||||
opts := &opt.Options{
|
||||
OpenFilesCacheCapacity: 10,
|
||||
WriteBuffer: 512 << 10,
|
||||
}
|
||||
|
||||
// Use a temporary database directory.
|
||||
tmp, err := ioutil.TempDir(location, "tmp-index-sorter.")
|
||||
if err != nil {
|
||||
panic("creating temporary directory: " + err.Error())
|
||||
}
|
||||
db, err := leveldb.OpenFile(tmp, opts)
|
||||
if err != nil {
|
||||
panic("creating temporary database: " + err.Error())
|
||||
}
|
||||
|
||||
s := &onDiskIndexSorter{
|
||||
db: db,
|
||||
dir: tmp,
|
||||
}
|
||||
l.Debugf("onDiskIndexSorter %p created at %s", s, tmp)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *onDiskIndexSorter) Append(f protocol.FileInfo) {
|
||||
key := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(key[:], uint64(f.Sequence))
|
||||
data, err := f.Marshal()
|
||||
if err != nil {
|
||||
panic("bug: marshalling FileInfo should never fail: " + err.Error())
|
||||
}
|
||||
err = s.db.Put(key, data, nil)
|
||||
if err != nil {
|
||||
panic("writing to temporary database: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *onDiskIndexSorter) Sorted(fn func(protocol.FileInfo) bool) {
|
||||
it := s.db.NewIterator(nil, nil)
|
||||
defer it.Release()
|
||||
for it.Next() {
|
||||
var f protocol.FileInfo
|
||||
if err := f.Unmarshal(it.Value()); err != nil {
|
||||
panic("unmarshal failed: " + err.Error())
|
||||
}
|
||||
if !fn(f) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *onDiskIndexSorter) Close() {
|
||||
l.Debugf("onDiskIndexSorter %p closes", s)
|
||||
s.db.Close()
|
||||
os.RemoveAll(s.dir)
|
||||
}
|
||||
|
||||
func (s *onDiskIndexSorter) full() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *onDiskIndexSorter) copyTo(dst IndexSorter) {
|
||||
// Just wrap Sorted() if we need to support this in the future.
|
||||
panic("unsupported")
|
||||
}
|
@ -1,157 +0,0 @@
|
||||
// Copyright (C) 2016 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/rand"
|
||||
)
|
||||
|
||||
func TestInMemoryIndexSorter(t *testing.T) {
|
||||
// An inMemorySorter should be able to absorb a few files in unsorted
|
||||
// order, and return them sorted.
|
||||
|
||||
s := newInMemoryIndexSorter()
|
||||
addFiles(50, s)
|
||||
verifySorted(t, s, 50)
|
||||
verifyBreak(t, s, 50)
|
||||
s.Close()
|
||||
}
|
||||
|
||||
func TestOnDiskIndexSorter(t *testing.T) {
|
||||
// An onDiskSorter should be able to absorb a few files in unsorted
|
||||
// order, and return them sorted.
|
||||
|
||||
s := newOnDiskIndexSorter("testdata")
|
||||
addFiles(50, s)
|
||||
verifySorted(t, s, 50)
|
||||
verifyBreak(t, s, 50)
|
||||
|
||||
// The temporary database should exist on disk. When Close()d, it should
|
||||
// be removed.
|
||||
|
||||
info, err := os.Stat(s.dir)
|
||||
if err != nil {
|
||||
t.Fatal("temp database should exist on disk:", err)
|
||||
}
|
||||
if !info.IsDir() {
|
||||
t.Fatal("temp database should be a directory")
|
||||
}
|
||||
|
||||
s.Close()
|
||||
|
||||
_, err = os.Stat(s.dir)
|
||||
if !os.IsNotExist(err) {
|
||||
t.Fatal("temp database should have been removed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexSorter(t *testing.T) {
|
||||
// An default IndexSorter should be able to absorb files, have them in
|
||||
// memory, and at some point switch to an on disk database.
|
||||
|
||||
s := NewIndexSorter("testdata")
|
||||
defer s.Close()
|
||||
|
||||
// We should start out as an in memory store.
|
||||
|
||||
nFiles := 1
|
||||
addFiles(1, s)
|
||||
verifySorted(t, s, nFiles)
|
||||
|
||||
as := s.(*autoSwitchingIndexSorter)
|
||||
if _, ok := as.internalIndexSorter.(*inMemoryIndexSorter); !ok {
|
||||
t.Fatalf("the sorter should be in memory after only one file")
|
||||
}
|
||||
|
||||
// At some point, for sure with less than maxBytesInMemory files, we
|
||||
// should switch over to an on disk sorter.
|
||||
for i := 0; i < maxBytesInMemory; i++ {
|
||||
addFiles(1, s)
|
||||
nFiles++
|
||||
if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := as.internalIndexSorter.(*onDiskIndexSorter); !ok {
|
||||
t.Fatalf("the sorter should be on disk after %d files", nFiles)
|
||||
}
|
||||
|
||||
verifySorted(t, s, nFiles)
|
||||
|
||||
// For test coverage, as some methods are called on the onDiskSorter
|
||||
// only after switching to it.
|
||||
|
||||
addFiles(1, s)
|
||||
verifySorted(t, s, nFiles+1)
|
||||
}
|
||||
|
||||
// addFiles adds files with random Sequence to the Sorter.
|
||||
func addFiles(n int, s IndexSorter) {
|
||||
for i := 0; i < n; i++ {
|
||||
rnd := rand.Int63()
|
||||
f := protocol.FileInfo{
|
||||
Name: fmt.Sprintf("file-%d", rnd),
|
||||
Size: rand.Int63(),
|
||||
Permissions: uint32(rand.Intn(0777)),
|
||||
ModifiedS: rand.Int63(),
|
||||
ModifiedNs: int32(rand.Int63()),
|
||||
Sequence: rnd,
|
||||
Version: protocol.Vector{Counters: []protocol.Counter{{ID: 42, Value: uint64(rand.Int63())}}},
|
||||
Blocks: []protocol.BlockInfo{{
|
||||
Size: int32(rand.Intn(128 << 10)),
|
||||
Hash: []byte(rand.String(32)),
|
||||
}},
|
||||
}
|
||||
s.Append(f)
|
||||
}
|
||||
}
|
||||
|
||||
// verifySorted checks that the files are returned sorted by Sequence.
|
||||
func verifySorted(t *testing.T, s IndexSorter, expected int) {
|
||||
prevSequence := int64(-1)
|
||||
seen := 0
|
||||
s.Sorted(func(f protocol.FileInfo) bool {
|
||||
if f.Sequence <= prevSequence {
|
||||
t.Fatalf("Unsorted Sequence, %d <= %d", f.Sequence, prevSequence)
|
||||
}
|
||||
prevSequence = f.Sequence
|
||||
seen++
|
||||
return true
|
||||
})
|
||||
if seen != expected {
|
||||
t.Fatalf("expected %d files returned, got %d", expected, seen)
|
||||
}
|
||||
}
|
||||
|
||||
// verifyBreak checks that the Sorter stops iteration once we return false.
|
||||
func verifyBreak(t *testing.T, s IndexSorter, expected int) {
|
||||
prevSequence := int64(-1)
|
||||
seen := 0
|
||||
s.Sorted(func(f protocol.FileInfo) bool {
|
||||
if f.Sequence <= prevSequence {
|
||||
t.Fatalf("Unsorted Sequence, %d <= %d", f.Sequence, prevSequence)
|
||||
}
|
||||
if len(f.Blocks) != 1 {
|
||||
t.Fatalf("incorrect number of blocks %d != 1", len(f.Blocks))
|
||||
}
|
||||
if len(f.Version.Counters) != 1 {
|
||||
t.Fatalf("incorrect number of version counters %d != 1", len(f.Version.Counters))
|
||||
}
|
||||
prevSequence = f.Sequence
|
||||
seen++
|
||||
return seen < expected/2
|
||||
})
|
||||
if seen != expected/2 {
|
||||
t.Fatalf("expected %d files iterated over, got %d", expected, seen)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user