syncthing/model/model.go
Jakob Borg 75d4d2df8b Remove SyncOrder, at least temporarily (sorry fREW)
Doesn't actually work very well with the batched approach to needed
files, not documented, not exposed in UI. I'll be happy to reintegrate
if this is solved.
2014-07-21 10:49:18 +02:00

910 lines
23 KiB
Go

// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package model
import (
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/calmh/syncthing/config"
"github.com/calmh/syncthing/events"
"github.com/calmh/syncthing/files"
"github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/protocol"
"github.com/calmh/syncthing/scanner"
"github.com/syndtr/goleveldb/leveldb"
)
type repoState int
const (
RepoIdle repoState = iota
RepoScanning
RepoSyncing
RepoCleaning
)
func (s repoState) String() string {
switch s {
case RepoIdle:
return "idle"
case RepoScanning:
return "scanning"
case RepoCleaning:
return "cleaning"
case RepoSyncing:
return "syncing"
default:
return "unknown"
}
}
// Somewhat arbitrary amount of bytes that we choose to let represent the size
// of an unsynchronized directory entry or a deleted file. We need it to be
// larger than zero so that it's visible that there is some amount of bytes to
// transfer to bring the systems into synchronization.
const zeroEntrySize = 128
// How many files to send in each Index/IndexUpdate message.
const indexBatchSize = 1000
type Model struct {
indexDir string
cfg *config.Configuration
db *leveldb.DB
clientName string
clientVersion string
repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
repoFiles map[string]*files.Set // repo -> files
repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
nodeRepos map[protocol.NodeID][]string // nodeID -> repos
suppressor map[string]*suppressor // repo -> suppressor
rmut sync.RWMutex // protects the above
repoState map[string]repoState // repo -> state
repoStateChanged map[string]time.Time // repo -> time when state changed
smut sync.RWMutex
protoConn map[protocol.NodeID]protocol.Connection
rawConn map[protocol.NodeID]io.Closer
nodeVer map[protocol.NodeID]string
pmut sync.RWMutex // protects protoConn and rawConn
sentLocalVer map[protocol.NodeID]map[string]uint64
slMut sync.Mutex
sup suppressor
addedRepo bool
started bool
}
var (
ErrNoSuchFile = errors.New("no such file")
ErrInvalid = errors.New("file is invalid")
)
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
m := &Model{
indexDir: indexDir,
cfg: cfg,
db: db,
clientName: clientName,
clientVersion: clientVersion,
repoCfgs: make(map[string]config.RepositoryConfiguration),
repoFiles: make(map[string]*files.Set),
repoNodes: make(map[string][]protocol.NodeID),
nodeRepos: make(map[protocol.NodeID][]string),
repoState: make(map[string]repoState),
repoStateChanged: make(map[string]time.Time),
suppressor: make(map[string]*suppressor),
protoConn: make(map[protocol.NodeID]protocol.Connection),
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
}
var timeout = 20 * 60 // seconds
if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
it, err := strconv.Atoi(t)
if err == nil {
timeout = it
}
}
deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
return m
}
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if cfg, ok := m.repoCfgs[repo]; !ok {
panic("cannot start without repo")
} else {
newPuller(cfg, m, threads, m.cfg)
}
}
// StartRO starts read only processing on the current model. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
m.StartRepoRW(repo, 0) // zero threads => read only
}
type ConnectionInfo struct {
protocol.Statistics
Address string
ClientVersion string
Completion int
}
// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
type remoteAddrer interface {
RemoteAddr() net.Addr
}
m.pmut.RLock()
m.rmut.RLock()
var res = make(map[string]ConnectionInfo)
for node, conn := range m.protoConn {
ci := ConnectionInfo{
Statistics: conn.Statistics(),
ClientVersion: m.nodeVer[node],
}
if nc, ok := m.rawConn[node].(remoteAddrer); ok {
ci.Address = nc.RemoteAddr().String()
}
var tot int64
var have int64
for _, repo := range m.nodeRepos[node] {
m.repoFiles[repo].WithGlobal(func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
var size int64
if protocol.IsDirectory(f.Flags) {
size = zeroEntrySize
} else {
size = f.Size()
}
tot += size
have += size
}
return true
})
m.repoFiles[repo].WithNeed(node, func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
var size int64
if protocol.IsDirectory(f.Flags) {
size = zeroEntrySize
} else {
size = f.Size()
}
have -= size
}
return true
})
}
ci.Completion = 100
if tot != 0 {
ci.Completion = int(100 * have / tot)
}
res[node.String()] = ci
}
m.rmut.RUnlock()
m.pmut.RUnlock()
in, out := protocol.TotalInOut()
res["total"] = ConnectionInfo{
Statistics: protocol.Statistics{
At: time.Now(),
InBytesTotal: in,
OutBytesTotal: out,
},
}
return res
}
func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
for _, f := range fs {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
}
return
}
func sizeOfFile(f protocol.FileInfo) (files, deleted int, bytes int64) {
if !protocol.IsDeleted(f.Flags) {
files++
if !protocol.IsDirectory(f.Flags) {
bytes += f.Size()
} else {
bytes += zeroEntrySize
}
} else {
deleted++
bytes += zeroEntrySize
}
return
}
// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithGlobal(func(f protocol.FileInfo) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
return true
})
}
return
}
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs, de, by := sizeOfFile(f)
files += fs
deleted += de
bytes += by
return true
})
}
return
}
// NeedSize returns the number and total size of currently needed files.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs, de, by := sizeOfFile(f)
files += fs + de
bytes += by
return true
})
}
return
}
// NeedFiles returns the list of currently needed files
func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
m.rmut.RLock()
defer m.rmut.RUnlock()
if rf, ok := m.repoFiles[repo]; ok {
fs := make([]protocol.FileInfo, 0, indexBatchSize)
rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs = append(fs, f)
return len(fs) < indexBatchSize
})
return fs
}
return nil
}
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
if debug {
l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
}
if !m.repoSharedWith(repo, nodeID) {
l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
return
}
for i := range fs {
lamport.Default.Tick(fs[i].Version)
}
m.rmut.RLock()
r, ok := m.repoFiles[repo]
m.rmut.RUnlock()
if ok {
r.Replace(nodeID, fs)
} else {
l.Fatalf("Index for nonexistant repo %q", repo)
}
events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
"node": nodeID.String(),
"repo": repo,
"items": len(fs),
"version": r.LocalVersion(nodeID),
})
}
// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
if debug {
l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
}
if !m.repoSharedWith(repo, nodeID) {
l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
return
}
for i := range fs {
lamport.Default.Tick(fs[i].Version)
}
m.rmut.RLock()
r, ok := m.repoFiles[repo]
m.rmut.RUnlock()
if ok {
r.Update(nodeID, fs)
} else {
l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
}
events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
"node": nodeID.String(),
"repo": repo,
"items": len(fs),
"version": r.LocalVersion(nodeID),
})
}
func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
m.rmut.RLock()
defer m.rmut.RUnlock()
for _, nrepo := range m.nodeRepos[nodeID] {
if nrepo == repo {
return true
}
}
return false
}
func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
if debug {
l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
l.Debugf(" ... compare: %s: %v", nodeID, compErr)
}
if compErr != nil {
l.Warnf("%s: %v", nodeID, compErr)
m.Close(nodeID, compErr)
}
m.pmut.Lock()
if config.ClientName == "syncthing" {
m.nodeVer[nodeID] = config.ClientVersion
} else {
m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
}
m.pmut.Unlock()
l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
}
// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node protocol.NodeID, err error) {
l.Infof("Connection to %s closed: %v", node, err)
events.Default.Log(events.NodeDisconnected, map[string]string{
"id": node.String(),
"error": err.Error(),
})
m.pmut.Lock()
m.rmut.RLock()
for _, repo := range m.nodeRepos[node] {
m.repoFiles[repo].Replace(node, nil)
}
m.rmut.RUnlock()
conn, ok := m.rawConn[node]
if ok {
conn.Close()
}
delete(m.protoConn, node)
delete(m.rawConn, node)
delete(m.nodeVer, node)
m.pmut.Unlock()
}
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
// Verify that the requested file exists in the local model.
m.rmut.RLock()
r, ok := m.repoFiles[repo]
m.rmut.RUnlock()
if !ok {
l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
return nil, ErrNoSuchFile
}
lf := r.Get(protocol.LocalNodeID, name)
if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
if debug {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
}
return nil, ErrInvalid
}
if offset > lf.Size() {
if debug {
l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
}
return nil, ErrNoSuchFile
}
if debug && nodeID != protocol.LocalNodeID {
l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
}
m.rmut.RLock()
fn := filepath.Join(m.repoCfgs[repo].Directory, name)
m.rmut.RUnlock()
fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
if err != nil {
return nil, err
}
defer fd.Close()
buf := make([]byte, size)
_, err = fd.ReadAt(buf, offset)
if err != nil {
return nil, err
}
return buf, nil
}
// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
m.rmut.RLock()
m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
m.rmut.RUnlock()
}
func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
m.rmut.RLock()
f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
m.rmut.RUnlock()
return f
}
func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
m.rmut.RLock()
f := m.repoFiles[repo].GetGlobal(file)
m.rmut.RUnlock()
return f
}
type cFiler struct {
m *Model
r string
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
return cf.m.CurrentRepoFile(cf.r, file)
}
// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
m.pmut.RLock()
_, ok := m.protoConn[nodeID]
m.pmut.RUnlock()
return ok
}
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
nodeID := protoConn.ID()
m.pmut.Lock()
if _, ok := m.protoConn[nodeID]; ok {
panic("add existing node")
}
m.protoConn[nodeID] = protoConn
if _, ok := m.rawConn[nodeID]; ok {
panic("add existing node")
}
m.rawConn[nodeID] = rawConn
cm := m.clusterConfig(nodeID)
protoConn.ClusterConfig(cm)
m.rmut.RLock()
for _, repo := range m.nodeRepos[nodeID] {
fs := m.repoFiles[repo]
go sendIndexes(protoConn, repo, fs)
}
m.rmut.RUnlock()
m.pmut.Unlock()
}
func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
nodeID := conn.ID()
name := conn.Name()
if debug {
l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
}
initial := true
minLocalVer := uint64(0)
var err error
defer func() {
if debug {
l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
}
}()
for err == nil {
if !initial && fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
time.Sleep(1 * time.Second)
continue
}
batch := make([]protocol.FileInfo, 0, indexBatchSize)
maxLocalVer := uint64(0)
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if f.LocalVersion <= minLocalVer {
return true
}
if f.LocalVersion > maxLocalVer {
maxLocalVer = f.LocalVersion
}
if len(batch) == indexBatchSize {
if initial {
if err = conn.Index(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else {
if err = conn.IndexUpdate(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch))
}
}
batch = make([]protocol.FileInfo, 0, indexBatchSize)
}
batch = append(batch, f)
return true
})
if initial {
err = conn.Index(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else if len(batch) > 0 {
err = conn.IndexUpdate(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
}
}
minLocalVer = maxLocalVer
}
}
func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
f.LocalVersion = 0
m.rmut.RLock()
m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
m.rmut.RUnlock()
events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
"repo": repo,
"name": f.Name,
"modified": time.Unix(f.Modified, 0),
"flags": fmt.Sprintf("0%o", f.Flags),
"size": f.Size(),
})
}
func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
m.pmut.RLock()
nc, ok := m.protoConn[nodeID]
m.pmut.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
}
if debug {
l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
}
return nc.Request(repo, name, offset, size)
}
func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
if m.started {
panic("cannot add repo to started model")
}
if len(cfg.ID) == 0 {
panic("cannot add empty repo id")
}
m.rmut.Lock()
m.repoCfgs[cfg.ID] = cfg
m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
for i, node := range cfg.Nodes {
m.repoNodes[cfg.ID][i] = node.NodeID
m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
}
m.addedRepo = true
m.rmut.Unlock()
}
func (m *Model) ScanRepos() {
m.rmut.RLock()
var repos = make([]string, 0, len(m.repoCfgs))
for repo := range m.repoCfgs {
repos = append(repos, repo)
}
m.rmut.RUnlock()
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
repo := repo
go func() {
err := m.ScanRepo(repo)
if err != nil {
invalidateRepo(m.cfg, repo, err)
}
wg.Done()
}()
}
wg.Wait()
}
func (m *Model) CleanRepos() {
m.rmut.RLock()
var dirs = make([]string, 0, len(m.repoCfgs))
for _, cfg := range m.repoCfgs {
dirs = append(dirs, cfg.Directory)
}
m.rmut.RUnlock()
var wg sync.WaitGroup
wg.Add(len(dirs))
for _, dir := range dirs {
w := &scanner.Walker{
Dir: dir,
TempNamer: defTempNamer,
}
go func() {
w.CleanTempFiles()
wg.Done()
}()
}
wg.Wait()
}
func (m *Model) ScanRepo(repo string) error {
m.rmut.RLock()
fs := m.repoFiles[repo]
dir := m.repoCfgs[repo].Directory
w := &scanner.Walker{
Dir: dir,
IgnoreFile: ".stignore",
BlockSize: scanner.StandardBlockSize,
TempNamer: defTempNamer,
Suppressor: m.suppressor[repo],
CurrentFiler: cFiler{m, repo},
IgnorePerms: m.repoCfgs[repo].IgnorePerms,
}
m.rmut.RUnlock()
m.setState(repo, RepoScanning)
fchan, _, err := w.Walk()
if err != nil {
return err
}
batchSize := 100
batch := make([]protocol.FileInfo, 0, 00)
for f := range fchan {
if len(batch) == batchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]
}
batch = append(batch, f)
}
if len(batch) > 0 {
fs.Update(protocol.LocalNodeID, batch)
}
batch = batch[:0]
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if !protocol.IsDeleted(f.Flags) {
if len(batch) == batchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]
}
if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
// File has been deleted
f.Blocks = nil
f.Flags |= protocol.FlagDeleted
f.Version = lamport.Default.Tick(f.Version)
f.LocalVersion = 0
batch = append(batch, f)
}
}
return true
})
if len(batch) > 0 {
fs.Update(protocol.LocalNodeID, batch)
}
m.setState(repo, RepoIdle)
return nil
}
// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
cm := protocol.ClusterConfigMessage{
ClientName: m.clientName,
ClientVersion: m.clientVersion,
}
m.rmut.RLock()
for _, repo := range m.nodeRepos[node] {
cr := protocol.Repository{
ID: repo,
}
for _, node := range m.repoNodes[repo] {
// TODO: Set read only bit when relevant
cr.Nodes = append(cr.Nodes, protocol.Node{
ID: node[:],
Flags: protocol.FlagShareTrusted,
})
}
cm.Repositories = append(cm.Repositories, cr)
}
m.rmut.RUnlock()
return cm
}
func (m *Model) setState(repo string, state repoState) {
m.smut.Lock()
oldState := m.repoState[repo]
changed, ok := m.repoStateChanged[repo]
if state != oldState {
m.repoState[repo] = state
m.repoStateChanged[repo] = time.Now()
eventData := map[string]interface{}{
"repo": repo,
"to": state.String(),
}
if ok {
eventData["duration"] = time.Since(changed).Seconds()
eventData["from"] = oldState.String()
}
events.Default.Log(events.StateChanged, eventData)
}
m.smut.Unlock()
}
func (m *Model) State(repo string) (string, time.Time) {
m.smut.RLock()
state := m.repoState[repo]
changed := m.repoStateChanged[repo]
m.smut.RUnlock()
return state.String(), changed
}
func (m *Model) Override(repo string) {
m.rmut.RLock()
fs := m.repoFiles[repo]
m.rmut.RUnlock()
batch := make([]protocol.FileInfo, 0, indexBatchSize)
fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool {
if len(batch) == indexBatchSize {
fs.Update(protocol.LocalNodeID, batch)
batch = batch[:0]
}
have := fs.Get(protocol.LocalNodeID, need.Name)
if have.Name != need.Name {
// We are missing the file
need.Flags |= protocol.FlagDeleted
need.Blocks = nil
} else {
// We have the file, replace with our version
need = have
}
need.Version = lamport.Default.Tick(need.Version)
need.LocalVersion = 0
batch = append(batch, need)
return true
})
if len(batch) > 0 {
fs.Update(protocol.LocalNodeID, batch)
}
}
// Version returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// has changed.
func (m *Model) LocalVersion(repo string) uint64 {
m.rmut.Lock()
defer m.rmut.Unlock()
fs, ok := m.repoFiles[repo]
if !ok {
return 0
}
ver := fs.LocalVersion(protocol.LocalNodeID)
for _, n := range m.repoNodes[repo] {
ver += fs.LocalVersion(n)
}
return ver
}