From 5eb5a056bfbdd0a7a3ee94d12aaf9b1e7ab3c79e Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sat, 29 Mar 2014 18:53:48 +0100 Subject: [PATCH] Basic support for synchronizing multiple repositories (fixes #35) --- build.sh | 6 +- cmd/stcli/main.go | 8 +- cmd/syncthing/config.go | 15 ++ cmd/syncthing/main.go | 81 +----- cmd/syncthing/model.go | 433 +++++++++++++++++++++++--------- cmd/syncthing/model_test.go | 18 +- cmd/syncthing/puller.go | 84 +++---- integration/.gitignore | 4 + integration/h1/config.xml | 8 + integration/h2/config.xml | 16 ++ integration/h3/config.xml | 8 + integration/test.sh | 39 ++- protocol/common_test.go | 4 +- protocol/nativemodel_darwin.go | 8 +- protocol/nativemodel_unix.go | 8 +- protocol/nativemodel_windows.go | 8 +- protocol/protocol.go | 38 +-- protocol/wireformat.go | 8 +- 18 files changed, 492 insertions(+), 302 deletions(-) diff --git a/build.sh b/build.sh index cac937b1c..9ef62c978 100755 --- a/build.sh +++ b/build.sh @@ -14,7 +14,7 @@ build() { go get -d ./cmd/syncthing godep= fi - ${godep} go build -ldflags "-w -X main.Version $version" ./cmd/syncthing + ${godep} go build $* -ldflags "-w -X main.Version $version" ./cmd/syncthing ${godep} go build -ldflags "-w -X main.Version $version" ./cmd/stcli } @@ -61,6 +61,10 @@ case "$1" in build ;; + race) + build -race + ;; + test) test ;; diff --git a/cmd/stcli/main.go b/cmd/stcli/main.go index e52b37f05..6e7ec11fe 100644 --- a/cmd/stcli/main.go +++ b/cmd/stcli/main.go @@ -78,8 +78,8 @@ func prtIndex(files []protocol.FileInfo) { } } -func (m Model) Index(nodeID string, files []protocol.FileInfo) { - log.Printf("Received index") +func (m Model) Index(nodeID string, repo string, files []protocol.FileInfo) { + log.Printf("Received index for repo %q", repo) if cmd == "idx" { prtIndex(files) if get != "" { @@ -117,8 +117,8 @@ func getFile(f protocol.FileInfo) { fd.Close() } -func (m Model) IndexUpdate(nodeID string, files []protocol.FileInfo) { - log.Println("Received index update") +func (m Model) IndexUpdate(nodeID string, repo string, files []protocol.FileInfo) { + log.Printf("Received index update for repo %q", repo) if cmd == "idx" { prtIndex(files) if exit { diff --git a/cmd/syncthing/config.go b/cmd/syncthing/config.go index b8bdf4094..f0c7ecce3 100644 --- a/cmd/syncthing/config.go +++ b/cmd/syncthing/config.go @@ -19,6 +19,7 @@ type Configuration struct { } type RepositoryConfiguration struct { + ID string `xml:"id,attr"` Directory string `xml:"directory,attr"` Nodes []NodeConfiguration `xml:"node"` } @@ -181,6 +182,20 @@ func readConfigXML(rd io.Reader) (Configuration, error) { fillNilSlices(&cfg.Options) cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress) + + var seenRepos = map[string]bool{} + for i := range cfg.Repositories { + if cfg.Repositories[i].ID == "" { + cfg.Repositories[i].ID = "default" + } + + id := cfg.Repositories[i].ID + if seenRepos[id] { + panic("duplicate repository ID " + id) + } + seenRepos[id] = true + } + return cfg, err } diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 6acf7eb76..02d273b11 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -1,7 +1,6 @@ package main import ( - "compress/gzip" "crypto/tls" "flag" "fmt" @@ -20,7 +19,6 @@ import ( "github.com/calmh/ini" "github.com/calmh/syncthing/discover" "github.com/calmh/syncthing/protocol" - "github.com/calmh/syncthing/scanner" ) const BlockSize = 128 * 1024 @@ -166,11 +164,6 @@ func main() { infof("Edit %s to taste or use the GUI\n", cfgFile) } - // Make sure the local node is in the node list. - cfg.Repositories[0].Nodes = cleanNodeList(cfg.Repositories[0].Nodes, myID) - - var dir = expandTilde(cfg.Repositories[0].Directory) - if profiler := os.Getenv("STPROFILER"); len(profiler) > 0 { go func() { dlog.Println("Starting profiler on", profiler) @@ -194,12 +187,18 @@ func main() { MinVersion: tls.VersionTLS12, } - ensureDir(dir, -1) - m := NewModel(dir, cfg.Options.MaxChangeKbps*1000) + m := NewModel(cfg.Options.MaxChangeKbps * 1000) if cfg.Options.MaxSendKbps > 0 { m.LimitRate(cfg.Options.MaxSendKbps) } + for i := range cfg.Repositories { + cfg.Repositories[i].Nodes = cleanNodeList(cfg.Repositories[i].Nodes, myID) + dir := expandTilde(cfg.Repositories[i].Directory) + ensureDir(dir, -1) + m.AddRepo(cfg.Repositories[i].ID, dir, cfg.Repositories[i].Nodes) + } + // GUI if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" { addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress) @@ -233,19 +232,9 @@ func main() { if verbose { infoln("Populating repository index") } - loadIndex(m) - - sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} - w := &scanner.Walker{ - Dir: m.dir, - IgnoreFile: ".stignore", - FollowSymlinks: cfg.Options.FollowSymlinks, - BlockSize: BlockSize, - TempNamer: defTempNamer, - Suppressor: sup, - CurrentFiler: m, - } - updateLocalModel(m, w) + m.LoadIndexes(confDir) + m.ScanRepos() + m.SaveIndexes(confDir) connOpts := map[string]string{ "clientId": "syncthing", @@ -467,54 +456,6 @@ func discovery() *discover.Discoverer { return disc } -func updateLocalModel(m *Model, w *scanner.Walker) { - files, _ := w.Walk() - m.ReplaceLocal(files) - saveIndex(m) -} - -func saveIndex(m *Model) { - name := m.RepoID() + ".idx.gz" - fullName := filepath.Join(confDir, name) - idxf, err := os.Create(fullName + ".tmp") - if err != nil { - return - } - - gzw := gzip.NewWriter(idxf) - - protocol.IndexMessage{ - Repository: "local", - Files: m.ProtocolIndex(), - }.EncodeXDR(gzw) - gzw.Close() - idxf.Close() - - Rename(fullName+".tmp", fullName) -} - -func loadIndex(m *Model) { - name := m.RepoID() + ".idx.gz" - idxf, err := os.Open(filepath.Join(confDir, name)) - if err != nil { - return - } - defer idxf.Close() - - gzr, err := gzip.NewReader(idxf) - if err != nil { - return - } - defer gzr.Close() - - var im protocol.IndexMessage - err = im.DecodeXDR(gzr) - if err != nil || im.Repository != "local" { - return - } - m.SeedLocal(im.Files) -} - func ensureDir(dir string, mode int) { fi, err := os.Stat(dir) if os.IsNotExist(err) { diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 7c53cca67..0e1d61b04 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -1,6 +1,7 @@ package main import ( + "compress/gzip" "crypto/sha1" "errors" "fmt" @@ -20,28 +21,26 @@ import ( ) type Model struct { - dir string - cm *cid.Map - fs *files.Set + repoDirs map[string]string // repo -> dir + repoFiles map[string]*files.Set // repo -> files + repoNodes map[string][]string // repo -> nodeIDs + nodeRepos map[string][]string // nodeID -> repos + rmut sync.RWMutex // protects the above + + cm *cid.Map protoConn map[string]protocol.Connection rawConn map[string]io.Closer pmut sync.RWMutex // protects protoConn and rawConn - initOnce sync.Once - sup suppressor limitRequestRate chan struct{} - imut sync.Mutex // protects Index + addedRepo bool + started bool } -const ( - idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification - idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long -) - var ( ErrNoSuchFile = errors.New("no such file") ErrInvalid = errors.New("file is invalid") @@ -50,11 +49,13 @@ var ( // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local repository in any way. -func NewModel(dir string, maxChangeBw int) *Model { +func NewModel(maxChangeBw int) *Model { m := &Model{ - dir: dir, + repoDirs: make(map[string]string), + repoFiles: make(map[string]*files.Set), + repoNodes: make(map[string][]string), + nodeRepos: make(map[string][]string), cm: cid.NewMap(), - fs: files.NewSet(), protoConn: make(map[string]protocol.Connection), rawConn: make(map[string]io.Closer), sup: suppressor{threshold: int64(maxChangeBw)}, @@ -83,25 +84,32 @@ func (m *Model) LimitRate(kbps int) { // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. func (m *Model) StartRW(threads int) { - m.initOnce.Do(func() { - newPuller("default", m.dir, m, threads) - }) + m.rmut.Lock() + defer m.rmut.Unlock() + + if !m.addedRepo { + panic("cannot start without repo") + } + m.started = true + for repo, dir := range m.repoDirs { + newPuller(repo, dir, m, threads) + } } // StartRO starts read only processing on the current model. When in // read only mode the model will announce files to the cluster but not // pull in any external changes. func (m *Model) StartRO() { - m.initOnce.Do(func() { - newPuller("default", m.dir, m, 0) // zero threads => read only - }) -} + m.rmut.Lock() + defer m.rmut.Unlock() -// Generation returns an opaque integer that is guaranteed to increment on -// every change to the local repository or global model. -func (m *Model) Generation() uint64 { - c := m.fs.Changes(cid.LocalID) - return c + if !m.addedRepo { + panic("cannot start without repo") + } + m.started = true + for repo, dir := range m.repoDirs { + newPuller(repo, dir, m, 0) // zero threads => read only + } } type ConnectionInfo struct { @@ -119,13 +127,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { } m.pmut.RLock() - - var tot int64 - for _, f := range m.fs.Global() { - if f.Flags&protocol.FlagDeleted == 0 { - tot += f.Size - } - } + m.rmut.RLock() var res = make(map[string]ConnectionInfo) for node, conn := range m.protoConn { @@ -138,10 +140,21 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { ci.Address = nc.RemoteAddr().String() } - var have = tot - for _, f := range m.fs.Need(m.cm.Get(node)) { - if f.Flags&protocol.FlagDeleted == 0 { - have -= f.Size + var tot int64 + var have int64 + + for _, repo := range m.nodeRepos[node] { + for _, f := range m.repoFiles[repo].Global() { + if f.Flags&protocol.FlagDeleted == 0 { + tot += f.Size + have += f.Size + } + } + + for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) { + if f.Flags&protocol.FlagDeleted == 0 { + have -= f.Size + } } } @@ -153,6 +166,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { res[node] = ci } + m.rmut.RUnlock() m.pmut.RUnlock() return res @@ -173,32 +187,54 @@ func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) { // GlobalSize returns the number of files, deleted files and total bytes for all // files in the global model. func (m *Model) GlobalSize() (files, deleted int, bytes int64) { - fs := m.fs.Global() + m.rmut.RLock() + var fs []scanner.File + for _, rf := range m.repoFiles { + fs = append(fs, rf.Global()...) + } + m.rmut.RUnlock() return sizeOf(fs) } // LocalSize returns the number of files, deleted files and total bytes for all // files in the local repository. func (m *Model) LocalSize() (files, deleted int, bytes int64) { - fs := m.fs.Have(cid.LocalID) + m.rmut.RLock() + var fs []scanner.File + for _, rf := range m.repoFiles { + fs = append(fs, rf.Have(cid.LocalID)...) + } + m.rmut.RUnlock() return sizeOf(fs) } // InSyncSize returns the number and total byte size of the local files that // are in sync with the global model. func (m *Model) InSyncSize() (files int, bytes int64) { - gf := m.fs.Global() - hf := m.fs.Need(cid.LocalID) + var gf []scanner.File + var nf []scanner.File + + m.rmut.RLock() + for _, rf := range m.repoFiles { + gf = append(gf, rf.Global()...) + nf = append(nf, rf.Need(cid.LocalID)...) + } + m.rmut.RUnlock() gn, _, gb := sizeOf(gf) - hn, _, hb := sizeOf(hf) + nn, _, nb := sizeOf(nf) - return gn - hn, gb - hb + return gn - nn, gb - nb } // NeedFiles returns the list of currently needed files and the total size. func (m *Model) NeedFiles() ([]scanner.File, int64) { - nf := m.fs.Need(cid.LocalID) + var nf []scanner.File + m.rmut.RLock() + for _, rf := range m.repoFiles { + nf = append(nf, rf.Need(cid.LocalID)...) + } + m.rmut.RUnlock() var bytes int64 for _, f := range nf { @@ -210,24 +246,11 @@ func (m *Model) NeedFiles() ([]scanner.File, int64) { // Index is called when a new node is connected and we receive their full index. // Implements the protocol.Model interface. -func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { - var files = make([]scanner.File, len(fs)) - for i := range fs { - lamport.Default.Tick(fs[i].Version) - files[i] = fileFromFileInfo(fs[i]) - } - - cid := m.cm.Get(nodeID) - m.fs.Replace(cid, files) - +func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) { if debugNet { - dlog.Printf("IDX(in): %s: %d files", nodeID, len(fs)) + dlog.Printf("IDX(in): %s / %q: %d files", nodeID, repo, len(fs)) } -} -// IndexUpdate is called for incremental updates to connected nodes' indexes. -// Implements the protocol.Model interface. -func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { var files = make([]scanner.File, len(fs)) for i := range fs { lamport.Default.Tick(fs[i].Version) @@ -235,11 +258,36 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { } id := m.cm.Get(nodeID) - m.fs.Update(id, files) - - if debugNet { - dlog.Printf("IDXUP(in): %s: %d files", nodeID, len(files)) + m.rmut.RLock() + if r, ok := m.repoFiles[repo]; ok { + r.Replace(id, files) + } else { + warnf("Index from %s for nonexistant repo %q; dropping", nodeID, repo) } + m.rmut.RUnlock() +} + +// IndexUpdate is called for incremental updates to connected nodes' indexes. +// Implements the protocol.Model interface. +func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) { + if debugNet { + dlog.Printf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs)) + } + + var files = make([]scanner.File, len(fs)) + for i := range fs { + lamport.Default.Tick(fs[i].Version) + files[i] = fileFromFileInfo(fs[i]) + } + + id := m.cm.Get(nodeID) + m.rmut.RLock() + if r, ok := m.repoFiles[repo]; ok { + r.Update(id, files) + } else { + warnf("Index update from %s for nonexistant repo %q; dropping", nodeID, repo) + } + m.rmut.RUnlock() } // Close removes the peer from the model and closes the underlying connection if possible. @@ -255,7 +303,11 @@ func (m *Model) Close(node string, err error) { } cid := m.cm.Get(node) - m.fs.Replace(cid, nil) + m.rmut.RLock() + for _, repo := range m.nodeRepos[node] { + m.repoFiles[repo].Replace(cid, nil) + } + m.rmut.RUnlock() m.cm.Clear(node) m.pmut.Lock() @@ -272,19 +324,31 @@ func (m *Model) Close(node string, err error) { // Implements the protocol.Model interface. func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) { // Verify that the requested file exists in the local model. - lf := m.fs.Get(cid.LocalID, name) + m.rmut.RLock() + r, ok := m.repoFiles[repo] + m.rmut.RUnlock() + + if !ok { + warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo) + return nil, ErrNoSuchFile + } + + lf := r.Get(cid.LocalID, name) if offset > lf.Size { warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) return nil, ErrNoSuchFile } + if lf.Suppressed { return nil, ErrInvalid } if debugNet && nodeID != "" { - dlog.Printf("REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) + dlog.Printf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size) } - fn := filepath.Join(m.dir, name) + m.rmut.RLock() + fn := filepath.Join(m.repoDirs[repo], name) + m.rmut.RUnlock() fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? if err != nil { return nil, err @@ -307,24 +371,34 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by } // ReplaceLocal replaces the local repository index with the given list of files. -func (m *Model) ReplaceLocal(fs []scanner.File) { - m.fs.ReplaceWithDelete(cid.LocalID, fs) +func (m *Model) ReplaceLocal(repo string, fs []scanner.File) { + m.rmut.RLock() + m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs) + m.rmut.RUnlock() } -// ReplaceLocal replaces the local repository index with the given list of files. -func (m *Model) SeedLocal(fs []protocol.FileInfo) { +func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) { var sfs = make([]scanner.File, len(fs)) for i := 0; i < len(fs); i++ { lamport.Default.Tick(fs[i].Version) sfs[i] = fileFromFileInfo(fs[i]) } - m.fs.Replace(cid.LocalID, sfs) + m.rmut.RLock() + m.repoFiles[repo].Replace(cid.LocalID, sfs) + m.rmut.RUnlock() +} + +type cFiler struct { + m *Model + r string } // Implements scanner.CurrentFiler -func (m *Model) CurrentFile(file string) scanner.File { - f := m.fs.Get(cid.LocalID, file) +func (cf cFiler) CurrentFile(file string) scanner.File { + cf.m.rmut.RLock() + f := cf.m.repoFiles[cf.r].Get(cid.LocalID, file) + cf.m.rmut.RUnlock() return f } @@ -336,11 +410,6 @@ func (m *Model) ConnectedTo(nodeID string) bool { return ok } -// RepoID returns a unique ID representing the current repository location. -func (m *Model) RepoID() string { - return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir))) -} - // AddConnection adds a new peer connection to the model. An initial index will // be sent to the connected peer, thereafter index updates whenever the local // repository changes. @@ -358,20 +427,27 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) m.pmut.Unlock() go func() { - idx := m.ProtocolIndex() - if debugNet { - dlog.Printf("IDX(out/initial): %s: %d files", nodeID, len(idx)) + m.rmut.RLock() + repos := m.nodeRepos[nodeID] + m.rmut.RUnlock() + for _, repo := range repos { + idx := m.ProtocolIndex(repo) + if debugNet { + dlog.Printf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx)) + } + protoConn.Index(repo, idx) } - protoConn.Index("default", idx) }() } // ProtocolIndex returns the current local index in protocol data types. // Must be called with the read lock held. -func (m *Model) ProtocolIndex() []protocol.FileInfo { +func (m *Model) ProtocolIndex(repo string) []protocol.FileInfo { var index []protocol.FileInfo - fs := m.fs.Have(cid.LocalID) + m.rmut.RLock() + fs := m.repoFiles[repo].Have(cid.LocalID) + m.rmut.RUnlock() for _, f := range fs { mf := fileInfoFromFile(f) @@ -380,7 +456,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { if mf.Flags&protocol.FlagDeleted != 0 { flagComment = " (deleted)" } - dlog.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) + dlog.Printf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) } index = append(index, mf) } @@ -388,11 +464,13 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { return index } -func (m *Model) updateLocal(f scanner.File) { - m.fs.Update(cid.LocalID, []scanner.File{f}) +func (m *Model) updateLocal(repo string, f scanner.File) { + m.rmut.RLock() + m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f}) + m.rmut.RUnlock() } -func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash []byte) ([]byte, error) { +func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) { m.pmut.RLock() nc, ok := m.protoConn[nodeID] m.pmut.RUnlock() @@ -402,52 +480,163 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash } if debugNet { - dlog.Printf("REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + dlog.Printf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash) } - return nc.Request("default", name, offset, size) + return nc.Request(repo, name, offset, size) } func (m *Model) broadcastIndexLoop() { - var lastChange uint64 + var lastChange = map[string]uint64{} for { time.Sleep(5 * time.Second) - c := m.fs.Changes(cid.LocalID) - if c == lastChange { - continue - } - lastChange = c - - saveIndex(m) // This should be cleaned up we don't do a lot of processing twice - - fs := m.fs.Have(cid.LocalID) - - var indexWg sync.WaitGroup - indexWg.Add(len(m.protoConn)) - - var idx = make([]protocol.FileInfo, len(fs)) - for i, f := range fs { - idx[i] = fileInfoFromFile(f) - } - m.pmut.RLock() - for _, node := range m.protoConn { - node := node - if debugNet { - dlog.Printf("IDX(out/loop): %s: %d files", node.ID(), len(idx)) - } - go func() { - node.Index("default", idx) - indexWg.Done() - }() - } - m.pmut.RUnlock() + m.rmut.RLock() - indexWg.Wait() + for repo, fs := range m.repoFiles { + c := fs.Changes(cid.LocalID) + if c == lastChange[repo] { + continue + } + lastChange[repo] = c + + idx := m.ProtocolIndex(repo) + m.saveIndex(repo, confDir, idx) + + var indexWg sync.WaitGroup + for _, nodeID := range m.repoNodes[repo] { + if conn, ok := m.protoConn[nodeID]; ok { + indexWg.Add(1) + if debugNet { + dlog.Printf("IDX(out/loop): %s: %d files", nodeID, len(idx)) + } + go func() { + conn.Index(repo, idx) + indexWg.Done() + }() + } + } + + indexWg.Wait() + } + + m.rmut.RUnlock() + m.pmut.RUnlock() } } +func (m *Model) AddRepo(id, dir string, nodes []NodeConfiguration) { + if m.started { + panic("cannot add repo to started model") + } + if len(id) == 0 { + panic("cannot add empty repo id") + } + + m.rmut.Lock() + m.repoDirs[id] = dir + m.repoFiles[id] = files.NewSet() + + m.repoNodes[id] = make([]string, len(nodes)) + for i, node := range nodes { + m.repoNodes[id][i] = node.NodeID + m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], id) + } + + m.addedRepo = true + m.rmut.Unlock() +} + +func (m *Model) ScanRepos() { + m.rmut.RLock() + for repo := range m.repoDirs { + m.ScanRepo(repo) + } + m.rmut.RUnlock() +} + +func (m *Model) ScanRepo(repo string) { + sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} + w := &scanner.Walker{ + Dir: m.repoDirs[repo], + IgnoreFile: ".stignore", + FollowSymlinks: cfg.Options.FollowSymlinks, + BlockSize: BlockSize, + TempNamer: defTempNamer, + Suppressor: sup, + CurrentFiler: cFiler{m, repo}, + } + fs, _ := w.Walk() + m.ReplaceLocal(repo, fs) +} + +func (m *Model) SaveIndexes(dir string) { + m.rmut.RLock() + for repo := range m.repoDirs { + fs := m.ProtocolIndex(repo) + m.saveIndex(repo, dir, fs) + } + m.rmut.RUnlock() +} + +func (m *Model) LoadIndexes(dir string) { + m.rmut.RLock() + for repo := range m.repoDirs { + fs := m.loadIndex(repo, dir) + m.SeedLocal(repo, fs) + } + m.rmut.RUnlock() +} + +func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) { + id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo]))) + name := id + ".idx.gz" + name = filepath.Join(dir, name) + + idxf, err := os.Create(name + ".tmp") + if err != nil { + return + } + + gzw := gzip.NewWriter(idxf) + + protocol.IndexMessage{ + Repository: repo, + Files: fs, + }.EncodeXDR(gzw) + gzw.Close() + idxf.Close() + + Rename(name+".tmp", name) +} + +func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo { + id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo]))) + name := id + ".idx.gz" + name = filepath.Join(dir, name) + + idxf, err := os.Open(name) + if err != nil { + return nil + } + defer idxf.Close() + + gzr, err := gzip.NewReader(idxf) + if err != nil { + return nil + } + defer gzr.Close() + + var im protocol.IndexMessage + err = im.DecodeXDR(gzr) + if err != nil || im.Repository != repo { + return nil + } + + return im.Files +} + func fileFromFileInfo(f protocol.FileInfo) scanner.File { var blocks = make([]scanner.Block, len(f.Blocks)) var offset int64 diff --git a/cmd/syncthing/model_test.go b/cmd/syncthing/model_test.go index 186ff01bb..0dbf096bf 100644 --- a/cmd/syncthing/model_test.go +++ b/cmd/syncthing/model_test.go @@ -92,7 +92,7 @@ func BenchmarkIndex10000(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - m.Index("42", files) + m.Index("42", "default", files) } } @@ -105,7 +105,7 @@ func BenchmarkIndex00100(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - m.Index("42", files) + m.Index("42", "default", files) } } @@ -115,11 +115,11 @@ func BenchmarkIndexUpdate10000f10000(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", files) + m.IndexUpdate("42", "default", files) } } @@ -129,12 +129,12 @@ func BenchmarkIndexUpdate10000f00100(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) ufiles := genFiles(100) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", ufiles) + m.IndexUpdate("42", "default", ufiles) } } @@ -144,12 +144,12 @@ func BenchmarkIndexUpdate10000f00001(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) ufiles := genFiles(1) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", ufiles) + m.IndexUpdate("42", "default", ufiles) } } @@ -206,7 +206,7 @@ func BenchmarkRequest(b *testing.B) { requestData: []byte("some data to return"), } m.AddConnection(fc, fc) - m.Index("42", files) + m.Index("42", "default", files) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/cmd/syncthing/puller.go b/cmd/syncthing/puller.go index 6f30cf59f..cad660e68 100644 --- a/cmd/syncthing/puller.go +++ b/cmd/syncthing/puller.go @@ -111,7 +111,7 @@ func (p *puller) run() { <-p.requestSlots b := p.bq.get() if debugPull { - dlog.Printf("filler: queueing %q offset %d copy %d", b.file.Name, b.block.Offset, len(b.copy)) + dlog.Printf("filler: queueing %q / %q offset %d copy %d", p.repo, b.file.Name, b.block.Offset, len(b.copy)) } p.blocks <- b } @@ -120,17 +120,6 @@ func (p *puller) run() { walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second) timeout := time.Tick(5 * time.Second) - sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} - w := &scanner.Walker{ - Dir: p.dir, - IgnoreFile: ".stignore", - FollowSymlinks: cfg.Options.FollowSymlinks, - BlockSize: BlockSize, - TempNamer: defTempNamer, - Suppressor: sup, - CurrentFiler: p.model, - } - for { // Run the pulling loop as long as there are blocks to fetch pull: @@ -152,7 +141,7 @@ func (p *puller) run() { break pull } if debugPull { - dlog.Printf("idle but have %d open files", len(p.openFiles)) + dlog.Printf("%q: idle but have %d open files", p.repo, len(p.openFiles)) i := 5 for _, f := range p.openFiles { dlog.Printf(" %v", f) @@ -169,10 +158,9 @@ func (p *puller) run() { select { case <-walkTicker: if debugPull { - dlog.Println("time for rescan") + dlog.Printf("%q: time for rescan", p.repo) } - files, _ := w.Walk() - p.model.fs.ReplaceWithDelete(cid.LocalID, files) + p.model.ScanRepo(p.repo) default: } @@ -185,23 +173,11 @@ func (p *puller) run() { func (p *puller) runRO() { walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second) - sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} - w := &scanner.Walker{ - Dir: p.dir, - IgnoreFile: ".stignore", - FollowSymlinks: cfg.Options.FollowSymlinks, - BlockSize: BlockSize, - TempNamer: defTempNamer, - Suppressor: sup, - CurrentFiler: p.model, - } - for _ = range walkTicker { if debugPull { - dlog.Println("time for rescan") + dlog.Printf("%q: time for rescan", p.repo) } - files, _ := w.Walk() - p.model.fs.ReplaceWithDelete(cid.LocalID, files) + p.model.ScanRepo(p.repo) } } @@ -222,12 +198,12 @@ func (p *puller) handleRequestResult(res requestResult) { p.openFiles[f.Name] = of if debugPull { - dlog.Printf("pull: wrote %q offset %d outstanding %d done %v", f.Name, res.offset, of.outstanding, of.done) + dlog.Printf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repo, f.Name, res.offset, of.outstanding, of.done) } if of.done && of.outstanding == 0 { if debugPull { - dlog.Printf("pull: closing %q", f.Name) + dlog.Printf("pull: closing %q / %q", p.repo, f.Name) } of.file.Close() defer os.Remove(of.temp) @@ -237,7 +213,7 @@ func (p *puller) handleRequestResult(res requestResult) { fd, err := os.Open(of.temp) if err != nil { if debugPull { - dlog.Printf("pull: error: %q: %v", f.Name, err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err) } return } @@ -246,14 +222,14 @@ func (p *puller) handleRequestResult(res requestResult) { if l0, l1 := len(hb), len(f.Blocks); l0 != l1 { if debugPull { - dlog.Printf("pull: %q: nblocks %d != %d", f.Name, l0, l1) + dlog.Printf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1) } return } for i := range hb { if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 { - dlog.Printf("pull: %q: block %d hash mismatch", f.Name, i) + dlog.Printf("pull: %q / %q: block %d hash mismatch", p.repo, f.Name, i) return } } @@ -262,12 +238,12 @@ func (p *puller) handleRequestResult(res requestResult) { os.Chtimes(of.temp, t, t) os.Chmod(of.temp, os.FileMode(f.Flags&0777)) if debugPull { - dlog.Printf("pull: rename %q: %q", f.Name, of.filepath) + dlog.Printf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath) } if err := Rename(of.temp, of.filepath); err == nil { - p.model.fs.Update(cid.LocalID, []scanner.File{f}) + p.model.updateLocal(p.repo, f) } else { - dlog.Printf("pull: error: %q: %v", f.Name, err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err) } } } @@ -280,10 +256,10 @@ func (p *puller) handleBlock(b bqBlock) { if !ok { if debugPull { - dlog.Printf("pull: opening file %q", f.Name) + dlog.Printf("pull: %q: opening file %q", p.repo, f.Name) } - of.availability = uint64(p.model.fs.Availability(f.Name)) + of.availability = uint64(p.model.repoFiles[p.repo].Availability(f.Name)) of.filepath = filepath.Join(p.dir, f.Name) of.temp = filepath.Join(p.dir, defTempNamer.TempName(f.Name)) @@ -293,13 +269,13 @@ func (p *puller) handleBlock(b bqBlock) { err = os.MkdirAll(dirName, 0777) } if err != nil { - dlog.Printf("pull: error: %q: %v", f.Name, err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, err) } of.file, of.err = os.Create(of.temp) if of.err != nil { if debugPull { - dlog.Printf("pull: error: %q: %v", f.Name, of.err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, of.err) } if !b.last { p.openFiles[f.Name] = of @@ -312,10 +288,10 @@ func (p *puller) handleBlock(b bqBlock) { if of.err != nil { // We have already failed this file. if debugPull { - dlog.Printf("pull: error: %q has already failed: %v", f.Name, of.err) + dlog.Printf("pull: error: %q / %q has already failed: %v", p.repo, f.Name, of.err) } if b.last { - dlog.Printf("pull: removing failed file %q", f.Name) + dlog.Printf("pull: removing failed file %q / %q", p.repo, f.Name) delete(p.openFiles, f.Name) } @@ -346,14 +322,14 @@ func (p *puller) handleCopyBlock(b bqBlock) { of := p.openFiles[f.Name] if debugPull { - dlog.Printf("pull: copying %d blocks for %q", len(b.copy), f.Name) + dlog.Printf("pull: copying %d blocks for %q / %q", len(b.copy), p.repo, f.Name) } var exfd *os.File exfd, of.err = os.Open(of.filepath) if of.err != nil { if debugPull { - dlog.Printf("pull: error: %q: %v", f.Name, of.err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, of.err) } of.file.Close() of.file = nil @@ -372,7 +348,7 @@ func (p *puller) handleCopyBlock(b bqBlock) { buffers.Put(bs) if of.err != nil { if debugPull { - dlog.Printf("pull: error: %q: %v", f.Name, of.err) + dlog.Printf("pull: error: %q / %q: %v", p.repo, f.Name, of.err) } exfd.Close() of.file.Close() @@ -412,10 +388,10 @@ func (p *puller) handleRequestBlock(b bqBlock) { go func(node string, b bqBlock) { if debugPull { - dlog.Printf("pull: requesting %q offset %d size %d from %q outstanding %d", f.Name, b.block.Offset, b.block.Size, node, of.outstanding) + dlog.Printf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repo, f.Name, b.block.Offset, b.block.Size, node, of.outstanding) } - bs, err := p.model.requestGlobal(node, f.Name, b.block.Offset, int(b.block.Size), nil) + bs, err := p.model.requestGlobal(node, p.repo, f.Name, b.block.Offset, int(b.block.Size), nil) p.requestResults <- requestResult{ node: node, file: f, @@ -445,7 +421,7 @@ func (p *puller) handleEmptyBlock(b bqBlock) { os.Remove(of.filepath) } else { if debugPull { - dlog.Printf("pull: no blocks to fetch and nothing to copy for %q", f.Name) + dlog.Printf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repo, f.Name) } t := time.Unix(f.Modified, 0) os.Chtimes(of.temp, t, t) @@ -453,13 +429,13 @@ func (p *puller) handleEmptyBlock(b bqBlock) { Rename(of.temp, of.filepath) } delete(p.openFiles, f.Name) - p.model.fs.Update(cid.LocalID, []scanner.File{f}) + p.model.repoFiles[p.repo].Update(cid.LocalID, []scanner.File{f}) } func (p *puller) queueNeededBlocks() { queued := 0 - for _, f := range p.model.fs.Need(cid.LocalID) { - lf := p.model.fs.Get(cid.LocalID, f.Name) + for _, f := range p.model.repoFiles[p.repo].Need(cid.LocalID) { + lf := p.model.repoFiles[p.repo].Get(cid.LocalID, f.Name) have, need := scanner.BlockDiff(lf.Blocks, f.Blocks) if debugNeed { dlog.Printf("need:\n local: %v\n global: %v\n haveBlocks: %v\n needBlocks: %v", lf, f, have, need) @@ -472,6 +448,6 @@ func (p *puller) queueNeededBlocks() { }) } if debugPull && queued > 0 { - dlog.Printf("queued %d blocks", queued) + dlog.Printf("%q: queued %d blocks", p.repo, queued) } } diff --git a/integration/.gitignore b/integration/.gitignore index 898903296..ba94cadc9 100644 --- a/integration/.gitignore +++ b/integration/.gitignore @@ -1,6 +1,10 @@ s1 s2 s3 +s12-1 +s12-2 +s23-2 +s23-3 md5-* genfiles md5r diff --git a/integration/h1/config.xml b/integration/h1/config.xml index 4e313de87..2fc198170 100644 --- a/integration/h1/config.xml +++ b/integration/h1/config.xml @@ -10,6 +10,14 @@
127.0.0.1:22003
+ + +
127.0.0.1:22001
+
+ +
127.0.0.1:22002
+
+
127.0.0.1:22001 false diff --git a/integration/h2/config.xml b/integration/h2/config.xml index 65ccf4e4c..fbfc9efca 100644 --- a/integration/h2/config.xml +++ b/integration/h2/config.xml @@ -10,6 +10,22 @@
127.0.0.1:22003
+ + +
127.0.0.1:22001
+
+ +
127.0.0.1:22002
+
+
+ + +
127.0.0.1:22002
+
+ +
127.0.0.1:22003
+
+
127.0.0.1:22002 false diff --git a/integration/h3/config.xml b/integration/h3/config.xml index bdd23c5fa..ba506d2e2 100644 --- a/integration/h3/config.xml +++ b/integration/h3/config.xml @@ -10,6 +10,14 @@
127.0.0.1:22003
+ + +
127.0.0.1:22002
+
+ +
127.0.0.1:22003
+
+
127.0.0.1:22003 false diff --git a/integration/test.sh b/integration/test.sh index 839468082..b92a56ac4 100755 --- a/integration/test.sh +++ b/integration/test.sh @@ -15,7 +15,7 @@ go build json.go testConvergence() { echo "Starting..." for i in 1 2 3 ; do - syncthing -home "h$i" & + STPROFILER=":909$i" syncthing -home "h$i" & done while true ; do @@ -36,9 +36,11 @@ testConvergence() { done echo "Verifying..." - cat md5-* | sort | uniq > md5-tot + cat md5-? | sort | uniq > md5-tot + cat md5-12-? | sort | uniq > md5-12-tot + cat md5-23-? | sort | uniq > md5-23-tot - for i in 1 2 3 ; do + for i in 1 2 3 12-1 12-2 23-2 23-3; do pushd "s$i" >/dev/null ../md5r -l | sort > ../md5-$i popd >/dev/null @@ -47,19 +49,35 @@ testConvergence() { ok=0 for i in 1 2 3 ; do if ! cmp "md5-$i" md5-tot >/dev/null ; then - echo "Fail: instance $i unconverged" + echo "Fail: instance $i unconverged for default" else ok=$(($ok + 1)) - echo "OK: instance $i converged" + echo "OK: instance $i converged for default" fi done - if [[ $ok != 3 ]] ; then + for i in 12-1 12-2 ; do + if ! cmp "md5-$i" md5-12-tot >/dev/null ; then + echo "Fail: instance $i unconverged for s12" + else + ok=$(($ok + 1)) + echo "OK: instance $i converged for s12" + fi + done + for i in 23-2 23-3 ; do + if ! cmp "md5-$i" md5-23-tot >/dev/null ; then + echo "Fail: instance $i unconverged for s23" + else + ok=$(($ok + 1)) + echo "OK: instance $i converged for s23" + fi + done + if [[ $ok != 7 ]] ; then exit 1 fi } echo "Setting up files..." -for i in 1 2 3 ; do +for i in 1 2 3 12-1 12-2 23-2 23-3; do rm -f h$i/*.idx.gz rm -rf "s$i" mkdir "s$i" @@ -74,7 +92,7 @@ for i in 1 2 3 ; do done echo "MD5-summing..." -for i in 1 2 3 ; do +for i in 1 2 3 12-1 12-2 23-2 23-3 ; do pushd "s$i" >/dev/null ../md5r -l > ../md5-$i popd >/dev/null @@ -84,13 +102,14 @@ testConvergence for ((t = 0; t < $iterations; t++)) ; do echo "Add and remove random files ($((t+1)) / $iterations)..." - for i in 1 2 3 ; do + for i in 1 2 3 12-1 12-2 23-2 23-3 ; do pushd "s$i" >/dev/null rm -rf */?[02468ace] ../genfiles -maxexp 22 -files 600 echo " $i: append to large file" dd if=/dev/urandom bs=1024k count=4 >> large-$i 2>/dev/null - ../md5r -l | egrep -v "large-[^$i]" > ../md5-$i + ../md5r -l > ../md5-tmp + (grep -v large ../md5-tmp ; grep "large-$i" ../md5-tmp) > ../md5-$i popd >/dev/null done diff --git a/protocol/common_test.go b/protocol/common_test.go index a24075e05..246d2b8fb 100644 --- a/protocol/common_test.go +++ b/protocol/common_test.go @@ -20,10 +20,10 @@ func newTestModel() *TestModel { } } -func (t *TestModel) Index(nodeID string, files []FileInfo) { +func (t *TestModel) Index(nodeID string, repo string, files []FileInfo) { } -func (t *TestModel) IndexUpdate(nodeID string, files []FileInfo) { +func (t *TestModel) IndexUpdate(nodeID string, repo string, files []FileInfo) { } func (t *TestModel) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) { diff --git a/protocol/nativemodel_darwin.go b/protocol/nativemodel_darwin.go index 38b683e8e..f89a26941 100644 --- a/protocol/nativemodel_darwin.go +++ b/protocol/nativemodel_darwin.go @@ -10,18 +10,18 @@ type nativeModel struct { next Model } -func (m nativeModel) Index(nodeID string, files []FileInfo) { +func (m nativeModel) Index(nodeID string, repo string, files []FileInfo) { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.next.Index(nodeID, files) + m.next.Index(nodeID, repo, files) } -func (m nativeModel) IndexUpdate(nodeID string, files []FileInfo) { +func (m nativeModel) IndexUpdate(nodeID string, repo string, files []FileInfo) { for i := range files { files[i].Name = norm.NFD.String(files[i].Name) } - m.next.IndexUpdate(nodeID, files) + m.next.IndexUpdate(nodeID, repo, files) } func (m nativeModel) Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) { diff --git a/protocol/nativemodel_unix.go b/protocol/nativemodel_unix.go index 62f090c39..9739874fc 100644 --- a/protocol/nativemodel_unix.go +++ b/protocol/nativemodel_unix.go @@ -8,12 +8,12 @@ type nativeModel struct { next Model } -func (m nativeModel) Index(nodeID string, files []FileInfo) { - m.next.Index(nodeID, files) +func (m nativeModel) Index(nodeID string, repo string, files []FileInfo) { + m.next.Index(nodeID, repo, files) } -func (m nativeModel) IndexUpdate(nodeID string, files []FileInfo) { - m.next.IndexUpdate(nodeID, files) +func (m nativeModel) IndexUpdate(nodeID string, repo string, files []FileInfo) { + m.next.IndexUpdate(nodeID, repo, files) } func (m nativeModel) Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) { diff --git a/protocol/nativemodel_windows.go b/protocol/nativemodel_windows.go index 083b56974..a7aeedf6f 100644 --- a/protocol/nativemodel_windows.go +++ b/protocol/nativemodel_windows.go @@ -10,18 +10,18 @@ type nativeModel struct { next Model } -func (m nativeModel) Index(nodeID string, files []FileInfo) { +func (m nativeModel) Index(nodeID string, repo string, files []FileInfo) { for i := range files { files[i].Name = filepath.FromSlash(files[i].Name) } - m.next.Index(nodeID, files) + m.next.Index(nodeID, repo, files) } -func (m nativeModel) IndexUpdate(nodeID string, files []FileInfo) { +func (m nativeModel) IndexUpdate(nodeID string, repo string, files []FileInfo) { for i := range files { files[i].Name = filepath.FromSlash(files[i].Name) } - m.next.IndexUpdate(nodeID, files) + m.next.IndexUpdate(nodeID, repo, files) } func (m nativeModel) Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) { diff --git a/protocol/protocol.go b/protocol/protocol.go index 731d03fef..69ab7aaf6 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,6 +1,7 @@ package protocol import ( + "bufio" "compress/flate" "errors" "fmt" @@ -37,19 +38,19 @@ var ( type Model interface { // An index was received from the peer node - Index(nodeID string, files []FileInfo) + Index(nodeID string, repo string, files []FileInfo) // An index update was received from the peer node - IndexUpdate(nodeID string, files []FileInfo) + IndexUpdate(nodeID string, repo string, files []FileInfo) // A request was made by the peer node - Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) + Request(nodeID string, repo string, name string, offset int64, size int) ([]byte, error) // The peer node closed the connection Close(nodeID string, err error) } type Connection interface { ID() string - Index(string, []FileInfo) - Request(repo, name string, offset int64, size int) ([]byte, error) + Index(repo string, files []FileInfo) + Request(repo string, name string, offset int64, size int) ([]byte, error) Statistics() Statistics Option(key string) string } @@ -62,6 +63,7 @@ type rawConnection struct { reader io.ReadCloser xr *xdr.Reader writer io.WriteCloser + wb *bufio.Writer xw *xdr.Writer closed chan struct{} awaiting map[int]chan asyncResult @@ -73,8 +75,6 @@ type rawConnection struct { hasSentIndex bool hasRecvdIndex bool - - statisticsLock sync.Mutex } type asyncResult struct { @@ -93,6 +93,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M if err != nil { panic(err) } + wb := bufio.NewWriter(flwr) c := rawConnection{ id: nodeID, @@ -100,7 +101,8 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M reader: flrd, xr: xdr.NewReader(flrd), writer: flwr, - xw: xdr.NewWriter(flwr), + wb: wb, + xw: xdr.NewWriter(wb), closed: make(chan struct{}), awaiting: make(map[int]chan asyncResult), indexSent: make(map[string]map[string][2]int64), @@ -245,6 +247,7 @@ type flusher interface { } func (c *rawConnection) flush() error { + c.wb.Flush() if f, ok := c.writer.(flusher); ok { return f.Flush() } @@ -302,7 +305,15 @@ loop: c.close(c.xr.Error()) break loop } else { - c.receiver.Index(c.id, im.Files) + + // We run this (and the corresponding one for update, below) + // in a separate goroutine to avoid blocking the read loop. + // There is otherwise a potential deadlock where both sides + // has the model locked because it's sending a large index + // update and can't receive the large index update from the + // other side. + + go c.receiver.Index(c.id, im.Repository, im.Files) } c.Lock() c.hasRecvdIndex = true @@ -315,7 +326,7 @@ loop: c.close(c.xr.Error()) break loop } else { - c.receiver.IndexUpdate(c.id, im.Files) + go c.receiver.IndexUpdate(c.id, im.Repository, im.Files) } case messageTypeRequest: @@ -454,16 +465,11 @@ type Statistics struct { } func (c *rawConnection) Statistics() Statistics { - c.statisticsLock.Lock() - defer c.statisticsLock.Unlock() - - stats := Statistics{ + return Statistics{ At: time.Now(), InBytesTotal: int(c.xr.Tot()), OutBytesTotal: int(c.xw.Tot()), } - - return stats } func (c *rawConnection) Option(key string) string { diff --git a/protocol/wireformat.go b/protocol/wireformat.go index 117a016c3..d3963ccf3 100644 --- a/protocol/wireformat.go +++ b/protocol/wireformat.go @@ -15,10 +15,14 @@ func (c wireFormatConnection) ID() string { } func (c wireFormatConnection) Index(node string, fs []FileInfo) { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + for i := range fs { - fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name)) + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) } - c.next.Index(node, fs) + + c.next.Index(node, myFs) } func (c wireFormatConnection) Request(repo, name string, offset int64, size int) ([]byte, error) {