diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 0f510928f..a06f22c64 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -180,7 +180,7 @@ func restGetModelVersion(m *model.Model, w http.ResponseWriter, r *http.Request) var repo = qs.Get("repo") var res = make(map[string]interface{}) - res["version"] = m.Version(repo) + res["version"] = m.LocalVersion(repo) w.Header().Set("Content-Type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(res) @@ -210,7 +210,7 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { res["inSyncFiles"], res["inSyncBytes"] = globalFiles-needFiles, globalBytes-needBytes res["state"] = m.State(repo) - res["version"] = m.Version(repo) + res["version"] = m.LocalVersion(repo) w.Header().Set("Content-Type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(res) diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 5827c251a..8a2f9bbc6 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -290,11 +290,6 @@ func main() { rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps)) } - havePersistentIndex := false - if fi, err := os.Stat(filepath.Join(confDir, "index")); err == nil && fi.IsDir() { - havePersistentIndex = true - } - db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), nil) if err != nil { l.Fatalln("leveldb.OpenFile():", err) @@ -364,11 +359,6 @@ nextRepo: // Walk the repository and update the local model before establishing any // connections to other nodes. - if !havePersistentIndex { - // There's no new style index, load old ones - l.Infoln("Loading legacy index files") - m.LoadIndexes(confDir) - } m.CleanRepos() l.Infoln("Performing initial repository scan") m.ScanRepos() @@ -645,9 +635,10 @@ next: if rateBucket != nil { wr = &limitedWriter{conn, rateBucket} } - protoConn := protocol.NewConnection(remoteID, conn, wr, m) + name := fmt.Sprintf("%s-%s", conn.LocalAddr(), conn.RemoteAddr()) + protoConn := protocol.NewConnection(remoteID, conn, wr, m, name) - l.Infof("Established secure connection to %s at %v", remoteID, conn.RemoteAddr()) + l.Infof("Established secure connection to %s at %s", remoteID, name) if debugNet { l.Debugf("cipher suite %04X", conn.ConnectionState().CipherSuite) } diff --git a/files/leveldb.go b/files/leveldb.go index 9ff5a6301..ce4b0e40b 100644 --- a/files/leveldb.go +++ b/files/leveldb.go @@ -3,6 +3,7 @@ package files import ( "bytes" "sort" + "sync" "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/protocol" @@ -12,6 +13,22 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +var ( + clockTick uint64 + clockMut sync.Mutex +) + +func clock(v uint64) uint64 { + clockMut.Lock() + defer clockMut.Unlock() + if v > clockTick { + clockTick = v + 1 + } else { + clockTick++ + } + return clockTick +} + const ( keyTypeNode = iota keyTypeGlobal @@ -91,11 +108,11 @@ func globalKeyName(key []byte) []byte { return key[1+64:] } -type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool +type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 type fileIterator func(f protocol.FileInfo) bool -func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) bool { +func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 { sort.Sort(fileList(fs)) // sort list on name, same as on disk start := nodeKey(repo, node, nil) // before all repo/node files @@ -112,7 +129,7 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo moreDb := dbi.Next() fsi := 0 - changed := false + var maxLocalVer uint64 for { var newName, oldName []byte @@ -144,9 +161,10 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo switch { case moreFs && (!moreDb || cmp == -1): - changed = true // Disk is missing this file. Insert it. - ldbInsert(batch, repo, node, newName, fs[fsi]) + if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer { + maxLocalVer = lv + } ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version) fsi++ @@ -155,9 +173,10 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo var ef protocol.FileInfo ef.UnmarshalXDR(dbi.Value()) if fs[fsi].Version > ef.Version { - ldbInsert(batch, repo, node, newName, fs[fsi]) + if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer { + maxLocalVer = lv + } ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version) - changed = true } // Iterate both sides. fsi++ @@ -165,8 +184,8 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo case moreDb && (!moreFs || cmp == 1): if deleteFn != nil { - if deleteFn(snap, batch, repo, node, oldName, dbi) { - changed = true + if lv := deleteFn(snap, batch, repo, node, oldName, dbi); lv > maxLocalVer { + maxLocalVer = lv } } moreDb = dbi.Next() @@ -178,23 +197,24 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo panic(err) } - return changed + return maxLocalVer } -func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { - return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool { +func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 { + // TODO: Return the remaining maxLocalVer? + return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 { // Disk has files that we are missing. Remove it. if debug { l.Debugf("delete; repo=%q node=%x name=%q", repo, node, name) } batch.Delete(dbi.Key()) ldbRemoveFromGlobal(db, batch, repo, node, name) - return true + return 0 }) } -func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { - return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool { +func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 { + return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 { var f protocol.FileInfo err := f.UnmarshalXDR(dbi.Value()) if err != nil { @@ -204,18 +224,20 @@ func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileI if debug { l.Debugf("mark deleted; repo=%q node=%x name=%q", repo, node, name) } + ts := clock(f.LocalVersion) f.Blocks = nil f.Version = lamport.Default.Tick(f.Version) f.Flags |= protocol.FlagDeleted + f.LocalVersion = ts batch.Put(dbi.Key(), f.MarshalXDR()) ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version) - return true + return ts } - return false + return 0 }) } -func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { +func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 { batch := new(leveldb.Batch) snap, err := db.GetSnapshot() if err != nil { @@ -223,12 +245,15 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { } defer snap.Release() + var maxLocalVer uint64 for _, f := range fs { name := []byte(f.Name) fk := nodeKey(repo, node, name) bs, err := snap.Get(fk, nil) if err == leveldb.ErrNotFound { - ldbInsert(batch, repo, node, name, f) + if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer { + maxLocalVer = lv + } ldbUpdateGlobal(snap, batch, repo, node, name, f.Version) continue } @@ -239,7 +264,9 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { panic(err) } if ef.Version != f.Version { - ldbInsert(batch, repo, node, name, f) + if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer { + maxLocalVer = lv + } ldbUpdateGlobal(snap, batch, repo, node, name, f.Version) } } @@ -249,16 +276,22 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool { panic(err) } - return true + return maxLocalVer } -func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) { +func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) uint64 { if debug { l.Debugf("insert; repo=%q node=%x %v", repo, node, file) } + if file.LocalVersion == 0 { + file.LocalVersion = clock(0) + } + nk := nodeKey(repo, node, name) batch.Put(nk, file.MarshalXDR()) + + return file.LocalVersion } // ldbUpdateGlobal adds this node+version to the version list for the given diff --git a/files/set.go b/files/set.go index a2330a6ea..11941dd8d 100644 --- a/files/set.go +++ b/files/set.go @@ -21,18 +21,29 @@ type fileRecord struct { type bitset uint64 type Set struct { - changes map[protocol.NodeID]uint64 - mutex sync.Mutex - repo string - db *leveldb.DB + localVersion map[protocol.NodeID]uint64 + mutex sync.Mutex + repo string + db *leveldb.DB } func NewSet(repo string, db *leveldb.DB) *Set { var s = Set{ - changes: make(map[protocol.NodeID]uint64), - repo: repo, - db: db, + localVersion: make(map[protocol.NodeID]uint64), + repo: repo, + db: db, } + + var lv uint64 + ldbWithHave(db, []byte(repo), protocol.LocalNodeID[:], func(f protocol.FileInfo) bool { + if f.LocalVersion > lv { + lv = f.LocalVersion + } + return true + }) + s.localVersion[protocol.LocalNodeID] = lv + clock(lv) + return &s } @@ -42,8 +53,8 @@ func (s *Set) Replace(node protocol.NodeID, fs []protocol.FileInfo) { } s.mutex.Lock() defer s.mutex.Unlock() - if ldbReplace(s.db, []byte(s.repo), node[:], fs) { - s.changes[node]++ + if lv := ldbReplace(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] { + s.localVersion[node] = lv } } @@ -53,8 +64,8 @@ func (s *Set) ReplaceWithDelete(node protocol.NodeID, fs []protocol.FileInfo) { } s.mutex.Lock() defer s.mutex.Unlock() - if ldbReplaceWithDelete(s.db, []byte(s.repo), node[:], fs) { - s.changes[node]++ + if lv := ldbReplaceWithDelete(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] { + s.localVersion[node] = lv } } @@ -64,8 +75,8 @@ func (s *Set) Update(node protocol.NodeID, fs []protocol.FileInfo) { } s.mutex.Lock() defer s.mutex.Unlock() - if ldbUpdate(s.db, []byte(s.repo), node[:], fs) { - s.changes[node]++ + if lv := ldbUpdate(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] { + s.localVersion[node] = lv } } @@ -102,8 +113,8 @@ func (s *Set) Availability(file string) []protocol.NodeID { return ldbAvailability(s.db, []byte(s.repo), []byte(file)) } -func (s *Set) Changes(node protocol.NodeID) uint64 { +func (s *Set) LocalVersion(node protocol.NodeID) uint64 { s.mutex.Lock() defer s.mutex.Unlock() - return s.changes[node] + return s.localVersion[node] } diff --git a/files/set_test.go b/files/set_test.go index 7eef793c0..90eacdf8c 100644 --- a/files/set_test.go +++ b/files/set_test.go @@ -554,7 +554,7 @@ func TestNeed(t *testing.T) { } } -func TestChanges(t *testing.T) { +func TestLocalVersion(t *testing.T) { db, err := leveldb.Open(storage.NewMemStorage(), nil) if err != nil { t.Fatal(err) @@ -578,17 +578,17 @@ func TestChanges(t *testing.T) { } m.ReplaceWithDelete(protocol.LocalNodeID, local1) - c0 := m.Changes(protocol.LocalNodeID) + c0 := m.LocalVersion(protocol.LocalNodeID) m.ReplaceWithDelete(protocol.LocalNodeID, local2) - c1 := m.Changes(protocol.LocalNodeID) + c1 := m.LocalVersion(protocol.LocalNodeID) if !(c1 > c0) { - t.Fatal("Change number should have incremented") + t.Fatal("Local version number should have incremented") } m.ReplaceWithDelete(protocol.LocalNodeID, local2) - c2 := m.Changes(protocol.LocalNodeID) + c2 := m.LocalVersion(protocol.LocalNodeID) if c2 != c1 { - t.Fatal("Change number should be unchanged") + t.Fatal("Local version number should be unchanged") } } diff --git a/files/sort.go b/files/sort.go index e8002dfff..34f4e5835 100644 --- a/files/sort.go +++ b/files/sort.go @@ -1,8 +1,8 @@ package files import ( - "sort" "github.com/calmh/syncthing/protocol" + "sort" ) type SortBy func(p protocol.FileInfo) int diff --git a/integration/test-delupd.sh b/integration/test-delupd.sh index 104606ed4..7d18522e7 100755 --- a/integration/test-delupd.sh +++ b/integration/test-delupd.sh @@ -112,6 +112,9 @@ alterFiles() { done pkill -CONT syncthing + + echo "Restarting instance 2" + curl -HX-API-Key:abc123 -X POST "http://localhost:8082/rest/restart" } rm -rf h?/*.idx.gz h?/index diff --git a/luhn/luhn.go b/luhn/luhn.go index bc4e15b36..fe90c29a6 100644 --- a/luhn/luhn.go +++ b/luhn/luhn.go @@ -17,8 +17,8 @@ var ( // Generate returns a check digit for the string s, which should be composed // of characters from the Alphabet a. func (a Alphabet) Generate(s string) (rune, error) { - if err:=a.check();err!=nil{ - return 0,err + if err := a.check(); err != nil { + return 0, err } factor := 1 diff --git a/model/model.go b/model/model.go index eef00af6c..3f075d3ee 100644 --- a/model/model.go +++ b/model/model.go @@ -5,8 +5,6 @@ package model import ( - "compress/gzip" - "crypto/sha1" "errors" "fmt" "io" @@ -21,7 +19,6 @@ import ( "github.com/calmh/syncthing/events" "github.com/calmh/syncthing/files" "github.com/calmh/syncthing/lamport" - "github.com/calmh/syncthing/osutil" "github.com/calmh/syncthing/protocol" "github.com/calmh/syncthing/scanner" "github.com/syndtr/goleveldb/leveldb" @@ -42,6 +39,9 @@ const ( // transfer to bring the systems into synchronization. const zeroEntrySize = 128 +// How many files to send in each Index/IndexUpdate message. +const indexBatchSize = 1000 + type Model struct { indexDir string cfg *config.Configuration @@ -65,6 +65,9 @@ type Model struct { nodeVer map[protocol.NodeID]string pmut sync.RWMutex // protects protoConn and rawConn + sentLocalVer map[protocol.NodeID]map[string]uint64 + slMut sync.Mutex + sup suppressor addedRepo bool @@ -95,6 +98,7 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers protoConn: make(map[protocol.NodeID]protocol.Connection), rawConn: make(map[protocol.NodeID]io.Closer), nodeVer: make(map[protocol.NodeID]string), + sentLocalVer: make(map[protocol.NodeID]map[string]uint64), sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)}, } @@ -108,7 +112,6 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second) deadlockDetect(&m.smut, time.Duration(timeout)*time.Second) deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second) - go m.broadcastIndexLoop() return m } @@ -392,13 +395,13 @@ func (m *Model) Close(node protocol.NodeID, err error) { "error": err.Error(), }) + m.pmut.Lock() m.rmut.RLock() for _, repo := range m.nodeRepos[node] { m.repoFiles[repo].Replace(node, nil) } m.rmut.RUnlock() - m.pmut.Lock() conn, ok := m.rawConn[node] if ok { conn.Close() @@ -502,6 +505,7 @@ func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool { // repository changes. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) { nodeID := protoConn.ID() + m.pmut.Lock() if _, ok := m.protoConn[nodeID]; ok { panic("add existing node") @@ -511,48 +515,99 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) panic("add existing node") } m.rawConn[nodeID] = rawConn - m.pmut.Unlock() cm := m.clusterConfig(nodeID) protoConn.ClusterConfig(cm) - var idxToSend = make(map[string][]protocol.FileInfo) - m.rmut.RLock() for _, repo := range m.nodeRepos[nodeID] { - idxToSend[repo] = m.protocolIndex(repo) + fs := m.repoFiles[repo] + go sendIndexes(protoConn, repo, fs) } m.rmut.RUnlock() - - go func() { - for repo, idx := range idxToSend { - if debug { - l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx)) - } - const batchSize = 1000 - for i := 0; i < len(idx); i += batchSize { - if len(idx[i:]) < batchSize { - protoConn.Index(repo, idx[i:]) - } else { - protoConn.Index(repo, idx[i:i+batchSize]) - } - } - } - }() + m.pmut.Unlock() } -// protocolIndex returns the current local index in protocol data types. -func (m *Model) protocolIndex(repo string) []protocol.FileInfo { - var fs []protocol.FileInfo - m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { - fs = append(fs, f) - return true - }) +func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) { + nodeID := conn.ID() + name := conn.Name() - return fs + if debug { + l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo) + } + + initial := true + minLocalVer := uint64(0) + var err error + + defer func() { + if debug { + l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err) + } + }() + + for err == nil { + if !initial && fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { + time.Sleep(1 * time.Second) + continue + } + + batch := make([]protocol.FileInfo, 0, indexBatchSize) + maxLocalVer := uint64(0) + + fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + if f.LocalVersion <= minLocalVer { + return true + } + + if f.LocalVersion > maxLocalVer { + maxLocalVer = f.LocalVersion + } + + if len(batch) == indexBatchSize { + if initial { + if err = conn.Index(repo, batch); err != nil { + return false + } + if debug { + l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch)) + } + initial = false + } else { + if err = conn.IndexUpdate(repo, batch); err != nil { + return false + } + if debug { + l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch)) + } + } + + batch = make([]protocol.FileInfo, 0, indexBatchSize) + } + + batch = append(batch, f) + return true + }) + + if initial { + err = conn.Index(repo, batch) + if debug && err == nil { + l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch)) + } + initial = false + } else if len(batch) > 0 { + err = conn.IndexUpdate(repo, batch) + if debug && err == nil { + l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch)) + } + } + + minLocalVer = maxLocalVer + } } func (m *Model) updateLocal(repo string, f protocol.FileInfo) { + f.LocalVersion = 0 m.rmut.RLock() m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f}) m.rmut.RUnlock() @@ -575,49 +630,6 @@ func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset return nc.Request(repo, name, offset, size) } -func (m *Model) broadcastIndexLoop() { - // TODO: Rewrite to send index in segments - var lastChange = map[string]uint64{} - for { - time.Sleep(5 * time.Second) - - m.pmut.RLock() - m.rmut.RLock() - - var indexWg sync.WaitGroup - for repo, fs := range m.repoFiles { - repo := repo - - c := fs.Changes(protocol.LocalNodeID) - if c == lastChange[repo] { - continue - } - lastChange[repo] = c - - idx := m.protocolIndex(repo) - - for _, nodeID := range m.repoNodes[repo] { - nodeID := nodeID - if conn, ok := m.protoConn[nodeID]; ok { - indexWg.Add(1) - if debug { - l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx)) - } - go func() { - conn.Index(repo, idx) - indexWg.Done() - }() - } - } - } - - m.rmut.RUnlock() - m.pmut.RUnlock() - - indexWg.Wait() - } -} - func (m *Model) AddRepo(cfg config.RepositoryConfiguration) { if m.started { panic("cannot add repo to started model") @@ -709,88 +721,6 @@ func (m *Model) ScanRepo(repo string) error { return nil } -func (m *Model) LoadIndexes(dir string) { - m.rmut.RLock() - for repo := range m.repoCfgs { - fs := m.loadIndex(repo, dir) - - var sfs = make([]protocol.FileInfo, len(fs)) - for i := 0; i < len(fs); i++ { - lamport.Default.Tick(fs[i].Version) - fs[i].Flags &= ^uint32(protocol.FlagInvalid) // we might have saved an index with files that were suppressed; the should not be on startup - } - - m.repoFiles[repo].Replace(protocol.LocalNodeID, sfs) - } - m.rmut.RUnlock() -} - -func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error { - id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory))) - name := id + ".idx.gz" - name = filepath.Join(dir, name) - tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano()) - idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer os.Remove(tmp) - - gzw := gzip.NewWriter(idxf) - - n, err := protocol.IndexMessage{ - Repository: repo, - Files: fs, - }.EncodeXDR(gzw) - if err != nil { - gzw.Close() - idxf.Close() - return err - } - - err = gzw.Close() - if err != nil { - return err - } - - err = idxf.Close() - if err != nil { - return err - } - - if debug { - l.Debugln("wrote index,", n, "bytes uncompressed") - } - - return osutil.Rename(tmp, name) -} - -func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo { - id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory))) - name := id + ".idx.gz" - name = filepath.Join(dir, name) - - idxf, err := os.Open(name) - if err != nil { - return nil - } - defer idxf.Close() - - gzr, err := gzip.NewReader(idxf) - if err != nil { - return nil - } - defer gzr.Close() - - var im protocol.IndexMessage - err = im.DecodeXDR(gzr) - if err != nil || im.Repository != repo { - return nil - } - - return im.Files -} - // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage { cm := protocol.ClusterConfigMessage{ @@ -868,12 +798,12 @@ func (m *Model) Override(repo string) { // Version returns the change version for the given repository. This is // guaranteed to increment if the contents of the local or global repository // has changed. -func (m *Model) Version(repo string) uint64 { +func (m *Model) LocalVersion(repo string) uint64 { var ver uint64 m.rmut.Lock() for _, n := range m.repoNodes[repo] { - ver += m.repoFiles[repo].Changes(n) + ver += m.repoFiles[repo].LocalVersion(n) } m.rmut.Unlock() diff --git a/model/puller.go b/model/puller.go index d5e28f380..8cdfb25d4 100644 --- a/model/puller.go +++ b/model/puller.go @@ -193,7 +193,7 @@ func (p *puller) run() { default: } - if v := p.model.Version(p.repoCfg.ID); v > prevVer { + if v := p.model.LocalVersion(p.repoCfg.ID); v > prevVer { // Queue more blocks to fetch, if any p.queueNeededBlocks() prevVer = v @@ -335,15 +335,21 @@ func (p *puller) handleRequestResult(res requestResult) { return } - _, of.err = of.file.WriteAt(res.data, res.offset) + if res.err != nil { + of.err = res.err + if debug { + l.Debugf("pull: not writing %q / %q offset %d: %v", p.repoCfg.ID, f.Name, res.offset, res.err) + } + } else { + _, of.err = of.file.WriteAt(res.data, res.offset) + if debug { + l.Debugf("pull: wrote %q / %q offset %d len %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, len(res.data), of.outstanding, of.done) + } + } of.outstanding-- p.openFiles[f.Name] = of - if debug { - l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done) - } - if of.done && of.outstanding == 0 { p.closeFile(f) } @@ -526,7 +532,7 @@ func (p *puller) handleRequestBlock(b bqBlock) bool { } node := p.oustandingPerNode.leastBusyNode(of.availability) - if len(node) == 0 { + if node == (protocol.NodeID{}) { of.err = errNoNode if of.file != nil { of.file.Close() @@ -662,7 +668,7 @@ func (p *puller) closeFile(f protocol.FileInfo) { for i := range hb { if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 { - l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i) + l.Debugf("pull: %q / %q: block %d hash mismatch\n\thave: %x\n\twant: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash) return } } diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index adefa0771..e65f0883f 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -182,7 +182,7 @@ Cluster Config messages MUST NOT be sent after the initial exchange. | Flags | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | - + Max Version (64 bits) + + + Max Local Version (64 bits) + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -255,13 +255,13 @@ The Node Flags field contains the following single bit flags: Exactly one of the T, R or S bits MUST be set. -The Node Max Version field contains the highest file version number of -the files already known to be in the index sent by this node. If nothing -is known about the index of a given node, this field MUST be set to -zero. When receiving a Cluster Config message with a non-zero Max +The Node Max Local Version field contains the highest local file version +number of the files already known to be in the index sent by this node. +If nothing is known about the index of a given node, this field MUST be +set to zero. When receiving a Cluster Config message with a non-zero Max Version for the local node ID, a node MAY elect to send an Index Update -message containing only files with higher version numbers in place of -the initial Index message. +message containing only files with higher local version numbers in place +of the initial Index message. The Options field contain option values to be used in an implementation specific manner. The options list is conceptually a map of Key => Value @@ -292,7 +292,7 @@ peers acting in a specific manner as a result of sent options. struct Node { string ID<>; unsigned int Flags; - unsigned hyper MaxVersion; + unsigned hyper MaxLocalVersion; } struct Option { @@ -359,6 +359,10 @@ Index message MUST be sent. There is no response to the Index message. + Version (64 bits) + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + + Local Version (64 bits) + + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Number of Blocks | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / / @@ -400,6 +404,10 @@ detected and received change. The combination of Repository, Name and Version uniquely identifies the contents of a file at a given point in time. +The Local Version field is the value of a node local monotonic clock at +the time of last local database update to a file. The clock ticks on +every local database update. + The Flags field is made up of the following single bit flags: 0 1 2 3 @@ -458,6 +466,7 @@ block which may represent a smaller amount of data. unsigned int Flags; hyper Modified; unsigned hyper Version; + unsigned hyper LocalVer; BlockInfo Blocks<>; } diff --git a/protocol/message.go b/protocol/message.go index c080a4be3..4a546d0eb 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -12,11 +12,12 @@ type IndexMessage struct { } type FileInfo struct { - Name string // max:1024 - Flags uint32 - Modified int64 - Version uint64 - Blocks []BlockInfo // max:1000000 + Name string // max:1024 + Flags uint32 + Modified int64 + Version uint64 + LocalVersion uint64 + Blocks []BlockInfo // max:1000000 } func (f FileInfo) String() string { @@ -69,9 +70,9 @@ type Repository struct { } type Node struct { - ID []byte // max:32 - Flags uint32 - MaxVersion uint64 + ID []byte // max:32 + Flags uint32 + MaxLocalVersion uint64 } type Option struct { diff --git a/protocol/message_xdr.go b/protocol/message_xdr.go index dc5180939..83367bacc 100644 --- a/protocol/message_xdr.go +++ b/protocol/message_xdr.go @@ -121,6 +121,10 @@ FileInfo Structure: + Version (64 bits) + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Number of Blocks | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ / / @@ -134,6 +138,7 @@ struct FileInfo { unsigned int Flags; hyper Modified; unsigned hyper Version; + unsigned hyper LocalVersion; BlockInfo Blocks<1000000>; } @@ -163,6 +168,7 @@ func (o FileInfo) encodeXDR(xw *xdr.Writer) (int, error) { xw.WriteUint32(o.Flags) xw.WriteUint64(uint64(o.Modified)) xw.WriteUint64(o.Version) + xw.WriteUint64(o.LocalVersion) if len(o.Blocks) > 1000000 { return xw.Tot(), xdr.ErrElementSizeExceeded } @@ -189,6 +195,7 @@ func (o *FileInfo) decodeXDR(xr *xdr.Reader) error { o.Flags = xr.ReadUint32() o.Modified = int64(xr.ReadUint64()) o.Version = xr.ReadUint64() + o.LocalVersion = xr.ReadUint64() _BlocksSize := int(xr.ReadUint32()) if _BlocksSize > 1000000 { return xdr.ErrElementSizeExceeded @@ -567,7 +574,7 @@ Node Structure: | Flags | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | -+ Max Version (64 bits) + ++ Max Local Version (64 bits) + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -575,7 +582,7 @@ Node Structure: struct Node { opaque ID<32>; unsigned int Flags; - unsigned hyper MaxVersion; + unsigned hyper MaxLocalVersion; } */ @@ -602,7 +609,7 @@ func (o Node) encodeXDR(xw *xdr.Writer) (int, error) { } xw.WriteBytes(o.ID) xw.WriteUint32(o.Flags) - xw.WriteUint64(o.MaxVersion) + xw.WriteUint64(o.MaxLocalVersion) return xw.Tot(), xw.Error() } @@ -620,7 +627,7 @@ func (o *Node) UnmarshalXDR(bs []byte) error { func (o *Node) decodeXDR(xr *xdr.Reader) error { o.ID = xr.ReadBytesMax(32) o.Flags = xr.ReadUint32() - o.MaxVersion = xr.ReadUint64() + o.MaxLocalVersion = xr.ReadUint64() return xr.Error() } diff --git a/protocol/protocol.go b/protocol/protocol.go index 8f1ed1a08..1e131740b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -66,7 +66,9 @@ type Model interface { type Connection interface { ID() NodeID - Index(repo string, files []FileInfo) + Name() string + Index(repo string, files []FileInfo) error + IndexUpdate(repo string, files []FileInfo) error Request(repo string, name string, offset int64, size int) ([]byte, error) ClusterConfig(config ClusterConfigMessage) Statistics() Statistics @@ -74,6 +76,7 @@ type Connection interface { type rawConnection struct { id NodeID + name string receiver Model state int @@ -87,8 +90,7 @@ type rawConnection struct { awaiting []chan asyncResult awaitingMut sync.Mutex - idxSent map[string]map[string]uint64 - idxMut sync.Mutex // ensures serialization of Index calls + idxMut sync.Mutex // ensures serialization of Index calls nextID chan int outbox chan []encodable @@ -106,15 +108,16 @@ const ( pingIdleTime = 60 * time.Second ) -func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model) Connection { +func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection { cr := &countingReader{Reader: reader} cw := &countingWriter{Writer: writer} rb := bufio.NewReader(cr) - wb := bufio.NewWriter(cw) + wb := bufio.NewWriterSize(cw, 65536) c := rawConnection{ id: nodeID, + name: name, receiver: nativeModel{receiver}, state: stateInitial, cr: cr, @@ -123,7 +126,6 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M wb: wb, xw: xdr.NewWriter(wb), awaiting: make([]chan asyncResult, 0x1000), - idxSent: make(map[string]map[string]uint64), outbox: make(chan []encodable), nextID: make(chan int), closed: make(chan struct{}), @@ -142,36 +144,34 @@ func (c *rawConnection) ID() NodeID { return c.id } +func (c *rawConnection) Name() string { + return c.name +} + // Index writes the list of file information to the connected peer node -func (c *rawConnection) Index(repo string, idx []FileInfo) { +func (c *rawConnection) Index(repo string, idx []FileInfo) error { + select { + case <-c.closed: + return ErrClosed + default: + } c.idxMut.Lock() - defer c.idxMut.Unlock() + c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx}) + c.idxMut.Unlock() + return nil +} - var msgType int - if c.idxSent[repo] == nil { - // This is the first time we send an index. - msgType = messageTypeIndex - - c.idxSent[repo] = make(map[string]uint64) - for _, f := range idx { - c.idxSent[repo][f.Name] = f.Version - } - } else { - // We have sent one full index. Only send updates now. - msgType = messageTypeIndexUpdate - var diff []FileInfo - for _, f := range idx { - if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs { - diff = append(diff, f) - c.idxSent[repo][f.Name] = f.Version - } - } - idx = diff - } - - if msgType == messageTypeIndex || len(idx) > 0 { - c.send(header{0, -1, msgType}, IndexMessage{repo, idx}) +// IndexUpdate writes the list of file information to the connected peer node as an update +func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error { + select { + case <-c.closed: + return ErrClosed + default: } + c.idxMut.Lock() + c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx}) + c.idxMut.Unlock() + return nil } // Request returns the bytes for the specified block after fetching them from the connected peer. @@ -445,15 +445,13 @@ func (c *rawConnection) send(h header, es ...encodable) bool { } func (c *rawConnection) writerLoop() { - var err error for { select { case es := <-c.outbox: for _, e := range es { e.encodeXDR(c.xw) } - - if err = c.flush(); err != nil { + if err := c.flush(); err != nil { c.close(err) return } @@ -471,11 +469,9 @@ func (c *rawConnection) flush() error { if err := c.xw.Error(); err != nil { return err } - if err := c.wb.Flush(); err != nil { return err } - return nil } diff --git a/protocol/wireformat.go b/protocol/wireformat.go index 605eaf861..987c03eff 100644 --- a/protocol/wireformat.go +++ b/protocol/wireformat.go @@ -18,7 +18,11 @@ func (c wireFormatConnection) ID() NodeID { return c.next.ID() } -func (c wireFormatConnection) Index(repo string, fs []FileInfo) { +func (c wireFormatConnection) Name() string { + return c.next.Name() +} + +func (c wireFormatConnection) Index(repo string, fs []FileInfo) error { var myFs = make([]FileInfo, len(fs)) copy(myFs, fs) @@ -26,7 +30,18 @@ func (c wireFormatConnection) Index(repo string, fs []FileInfo) { myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) } - c.next.Index(repo, myFs) + return c.next.Index(repo, myFs) +} + +func (c wireFormatConnection) IndexUpdate(repo string, fs []FileInfo) error { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + + for i := range fs { + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + } + + return c.next.IndexUpdate(repo, myFs) } func (c wireFormatConnection) Request(repo, name string, offset int64, size int) ([]byte, error) { diff --git a/scanner/walk.go b/scanner/walk.go index 374b29ae9..cc4725395 100644 --- a/scanner/walk.go +++ b/scanner/walk.go @@ -6,6 +6,7 @@ package scanner import ( "bytes" + "code.google.com/p/go.text/unicode/norm" "errors" "fmt" "io/ioutil" @@ -14,7 +15,6 @@ import ( "runtime" "strings" "time" - "code.google.com/p/go.text/unicode/norm" "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/protocol" @@ -216,6 +216,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str l.Infof("Changes to %q are being temporarily suppressed because it changes too frequently.", p) cf.Flags |= protocol.FlagInvalid cf.Version = lamport.Default.Tick(cf.Version) + cf.LocalVersion = 0 if debug { l.Debugln("suppressed:", cf) }