From 6da7f17c4adc6b094edbe8e0e975d8cab9396ba3 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 25 Mar 2015 22:37:35 +0100 Subject: [PATCH] Implement version vectors --- cmd/syncthing/main.go | 2 +- internal/db/leveldb.go | 55 +++++++++++++++------------ internal/db/leveldb_xdr.go | 43 +++++++++++---------- internal/db/set.go | 8 ++-- internal/discover/packets_xdr.go | 64 ++++++++++++++++---------------- internal/lamport/clock.go | 31 ---------------- internal/lamport/clock_test.go | 24 ------------ internal/model/model.go | 20 +++++----- internal/scanner/walk.go | 18 +++++---- 9 files changed, 113 insertions(+), 152 deletions(-) delete mode 100644 internal/lamport/clock.go delete mode 100644 internal/lamport/clock_test.go diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 4ca717ec1..dde7b27d8 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -514,7 +514,7 @@ func syncthingMain() { } } - m := model.NewModel(cfg, myName, "syncthing", Version, ldb) + m := model.NewModel(cfg, myID, myName, "syncthing", Version, ldb) sanityCheckFolders(cfg, m) diff --git a/internal/db/leveldb.go b/internal/db/leveldb.go index 1e5dada5f..cb5280bd7 100644 --- a/internal/db/leveldb.go +++ b/internal/db/leveldb.go @@ -17,7 +17,6 @@ import ( "sync" "github.com/syncthing/protocol" - "github.com/syncthing/syncthing/internal/lamport" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -49,7 +48,7 @@ const ( ) type fileVersion struct { - version int64 + version protocol.Vector device []byte } @@ -242,8 +241,7 @@ func ldbGenericReplace(db *leveldb.DB, folder, device []byte, fs []protocol.File } var ef FileInfoTruncated ef.UnmarshalXDR(dbi.Value()) - if fs[fsi].Version > ef.Version || - (fs[fsi].Version == ef.Version && fs[fsi].Flags != ef.Flags) { + if !fs[fsi].Version.Equal(ef.Version) || fs[fsi].Flags != ef.Flags { if debugDB { l.Debugln("generic replace; differs - insert") } @@ -315,7 +313,7 @@ func ldbReplace(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) i }) } -func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) int64 { +func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo, myID uint64) int64 { return ldbGenericReplace(db, folder, device, fs, func(db dbReader, batch dbWriter, folder, device, name []byte, dbi iterator.Iterator) int64 { var tf FileInfoTruncated err := tf.UnmarshalXDR(dbi.Value()) @@ -329,7 +327,7 @@ func ldbReplaceWithDelete(db *leveldb.DB, folder, device []byte, fs []protocol.F ts := clock(tf.LocalVersion) f := protocol.FileInfo{ Name: tf.Name, - Version: lamport.Default.Tick(tf.Version), + Version: tf.Version.Update(myID), LocalVersion: ts, Flags: tf.Flags | protocol.FlagDeleted, Modified: tf.Modified, @@ -394,7 +392,7 @@ func ldbUpdate(db *leveldb.DB, folder, device []byte, fs []protocol.FileInfo) in } // Flags might change without the version being bumped when we set the // invalid flag on an existing file. - if ef.Version != f.Version || ef.Flags != f.Flags { + if !ef.Version.Equal(f.Version) || ef.Flags != f.Flags { if lv := ldbInsert(batch, folder, device, f); lv > maxLocalVer { maxLocalVer = lv } @@ -454,7 +452,7 @@ func ldbInsert(batch dbWriter, folder, device []byte, file protocol.FileInfo) in // ldbUpdateGlobal adds this device+version to the version list for the given // file. If the device is already present in the list, the version is updated. // If the file does not have an entry in the global list, it is created. -func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, version int64) bool { +func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, version protocol.Vector) bool { if debugDB { l.Debugf("update global; folder=%q device=%v file=%q version=%d", folder, protocol.DeviceIDFromBytes(device), file, version) } @@ -465,10 +463,8 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v } var fl versionList - nv := fileVersion{ - device: device, - version: version, - } + + // Remove the device from the current version list if svl != nil { err = fl.UnmarshalXDR(svl) if err != nil { @@ -477,7 +473,7 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v for i := range fl.versions { if bytes.Compare(fl.versions[i].device, device) == 0 { - if fl.versions[i].version == version { + if fl.versions[i].version.Equal(version) { // No need to do anything return false } @@ -487,8 +483,15 @@ func ldbUpdateGlobal(db dbReader, batch dbWriter, folder, device, file []byte, v } } + nv := fileVersion{ + device: device, + version: version, + } for i := range fl.versions { - if fl.versions[i].version <= version { + // We compare against ConcurrentLesser as well here because we need + // to enforce a consistent ordering of versions even in the case of + // conflicts. + if comp := fl.versions[i].version.Compare(version); comp == protocol.Equal || comp == protocol.Lesser || comp == protocol.ConcurrentLesser { t := append(fl.versions, fileVersion{}) copy(t[i+1:], t[i:]) t[i] = nv @@ -776,7 +779,7 @@ func ldbAvailability(db *leveldb.DB, folder, file []byte) []protocol.DeviceID { var devices []protocol.DeviceID for _, v := range vl.versions { - if v.version != vl.versions[0].version { + if !v.version.Equal(vl.versions[0].version) { break } n := protocol.DeviceIDFromBytes(v.device) @@ -808,7 +811,7 @@ func ldbWithNeed(db *leveldb.DB, folder, device []byte, truncate bool, fn Iterat dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil) defer dbi.Release() -outer: +nextFile: for dbi.Next() { var vl versionList err := vl.UnmarshalXDR(dbi.Value()) @@ -822,12 +825,15 @@ outer: have := false // If we have the file, any version need := false // If we have a lower version of the file - var haveVersion int64 + var haveVersion protocol.Vector for _, v := range vl.versions { if bytes.Compare(v.device, device) == 0 { have = true haveVersion = v.version - need = v.version < vl.versions[0].version + // XXX: This marks Concurrent (i.e. conflicting) changes as + // needs. Maybe we should do that, but it needs special + // handling in the puller. + need = !v.version.GreaterEqual(vl.versions[0].version) break } } @@ -835,11 +841,12 @@ outer: if need || !have { name := globalKeyName(dbi.Key()) needVersion := vl.versions[0].version - inner: + + nextVersion: for i := range vl.versions { - if vl.versions[i].version != needVersion { + if !vl.versions[i].version.Equal(needVersion) { // We haven't found a valid copy of the file with the needed version. - continue outer + continue nextFile } fk := deviceKey(folder, vl.versions[i].device, name) if debugDB { @@ -866,12 +873,12 @@ outer: if gf.IsInvalid() { // The file is marked invalid for whatever reason, don't use it. - continue inner + continue nextVersion } if gf.IsDeleted() && !have { // We don't need deleted files that we don't have - continue outer + continue nextFile } if debugDB { @@ -883,7 +890,7 @@ outer: } // This file is handled, no need to look further in the version list - continue outer + continue nextFile } } } diff --git a/internal/db/leveldb_xdr.go b/internal/db/leveldb_xdr.go index 23339a72a..a3c1ee969 100644 --- a/internal/db/leveldb_xdr.go +++ b/internal/db/leveldb_xdr.go @@ -18,9 +18,9 @@ fileVersion Structure: 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| | -+ version (64 bits) + -| | +/ / +\ Vector Structure \ +/ / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Length of device | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -31,7 +31,7 @@ fileVersion Structure: struct fileVersion { - hyper version; + Vector version; opaque device<>; } @@ -39,7 +39,7 @@ struct fileVersion { func (o fileVersion) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o fileVersion) MarshalXDR() ([]byte, error) { @@ -57,29 +57,32 @@ func (o fileVersion) MustMarshalXDR() []byte { func (o fileVersion) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o fileVersion) encodeXDR(xw *xdr.Writer) (int, error) { - xw.WriteUint64(uint64(o.version)) +func (o fileVersion) EncodeXDRInto(xw *xdr.Writer) (int, error) { + _, err := o.version.EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } xw.WriteBytes(o.device) return xw.Tot(), xw.Error() } func (o *fileVersion) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *fileVersion) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *fileVersion) decodeXDR(xr *xdr.Reader) error { - o.version = int64(xr.ReadUint64()) +func (o *fileVersion) DecodeXDRFrom(xr *xdr.Reader) error { + (&o.version).DecodeXDRFrom(xr) o.device = xr.ReadBytes() return xr.Error() } @@ -107,7 +110,7 @@ struct versionList { func (o versionList) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o versionList) MarshalXDR() ([]byte, error) { @@ -125,14 +128,14 @@ func (o versionList) MustMarshalXDR() []byte { func (o versionList) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o versionList) encodeXDR(xw *xdr.Writer) (int, error) { +func (o versionList) EncodeXDRInto(xw *xdr.Writer) (int, error) { xw.WriteUint32(uint32(len(o.versions))) for i := range o.versions { - _, err := o.versions[i].encodeXDR(xw) + _, err := o.versions[i].EncodeXDRInto(xw) if err != nil { return xw.Tot(), err } @@ -142,20 +145,20 @@ func (o versionList) encodeXDR(xw *xdr.Writer) (int, error) { func (o *versionList) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *versionList) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *versionList) decodeXDR(xr *xdr.Reader) error { +func (o *versionList) DecodeXDRFrom(xr *xdr.Reader) error { _versionsSize := int(xr.ReadUint32()) o.versions = make([]fileVersion, _versionsSize) for i := range o.versions { - (&o.versions[i]).decodeXDR(xr) + (&o.versions[i]).DecodeXDRFrom(xr) } return xr.Error() } diff --git a/internal/db/set.go b/internal/db/set.go index dcd73d4d0..b4629ba51 100644 --- a/internal/db/set.go +++ b/internal/db/set.go @@ -16,7 +16,6 @@ import ( "sync" "github.com/syncthing/protocol" - "github.com/syncthing/syncthing/internal/lamport" "github.com/syncthing/syncthing/internal/osutil" "github.com/syndtr/goleveldb/leveldb" ) @@ -61,7 +60,6 @@ func NewFileSet(folder string, db *leveldb.DB) *FileSet { if f.LocalVersion > s.localVersion[deviceID] { s.localVersion[deviceID] = f.LocalVersion } - lamport.Default.Tick(f.Version) return true }) if debug { @@ -90,14 +88,14 @@ func (s *FileSet) Replace(device protocol.DeviceID, fs []protocol.FileInfo) { } } -func (s *FileSet) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo) { +func (s *FileSet) ReplaceWithDelete(device protocol.DeviceID, fs []protocol.FileInfo, myID uint64) { if debug { l.Debugf("%s ReplaceWithDelete(%v, [%d])", s.folder, device, len(fs)) } normalizeFilenames(fs) s.mutex.Lock() defer s.mutex.Unlock() - if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs); lv > s.localVersion[device] { + if lv := ldbReplaceWithDelete(s.db, []byte(s.folder), device[:], fs, myID); lv > s.localVersion[device] { s.localVersion[device] = lv } if device == protocol.LocalDeviceID { @@ -118,7 +116,7 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { updates := make([]protocol.FileInfo, 0, len(fs)) for _, newFile := range fs { existingFile, ok := ldbGet(s.db, []byte(s.folder), device[:], []byte(newFile.Name)) - if !ok || existingFile.Version <= newFile.Version { + if !ok || !existingFile.Version.Equal(newFile.Version) { discards = append(discards, existingFile) updates = append(updates, newFile) } diff --git a/internal/discover/packets_xdr.go b/internal/discover/packets_xdr.go index 545b46fca..a8c89c7a5 100644 --- a/internal/discover/packets_xdr.go +++ b/internal/discover/packets_xdr.go @@ -37,7 +37,7 @@ struct Query { func (o Query) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o Query) MarshalXDR() ([]byte, error) { @@ -55,11 +55,11 @@ func (o Query) MustMarshalXDR() []byte { func (o Query) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o Query) encodeXDR(xw *xdr.Writer) (int, error) { +func (o Query) EncodeXDRInto(xw *xdr.Writer) (int, error) { xw.WriteUint32(o.Magic) if l := len(o.DeviceID); l > 32 { return xw.Tot(), xdr.ElementSizeExceeded("DeviceID", l, 32) @@ -70,16 +70,16 @@ func (o Query) encodeXDR(xw *xdr.Writer) (int, error) { func (o *Query) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *Query) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *Query) decodeXDR(xr *xdr.Reader) error { +func (o *Query) DecodeXDRFrom(xr *xdr.Reader) error { o.Magic = xr.ReadUint32() o.DeviceID = xr.ReadBytesMax(32) return xr.Error() @@ -94,7 +94,9 @@ Announce Structure: +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Magic | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| Device | +/ / +\ Device Structure \ +/ / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Number of Extra | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -114,7 +116,7 @@ struct Announce { func (o Announce) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o Announce) MarshalXDR() ([]byte, error) { @@ -132,13 +134,13 @@ func (o Announce) MustMarshalXDR() []byte { func (o Announce) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o Announce) encodeXDR(xw *xdr.Writer) (int, error) { +func (o Announce) EncodeXDRInto(xw *xdr.Writer) (int, error) { xw.WriteUint32(o.Magic) - _, err := o.This.encodeXDR(xw) + _, err := o.This.EncodeXDRInto(xw) if err != nil { return xw.Tot(), err } @@ -147,7 +149,7 @@ func (o Announce) encodeXDR(xw *xdr.Writer) (int, error) { } xw.WriteUint32(uint32(len(o.Extra))) for i := range o.Extra { - _, err := o.Extra[i].encodeXDR(xw) + _, err := o.Extra[i].EncodeXDRInto(xw) if err != nil { return xw.Tot(), err } @@ -157,25 +159,25 @@ func (o Announce) encodeXDR(xw *xdr.Writer) (int, error) { func (o *Announce) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *Announce) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *Announce) decodeXDR(xr *xdr.Reader) error { +func (o *Announce) DecodeXDRFrom(xr *xdr.Reader) error { o.Magic = xr.ReadUint32() - (&o.This).decodeXDR(xr) + (&o.This).DecodeXDRFrom(xr) _ExtraSize := int(xr.ReadUint32()) if _ExtraSize > 16 { return xdr.ElementSizeExceeded("Extra", _ExtraSize, 16) } o.Extra = make([]Device, _ExtraSize) for i := range o.Extra { - (&o.Extra[i]).decodeXDR(xr) + (&o.Extra[i]).DecodeXDRFrom(xr) } return xr.Error() } @@ -210,7 +212,7 @@ struct Device { func (o Device) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o Device) MarshalXDR() ([]byte, error) { @@ -228,11 +230,11 @@ func (o Device) MustMarshalXDR() []byte { func (o Device) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o Device) encodeXDR(xw *xdr.Writer) (int, error) { +func (o Device) EncodeXDRInto(xw *xdr.Writer) (int, error) { if l := len(o.ID); l > 32 { return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 32) } @@ -242,7 +244,7 @@ func (o Device) encodeXDR(xw *xdr.Writer) (int, error) { } xw.WriteUint32(uint32(len(o.Addresses))) for i := range o.Addresses { - _, err := o.Addresses[i].encodeXDR(xw) + _, err := o.Addresses[i].EncodeXDRInto(xw) if err != nil { return xw.Tot(), err } @@ -252,16 +254,16 @@ func (o Device) encodeXDR(xw *xdr.Writer) (int, error) { func (o *Device) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *Device) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *Device) decodeXDR(xr *xdr.Reader) error { +func (o *Device) DecodeXDRFrom(xr *xdr.Reader) error { o.ID = xr.ReadBytesMax(32) _AddressesSize := int(xr.ReadUint32()) if _AddressesSize > 16 { @@ -269,7 +271,7 @@ func (o *Device) decodeXDR(xr *xdr.Reader) error { } o.Addresses = make([]Address, _AddressesSize) for i := range o.Addresses { - (&o.Addresses[i]).decodeXDR(xr) + (&o.Addresses[i]).DecodeXDRFrom(xr) } return xr.Error() } @@ -300,7 +302,7 @@ struct Address { func (o Address) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) + return o.EncodeXDRInto(xw) } func (o Address) MarshalXDR() ([]byte, error) { @@ -318,11 +320,11 @@ func (o Address) MustMarshalXDR() []byte { func (o Address) AppendXDR(bs []byte) ([]byte, error) { var aw = xdr.AppendWriter(bs) var xw = xdr.NewWriter(&aw) - _, err := o.encodeXDR(xw) + _, err := o.EncodeXDRInto(xw) return []byte(aw), err } -func (o Address) encodeXDR(xw *xdr.Writer) (int, error) { +func (o Address) EncodeXDRInto(xw *xdr.Writer) (int, error) { if l := len(o.IP); l > 16 { return xw.Tot(), xdr.ElementSizeExceeded("IP", l, 16) } @@ -333,16 +335,16 @@ func (o Address) encodeXDR(xw *xdr.Writer) (int, error) { func (o *Address) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } func (o *Address) UnmarshalXDR(bs []byte) error { var br = bytes.NewReader(bs) var xr = xdr.NewReader(br) - return o.decodeXDR(xr) + return o.DecodeXDRFrom(xr) } -func (o *Address) decodeXDR(xr *xdr.Reader) error { +func (o *Address) DecodeXDRFrom(xr *xdr.Reader) error { o.IP = xr.ReadBytesMax(16) o.Port = xr.ReadUint16() return xr.Error() diff --git a/internal/lamport/clock.go b/internal/lamport/clock.go deleted file mode 100644 index be5e50a42..000000000 --- a/internal/lamport/clock.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2014 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package lamport implements a simple Lamport Clock for versioning -package lamport - -import "sync" - -var Default = Clock{} - -type Clock struct { - val int64 - mut sync.Mutex -} - -func (c *Clock) Tick(v int64) int64 { - c.mut.Lock() - if v > c.val { - c.val = v + 1 - c.mut.Unlock() - return v + 1 - } else { - c.val++ - v = c.val - c.mut.Unlock() - return v - } -} diff --git a/internal/lamport/clock_test.go b/internal/lamport/clock_test.go deleted file mode 100644 index 4a2c1cb0b..000000000 --- a/internal/lamport/clock_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) 2014 The Syncthing Authors. -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this file, -// You can obtain one at http://mozilla.org/MPL/2.0/. - -package lamport - -import "testing" - -var inputs = []int64{0, 42, 2, 3, 4, 8, 9, 33, 44, 112, 100} - -func TestClock(t *testing.T) { - c := Clock{} - - var prev int64 - for _, input := range inputs { - cur := c.Tick(input) - if cur <= prev || cur <= input { - t.Error("Clock moving backwards") - } - prev = cur - } -} diff --git a/internal/model/model.go b/internal/model/model.go index 4de15fcf8..daf564f53 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -27,7 +27,6 @@ import ( "github.com/syncthing/syncthing/internal/db" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/ignore" - "github.com/syncthing/syncthing/internal/lamport" "github.com/syncthing/syncthing/internal/osutil" "github.com/syncthing/syncthing/internal/scanner" "github.com/syncthing/syncthing/internal/stats" @@ -59,6 +58,8 @@ type Model struct { db *leveldb.DB finder *db.BlockFinder progressEmitter *ProgressEmitter + id protocol.DeviceID + shortID uint64 deviceName string clientName string @@ -93,10 +94,14 @@ var ( // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local folder in any way. -func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model { +func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model { m := &Model{ cfg: cfg, db: ldb, + finder: db.NewBlockFinder(ldb, cfg), + progressEmitter: NewProgressEmitter(cfg), + id: id, + shortID: id.Short(), deviceName: deviceName, clientName: clientName, clientVersion: clientVersion, @@ -111,8 +116,6 @@ func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string, protoConn: make(map[protocol.DeviceID]protocol.Connection), rawConn: make(map[protocol.DeviceID]io.Closer), deviceVer: make(map[protocol.DeviceID]string), - finder: db.NewBlockFinder(ldb, cfg), - progressEmitter: NewProgressEmitter(cfg), } if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() @@ -443,7 +446,6 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F } for i := 0; i < len(fs); { - lamport.Default.Tick(fs[i].Version) if fs[i].Flags&^protocol.FlagsAll != 0 { if debug { l.Debugln("dropping update for file with unknown bits set", fs[i]) @@ -492,7 +494,6 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot } for i := 0; i < len(fs); { - lamport.Default.Tick(fs[i].Version) if fs[i].Flags&^protocol.FlagsAll != 0 { if debug { l.Debugln("dropping update for file with unknown bits set", fs[i]) @@ -748,7 +749,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset // ReplaceLocal replaces the local folder index with the given list of files. func (m *Model) ReplaceLocal(folder string, fs []protocol.FileInfo) { m.fmut.RLock() - m.folderFiles[folder].ReplaceWithDelete(protocol.LocalDeviceID, fs) + m.folderFiles[folder].ReplaceWithDelete(protocol.LocalDeviceID, fs, m.shortID) m.fmut.RUnlock() } @@ -1149,6 +1150,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error { IgnorePerms: folderCfg.IgnorePerms, AutoNormalize: folderCfg.AutoNormalize, Hashers: folderCfg.Hashers, + ShortID: m.shortID, } runner.setState(FolderScanning) @@ -1233,7 +1235,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error { Name: f.Name, Flags: f.Flags | protocol.FlagDeleted, Modified: f.Modified, - Version: lamport.Default.Tick(f.Version), + Version: f.Version.Update(m.shortID), } events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ "folder": folder, @@ -1329,7 +1331,7 @@ func (m *Model) Override(folder string) { // We have the file, replace with our version need = have } - need.Version = lamport.Default.Tick(need.Version) + need.Version = need.Version.Update(m.shortID) need.LocalVersion = 0 batch = append(batch, need) return true diff --git a/internal/scanner/walk.go b/internal/scanner/walk.go index 5ea670757..bba98685a 100644 --- a/internal/scanner/walk.go +++ b/internal/scanner/walk.go @@ -17,7 +17,6 @@ import ( "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/ignore" - "github.com/syncthing/syncthing/internal/lamport" "github.com/syncthing/syncthing/internal/symlinks" "golang.org/x/text/unicode/norm" ) @@ -61,6 +60,8 @@ type Walker struct { AutoNormalize bool // Number of routines to use for hashing Hashers int + // Our vector clock id + ShortID uint64 } type TempNamer interface { @@ -203,6 +204,9 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun rn = normalizedRn } + var cf protocol.FileInfo + var ok bool + // Index wise symlinks are always files, regardless of what the target // is, because symlinks carry their target path as their content. if info.Mode()&os.ModeSymlink == os.ModeSymlink { @@ -243,7 +247,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun // - it wasn't invalid // - the symlink type (file/dir) was the same // - the block list (i.e. hash of target) was the same - cf, ok := w.CurrentFiler.CurrentFile(rn) + cf, ok = w.CurrentFiler.CurrentFile(rn) if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && SymlinkTypeEqual(flags, cf.Flags) && BlocksEqual(cf.Blocks, blocks) { return skip } @@ -251,7 +255,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun f := protocol.FileInfo{ Name: rn, - Version: lamport.Default.Tick(0), + Version: cf.Version.Update(w.ShortID), Flags: protocol.FlagSymlink | flags | protocol.FlagNoPermBits | 0666, Modified: 0, Blocks: blocks, @@ -275,7 +279,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun // - was a directory previously (not a file or something else) // - was not a symlink (since it's a directory now) // - was not invalid (since it looks valid now) - cf, ok := w.CurrentFiler.CurrentFile(rn) + cf, ok = w.CurrentFiler.CurrentFile(rn) permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Flags, uint32(info.Mode())) if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { return nil @@ -290,7 +294,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun } f := protocol.FileInfo{ Name: rn, - Version: lamport.Default.Tick(0), + Version: cf.Version.Update(w.ShortID), Flags: flags, Modified: info.ModTime().Unix(), } @@ -312,7 +316,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun // - was not a symlink (since it's a file now) // - was not invalid (since it looks valid now) // - has the same size as previously - cf, ok := w.CurrentFiler.CurrentFile(rn) + cf, ok = w.CurrentFiler.CurrentFile(rn) permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Flags, uint32(info.Mode())) if ok && permUnchanged && !cf.IsDeleted() && cf.Modified == info.ModTime().Unix() && !cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() && cf.Size() == info.Size() { @@ -331,7 +335,7 @@ func (w *Walker) walkAndHashFiles(fchan chan protocol.FileInfo) filepath.WalkFun f := protocol.FileInfo{ Name: rn, - Version: lamport.Default.Tick(0), + Version: cf.Version.Update(w.ShortID), Flags: flags, Modified: info.ModTime().Unix(), }