Don't hold memory used for sending indexes forever

This commit is contained in:
Jakob Borg 2014-07-30 20:08:04 +02:00
parent f4d1632506
commit c47aebdd2a

View File

@ -563,83 +563,86 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) { func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
nodeID := conn.ID() nodeID := conn.ID()
name := conn.Name() name := conn.Name()
var err error
if debug { if debug {
l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo) l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
} }
initial := true
minLocalVer := uint64(0)
var err error
defer func() { defer func() {
if debug { if debug {
l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err) l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
} }
}() }()
minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs)
for err == nil { for err == nil {
if !initial { time.Sleep(5 * time.Second)
time.Sleep(5 * time.Second) if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { continue
continue
}
} }
batch := make([]protocol.FileInfo, 0, indexBatchSize) minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs)
maxLocalVer := uint64(0)
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if f.LocalVersion <= minLocalVer {
return true
}
if f.LocalVersion > maxLocalVer {
maxLocalVer = f.LocalVersion
}
if len(batch) == indexBatchSize {
if initial {
if err = conn.Index(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else {
if err = conn.IndexUpdate(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch))
}
}
batch = make([]protocol.FileInfo, 0, indexBatchSize)
}
batch = append(batch, f)
return true
})
if initial {
err = conn.Index(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else if len(batch) > 0 {
err = conn.IndexUpdate(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
}
}
minLocalVer = maxLocalVer
} }
} }
func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set) (uint64, error) {
nodeID := conn.ID()
name := conn.Name()
batch := make([]protocol.FileInfo, 0, indexBatchSize)
maxLocalVer := uint64(0)
var err error
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if f.LocalVersion <= minLocalVer {
return true
}
if f.LocalVersion > maxLocalVer {
maxLocalVer = f.LocalVersion
}
if len(batch) == indexBatchSize {
if initial {
if err = conn.Index(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else {
if err = conn.IndexUpdate(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch))
}
}
batch = make([]protocol.FileInfo, 0, indexBatchSize)
}
batch = append(batch, f)
return true
})
if initial && err == nil {
err = conn.Index(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
}
} else if len(batch) > 0 && err == nil {
err = conn.IndexUpdate(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
}
}
return maxLocalVer, err
}
func (m *Model) updateLocal(repo string, f protocol.FileInfo) { func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
f.LocalVersion = 0 f.LocalVersion = 0
m.rmut.RLock() m.rmut.RLock()