From c47aebdd2a54507ab210d1a955f14131f5eedbc7 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 30 Jul 2014 20:08:04 +0200 Subject: [PATCH] Don't hold memory used for sending indexes forever --- model/model.go | 123 +++++++++++++++++++++++++------------------------ 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/model/model.go b/model/model.go index 690fee805..d1d60a1a2 100644 --- a/model/model.go +++ b/model/model.go @@ -563,83 +563,86 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) { nodeID := conn.ID() name := conn.Name() + var err error if debug { l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo) } - initial := true - minLocalVer := uint64(0) - var err error - defer func() { if debug { l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err) } }() + minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs) + for err == nil { - if !initial { - time.Sleep(5 * time.Second) - if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { - continue - } + time.Sleep(5 * time.Second) + if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { + continue } - batch := make([]protocol.FileInfo, 0, indexBatchSize) - maxLocalVer := uint64(0) - - fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { - if f.LocalVersion <= minLocalVer { - return true - } - - if f.LocalVersion > maxLocalVer { - maxLocalVer = f.LocalVersion - } - - if len(batch) == indexBatchSize { - if initial { - if err = conn.Index(repo, batch); err != nil { - return false - } - if debug { - l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch)) - } - initial = false - } else { - if err = conn.IndexUpdate(repo, batch); err != nil { - return false - } - if debug { - l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch)) - } - } - - batch = make([]protocol.FileInfo, 0, indexBatchSize) - } - - batch = append(batch, f) - return true - }) - - if initial { - err = conn.Index(repo, batch) - if debug && err == nil { - l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch)) - } - initial = false - } else if len(batch) > 0 { - err = conn.IndexUpdate(repo, batch) - if debug && err == nil { - l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch)) - } - } - - minLocalVer = maxLocalVer + minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs) } } +func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set) (uint64, error) { + nodeID := conn.ID() + name := conn.Name() + batch := make([]protocol.FileInfo, 0, indexBatchSize) + maxLocalVer := uint64(0) + var err error + + fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool { + if f.LocalVersion <= minLocalVer { + return true + } + + if f.LocalVersion > maxLocalVer { + maxLocalVer = f.LocalVersion + } + + if len(batch) == indexBatchSize { + if initial { + if err = conn.Index(repo, batch); err != nil { + return false + } + if debug { + l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch)) + } + initial = false + } else { + if err = conn.IndexUpdate(repo, batch); err != nil { + return false + } + if debug { + l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch)) + } + } + + batch = make([]protocol.FileInfo, 0, indexBatchSize) + } + + batch = append(batch, f) + return true + }) + + if initial && err == nil { + err = conn.Index(repo, batch) + if debug && err == nil { + l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch)) + } + } else if len(batch) > 0 && err == nil { + err = conn.IndexUpdate(repo, batch) + if debug && err == nil { + l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch)) + } + } + + return maxLocalVer, err +} + func (m *Model) updateLocal(repo string, f protocol.FileInfo) { f.LocalVersion = 0 m.rmut.RLock()