Refactor model into separate package

Jakob Borg 2014-05-15 00:26:55 -03:00
parent f8e34c083e
commit 3e34fc66e6
20 changed files with 161 additions and 127 deletions

View File

@@ -7,7 +7,4 @@ import (
 var (
 	debugNet  = strings.Contains(os.Getenv("STTRACE"), "net") || os.Getenv("STTRACE") == "all"
-	debugIdx  = strings.Contains(os.Getenv("STTRACE"), "idx") || os.Getenv("STTRACE") == "all"
-	debugNeed = strings.Contains(os.Getenv("STTRACE"), "need") || os.Getenv("STTRACE") == "all"
-	debugPull = strings.Contains(os.Getenv("STTRACE"), "pull") || os.Getenv("STTRACE") == "all"
 )

View File

@@ -16,6 +16,7 @@ import (
 	"code.google.com/p/go.crypto/bcrypt"
 	"github.com/calmh/syncthing/config"
 	"github.com/calmh/syncthing/logger"
+	"github.com/calmh/syncthing/model"
 	"github.com/codegangsta/martini"
 )
@@ -40,7 +41,7 @@ func init() {
 	l.AddHandler(logger.LevelWarn, showGuiError)
 }

-func startGUI(cfg config.GUIConfiguration, m *Model) error {
+func startGUI(cfg config.GUIConfiguration, m *model.Model) error {
 	listener, err := net.Listen("tcp", cfg.Address)
 	if err != nil {
 		return err
@@ -95,7 +96,7 @@ func restGetVersion() string {
 	return Version
 }

-func restGetModel(m *Model, w http.ResponseWriter, r *http.Request) {
+func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) {
 	var qs = r.URL.Query()
 	var repo = qs.Get("repo")
 	var res = make(map[string]interface{})
@@ -124,7 +125,7 @@ func restGetModel(m *Model, w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(res)
 }

-func restGetConnections(m *Model, w http.ResponseWriter) {
+func restGetConnections(m *model.Model, w http.ResponseWriter) {
 	var res = m.ConnectionStats()
 	w.Header().Set("Content-Type", "application/json")
 	json.NewEncoder(w).Encode(res)
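
A note on the signature changes above: martini resolves handler parameters by type, so once a *model.Model is registered with Map it is injected into every handler that declares one. A self-contained sketch of that mechanism with a stub Model type and a made-up route (the actual startGUI wiring is outside this diff):

package main

import (
	"fmt"
	"net/http"

	"github.com/codegangsta/martini"
)

type Model struct{ Name string }

func main() {
	r := martini.NewRouter()
	r.Get("/rest/model", func(m *Model, w http.ResponseWriter) {
		fmt.Fprintln(w, m.Name) // m is supplied by the injector
	})

	mr := martini.New()
	mr.Map(&Model{Name: "demo"}) // register by concrete type
	mr.Action(r.Handle)          // route dispatch as the final handler
	http.ListenAndServe("127.0.0.1:8082", mr)
}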

View File

@@ -23,13 +23,12 @@ import (
 	"github.com/calmh/syncthing/config"
 	"github.com/calmh/syncthing/discover"
 	"github.com/calmh/syncthing/logger"
+	"github.com/calmh/syncthing/model"
 	"github.com/calmh/syncthing/protocol"
 	"github.com/calmh/syncthing/upnp"
 	"github.com/juju/ratelimit"
 )

-const BlockSize = 128 * 1024
-
 var (
 	Version    = "unknown-dev"
 	BuildStamp = "0"
@@ -75,15 +74,12 @@ const (
 STTRACE    A comma separated string of facilities to trace. The valid
            facility strings:
-           - "discover" (the node discovery package)
-           - "files"    (file set store)
-           - "idx"      (index sending and receiving)
-           - "mc"       (multicast beacon)
-           - "need"     (file need calculations)
-           - "net"      (connecting and disconnecting, network messages)
-           - "pull"     (file pull activity)
-           - "scanner"  (the file change scanner)
-           - "upnp"     (the upnp port mapper)
+           - "beacon"   (the beacon package)
+           - "discover" (the discover package)
+           - "files"    (the files package)
+           - "net"      (the main package; connections & network messages)
+           - "scanner"  (the scanner package)
+           - "upnp"     (the upnp package)
            - "all"      (all of the above)

 STCPUPROFILE  Write CPU profile to the specified file.`
@@ -245,7 +241,7 @@ func main() {
 		rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps))
 	}

-	m := NewModel(cfg.Options.MaxChangeKbps * 1000)
+	m := model.NewModel(confDir, &cfg, "syncthing", Version)

 	for _, repo := range cfg.Repositories {
 		if repo.Invalid != "" {
@@ -313,6 +309,7 @@ func main() {
 		ensureDir(dir, -1)
 	}

+	m.CleanRepos()
 	m.ScanRepos()
 	m.SaveIndexes(confDir)
@@ -466,7 +463,7 @@ func saveConfigLoop(cfgFile string) {
 			continue
 		}

-		err = Rename(cfgFile+".tmp", cfgFile)
+		err = model.Rename(cfgFile+".tmp", cfgFile)
 		if err != nil {
 			l.Warnln(err)
 		}
@@ -477,7 +474,7 @@ func saveConfig() {
 	saveConfigCh <- struct{}{}
 }

-func listenConnect(myID string, m *Model, tlsCfg *tls.Config) {
+func listenConnect(myID string, m *model.Model, tlsCfg *tls.Config) {
 	var conns = make(chan *tls.Conn)

 	// Listen
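
Context for the rate-limiter line retained in the first hunk above: juju/ratelimit's NewBucketWithRate takes a fill rate and a capacity, so the values shown give a fill rate of 1000 × MaxSendKbps (bytes per second, if the setting is in KB/s) with roughly five seconds of burst. A standalone sketch with made-up numbers:

package main

import (
	"fmt"

	"github.com/juju/ratelimit"
)

func main() {
	maxSendKbps := 100 // hypothetical config value
	bucket := ratelimit.NewBucketWithRate(float64(1000*maxSendKbps), int64(5*1000*maxSendKbps))

	// A writer would take tokens before sending; here we just ask how much
	// of a 64 KiB write would be allowed right now.
	granted := bucket.TakeAvailable(64 * 1024)
	fmt.Println(granted) // 65536, since the bucket starts full
}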

View File

@ -57,7 +57,7 @@ type OptionsConfiguration struct {
MaxSendKbps int `xml:"maxSendKbps"` MaxSendKbps int `xml:"maxSendKbps"`
RescanIntervalS int `xml:"rescanIntervalS" default:"60"` RescanIntervalS int `xml:"rescanIntervalS" default:"60"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"` ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"`
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000"` MaxChangeKbps int `xml:"maxChangeKbps" default:"10000"`
StartBrowser bool `xml:"startBrowser" default:"true"` StartBrowser bool `xml:"startBrowser" default:"true"`
UPnPEnabled bool `xml:"upnpEnabled" default:"true"` UPnPEnabled bool `xml:"upnpEnabled" default:"true"`
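
The default rises from 1000 to 10000 here, matching the test configurations below. For context, default tags like these are applied when a field is left at its zero value; a simplified, self-contained sketch of such a mechanism (assumed to mirror the config package's approach, not its actual code):

package main

import (
	"fmt"
	"reflect"
	"strconv"
)

type Options struct {
	MaxChangeKbps int `default:"10000"`
}

// fillDefaults copies the "default" tag into any int field still at zero.
func fillDefaults(v interface{}) {
	val := reflect.ValueOf(v).Elem()
	for i := 0; i < val.NumField(); i++ {
		tag := val.Type().Field(i).Tag.Get("default")
		if tag != "" && val.Field(i).Kind() == reflect.Int && val.Field(i).Int() == 0 {
			n, _ := strconv.Atoi(tag)
			val.Field(i).SetInt(int64(n))
		}
	}
}

func main() {
	var o Options
	fillDefaults(&o)
	fmt.Println(o.MaxChangeKbps) // 10000
}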

View File

@@ -35,7 +35,7 @@
         <maxSendKbps>0</maxSendKbps>
         <rescanIntervalS>10</rescanIntervalS>
         <reconnectionIntervalS>5</reconnectionIntervalS>
-        <maxChangeKbps>1000</maxChangeKbps>
+        <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
     </options>
 </configuration>

View File

@@ -40,7 +40,7 @@
         <maxSendKbps>0</maxSendKbps>
         <rescanIntervalS>15</rescanIntervalS>
         <reconnectionIntervalS>5</reconnectionIntervalS>
-        <maxChangeKbps>1000</maxChangeKbps>
+        <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
     </options>
 </configuration>

View File

@@ -32,7 +32,7 @@
         <maxSendKbps>0</maxSendKbps>
         <rescanIntervalS>20</rescanIntervalS>
         <reconnectionIntervalS>5</reconnectionIntervalS>
-        <maxChangeKbps>1000</maxChangeKbps>
+        <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
     </options>
 </configuration>

View File

@@ -29,7 +29,7 @@
         <maxSendKbps>0</maxSendKbps>
         <rescanIntervalS>60</rescanIntervalS>
         <reconnectionIntervalS>10</reconnectionIntervalS>
-        <maxChangeKbps>1000</maxChangeKbps>
+        <maxChangeKbps>10000</maxChangeKbps>
         <startBrowser>false</startBrowser>
         <upnpEnabled>false</upnpEnabled>
     </options>

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"sync/atomic"

model/debug.go (new file)
View File

@@ -0,0 +1,13 @@
+package model
+
+import (
+	"os"
+	"strings"
+
+	"github.com/calmh/syncthing/logger"
+)
+
+var (
+	debug = strings.Contains(os.Getenv("STTRACE"), "model") || os.Getenv("STTRACE") == "all"
+	l     = logger.DefaultLogger
+)
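
With this file in place, STTRACE=model (or STTRACE=all) enables the package's debug output. The pattern generalizes to the other facilities listed in main.go above; a standalone sketch (traceEnabled is a made-up helper, not part of the codebase):

package main

import (
	"fmt"
	"os"
	"strings"
)

// traceEnabled mirrors the flag definition in model/debug.go above: a
// facility is on if it is listed in STTRACE, or if STTRACE is "all".
func traceEnabled(facility string) bool {
	return strings.Contains(os.Getenv("STTRACE"), facility) ||
		os.Getenv("STTRACE") == "all"
}

func main() {
	os.Setenv("STTRACE", "model,scanner")
	fmt.Println(traceEnabled("model"), traceEnabled("upnp")) // true false
}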

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"compress/gzip"
@@ -31,11 +31,18 @@ const (
 )

 type Model struct {
+	indexDir      string
+	cfg           *config.Configuration
+	clientName    string
+	clientVersion string
+
 	repoDirs   map[string]string      // repo -> dir
 	repoFiles  map[string]*files.Set  // repo -> files
 	repoNodes  map[string][]string    // repo -> nodeIDs
 	nodeRepos  map[string][]string    // nodeID -> repos
 	repoState  map[string]repoState   // repo -> state
+	suppressor map[string]*suppressor // repo -> suppressor
 	rmut       sync.RWMutex           // protects the above

 	cm *cid.Map
@@ -59,18 +66,23 @@ var (
 // NewModel creates and starts a new model. The model starts in read-only mode,
 // where it sends index information to connected peers and responds to requests
 // for file data without altering the local repository in any way.
-func NewModel(maxChangeBw int) *Model {
+func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
 	m := &Model{
+		indexDir:      indexDir,
+		cfg:           cfg,
+		clientName:    clientName,
+		clientVersion: clientVersion,
 		repoDirs:      make(map[string]string),
 		repoFiles:     make(map[string]*files.Set),
 		repoNodes:     make(map[string][]string),
 		nodeRepos:     make(map[string][]string),
 		repoState:     make(map[string]repoState),
+		suppressor:    make(map[string]*suppressor),
 		cm:            cid.NewMap(),
 		protoConn:     make(map[string]protocol.Connection),
 		rawConn:       make(map[string]io.Closer),
 		nodeVer:       make(map[string]string),
-		sup:           suppressor{threshold: int64(maxChangeBw)},
+		sup:           suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
 	}

 	go m.broadcastIndexLoop()
@@ -87,7 +99,7 @@ func (m *Model) StartRepoRW(repo string, threads int) {
 	if dir, ok := m.repoDirs[repo]; !ok {
 		panic("cannot start without repo")
 	} else {
-		newPuller(repo, dir, m, threads)
+		newPuller(repo, dir, m, threads, m.cfg)
 	}
 }
@@ -214,7 +226,7 @@ func (m *Model) NeedFilesRepo(repo string) []scanner.File {
 // Index is called when a new node is connected and we receive their full index.
 // Implements the protocol.Model interface.
 func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
-	if debugNet {
+	if debug {
 		l.Debugf("IDX(in): %s / %q: %d files", nodeID, repo, len(fs))
 	}
@@ -237,7 +249,7 @@ func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
 // IndexUpdate is called for incremental updates to connected nodes' indexes.
 // Implements the protocol.Model interface.
 func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) {
-	if debugNet {
+	if debug {
 		l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
 	}
@@ -259,7 +271,7 @@ func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo)
 func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) {
 	compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
-	if debugNet {
+	if debug {
 		l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
 		l.Debugf("  ... compare: %s: %v", nodeID, compErr)
 	}
@@ -281,7 +293,7 @@ func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessag
 // Close removes the peer from the model and closes the underlying connection if possible.
 // Implements the protocol.Model interface.
 func (m *Model) Close(node string, err error) {
-	if debugNet {
+	if debug {
 		l.Debugf("%s: %v", node, err)
 	}
@@ -329,13 +341,13 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
 	}
 	if offset > lf.Size {
-		if debugNet {
+		if debug {
 			l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
 		}
 		return nil, ErrNoSuchFile
 	}

-	if debugNet && nodeID != "<local>" {
+	if debug && nodeID != "<local>" {
 		l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
 	}
 	m.rmut.RLock()
@@ -436,7 +448,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
 	go func() {
 		for repo, idx := range idxToSend {
-			if debugNet {
+			if debug {
 				l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
 			}
 			protoConn.Index(repo, idx)
@@ -452,7 +464,7 @@ func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
 	for _, f := range fs {
 		mf := fileInfoFromFile(f)
-		if debugIdx {
+		if debug {
 			var flagComment string
 			if mf.Flags&protocol.FlagDeleted != 0 {
 				flagComment = " (deleted)"
@@ -480,7 +492,7 @@ func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int,
 		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
 	}

-	if debugNet {
+	if debug {
 		l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
 	}
@@ -503,13 +515,13 @@ func (m *Model) broadcastIndexLoop() {
 			lastChange[repo] = c

 			idx := m.protocolIndex(repo)
-			m.saveIndex(repo, confDir, idx)
+			m.saveIndex(repo, m.indexDir, idx)

 			var indexWg sync.WaitGroup
 			for _, nodeID := range m.repoNodes[repo] {
 				if conn, ok := m.protoConn[nodeID]; ok {
 					indexWg.Add(1)
-					if debugNet {
+					if debug {
 						l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
 					}
 					go func() {
@@ -538,6 +550,7 @@ func (m *Model) AddRepo(id, dir string, nodes []config.NodeConfiguration) {
 	m.rmut.Lock()
 	m.repoDirs[id] = dir
 	m.repoFiles[id] = files.NewSet()
+	m.suppressor[id] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}

 	m.repoNodes[id] = make([]string, len(nodes))
 	for i, node := range nodes {
@@ -569,15 +582,37 @@ func (m *Model) ScanRepos() {
 	wg.Wait()
 }

+func (m *Model) CleanRepos() {
+	m.rmut.RLock()
+	var dirs = make([]string, 0, len(m.repoDirs))
+	for _, dir := range m.repoDirs {
+		dirs = append(dirs, dir)
+	}
+	m.rmut.RUnlock()
+
+	var wg sync.WaitGroup
+	wg.Add(len(dirs))
+	for _, dir := range dirs {
+		w := &scanner.Walker{
+			Dir:       dir,
+			TempNamer: defTempNamer,
+		}
+		go func() {
+			w.CleanTempFiles()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
 func (m *Model) ScanRepo(repo string) error {
-	sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)}
 	m.rmut.RLock()
 	w := &scanner.Walker{
 		Dir:          m.repoDirs[repo],
 		IgnoreFile:   ".stignore",
-		BlockSize:    BlockSize,
+		BlockSize:    scanner.StandardBlockSize,
 		TempNamer:    defTempNamer,
-		Suppressor:   sup,
+		Suppressor:   m.suppressor[repo],
 		CurrentFiler: cFiler{m, repo},
 	}
 	m.rmut.RUnlock()
@@ -660,8 +695,8 @@ func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
 // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
 func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
 	cm := protocol.ClusterConfigMessage{
-		ClientName:    "syncthing",
-		ClientVersion: Version,
+		ClientName:    m.clientName,
+		ClientVersion: m.clientVersion,
 	}

 	m.rmut.RLock()
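
Two coupled changes above are easy to miss: AddRepo now creates one long-lived suppressor per repo, and ScanRepo reuses it instead of building a fresh one each scan, so a file's change history survives rescans. A stub illustration of that ownership change (simplified types, not the real ones):

package main

import "fmt"

type suppressor struct{ threshold int64 }

type Model struct{ suppressor map[string]*suppressor }

func (m *Model) AddRepo(id string, maxChangeKbps int) {
	m.suppressor[id] = &suppressor{threshold: int64(maxChangeKbps)}
}

func (m *Model) ScanRepo(id string) *suppressor {
	return m.suppressor[id] // reused across scans, history intact
}

func main() {
	m := &Model{suppressor: make(map[string]*suppressor)}
	m.AddRepo("default", 10000)
	fmt.Println(m.ScanRepo("default") == m.ScanRepo("default")) // true
}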

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"bytes"
View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"bytes"
@@ -62,6 +62,7 @@ func (m activityMap) decrease(node string) {
 var errNoNode = errors.New("no available source node")

 type puller struct {
+	cfg             *config.Configuration
 	repo            string
 	dir             string
 	bq              *blockQueue
@@ -73,8 +74,9 @@ type puller struct {
 	requestResults  chan requestResult
 }

-func newPuller(repo, dir string, model *Model, slots int) *puller {
+func newPuller(repo, dir string, model *Model, slots int, cfg *config.Configuration) *puller {
 	p := &puller{
+		cfg:  cfg,
 		repo: repo,
 		dir:  dir,
 		bq:   newBlockQueue(),
@@ -91,13 +93,13 @@ func newPuller(repo, dir string, model *Model, slots int) *puller {
 		for i := 0; i < slots; i++ {
 			p.requestSlots <- true
 		}
-		if debugPull {
+		if debug {
 			l.Debugf("starting puller; repo %q dir %q slots %d", repo, dir, slots)
 		}
 		go p.run()
 	} else {
 		// Read only
-		if debugPull {
+		if debug {
 			l.Debugf("starting puller; repo %q dir %q (read only)", repo, dir)
 		}
 		go p.runRO()
@@ -111,14 +113,14 @@ func (p *puller) run() {
 		for {
 			<-p.requestSlots
 			b := p.bq.get()
-			if debugPull {
+			if debug {
 				l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repo, b.file.Name, b.block.Offset, len(b.copy))
 			}
 			p.blocks <- b
 		}
 	}()

-	walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second)
+	walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)
 	timeout := time.Tick(5 * time.Second)
 	changed := true
@@ -146,7 +148,7 @@ func (p *puller) run() {
 				// Nothing more to do for the moment
 				break pull
 			}
-			if debugPull {
+			if debug {
 				l.Debugf("%q: idle but have %d open files", p.repo, len(p.openFiles))
 				i := 5
 				for _, f := range p.openFiles {
@@ -171,12 +173,12 @@ func (p *puller) run() {
 			// Do a rescan if it's time for it
 			select {
 			case <-walkTicker:
-				if debugPull {
+				if debug {
 					l.Debugf("%q: time for rescan", p.repo)
 				}
 				err := p.model.ScanRepo(p.repo)
 				if err != nil {
-					invalidateRepo(cfg, p.repo, err)
+					invalidateRepo(p.cfg, p.repo, err)
 					return
 				}
@@ -189,15 +191,15 @@ func (p *puller) run() {
 }

 func (p *puller) runRO() {
-	walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second)
+	walkTicker := time.Tick(time.Duration(p.cfg.Options.RescanIntervalS) * time.Second)

 	for _ = range walkTicker {
-		if debugPull {
+		if debug {
 			l.Debugf("%q: time for rescan", p.repo)
 		}
 		err := p.model.ScanRepo(p.repo)
 		if err != nil {
-			invalidateRepo(cfg, p.repo, err)
+			invalidateRepo(p.cfg, p.repo, err)
 			return
 		}
 	}
@@ -226,7 +228,7 @@ func (p *puller) fixupDirectories() {
 		}

 		if cur.Flags&protocol.FlagDeleted != 0 {
-			if debugPull {
+			if debug {
 				l.Debugf("queue delete dir: %v", cur)
 			}
@@ -240,7 +242,7 @@ func (p *puller) fixupDirectories() {
 		if cur.Flags&uint32(os.ModePerm) != uint32(info.Mode()&os.ModePerm) {
 			os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm)
-			if debugPull {
+			if debug {
 				l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur)
 			}
 		}
@@ -248,7 +250,7 @@ func (p *puller) fixupDirectories() {
 		if cur.Modified != info.ModTime().Unix() {
 			t := time.Unix(cur.Modified, 0)
 			os.Chtimes(path, t, t)
-			if debugPull {
+			if debug {
 				l.Debugf("restored dir modtime: %d -> %v", info.ModTime().Unix(), cur)
 			}
 		}
@@ -258,7 +260,7 @@ func (p *puller) fixupDirectories() {
 	// Delete any queued directories
 	for i := len(deleteDirs) - 1; i >= 0; i-- {
-		if debugPull {
+		if debug {
 			l.Debugln("delete dir:", deleteDirs[i])
 		}
 		err := os.Remove(deleteDirs[i])
@@ -284,7 +286,7 @@ func (p *puller) handleRequestResult(res requestResult) {
 	of.outstanding--
 	p.openFiles[f.Name] = of

-	if debugPull {
+	if debug {
 		l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repo, f.Name, res.offset, of.outstanding, of.done)
 	}
@@ -314,7 +316,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
 	of.done = b.last

 	if !ok {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: %q: opening file %q", p.repo, f.Name)
 		}
@@ -333,7 +335,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
 		of.file, of.err = os.Create(of.temp)
 		if of.err != nil {
-			if debugPull {
+			if debug {
 				l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
 			}
 			if !b.last {
@@ -346,7 +348,7 @@ func (p *puller) handleBlock(b bqBlock) bool {
 	if of.err != nil {
 		// We have already failed this file.
-		if debugPull {
+		if debug {
 			l.Debugf("pull: error: %q / %q has already failed: %v", p.repo, f.Name, of.err)
 		}
 		if b.last {
@@ -378,14 +380,14 @@ func (p *puller) handleCopyBlock(b bqBlock) {
 	f := b.file
 	of := p.openFiles[f.Name]

-	if debugPull {
+	if debug {
 		l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repo, f.Name)
 	}

 	var exfd *os.File
 	exfd, of.err = os.Open(of.filepath)
 	if of.err != nil {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
 		}
 		of.file.Close()
@@ -404,7 +406,7 @@ func (p *puller) handleCopyBlock(b bqBlock) {
 		}
 		buffers.Put(bs)
 		if of.err != nil {
-			if debugPull {
+			if debug {
 				l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, of.err)
 			}
 			exfd.Close()
@@ -447,7 +449,7 @@ func (p *puller) handleRequestBlock(b bqBlock) bool {
 	p.openFiles[f.Name] = of

 	go func(node string, b bqBlock) {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repo, f.Name, b.block.Offset, b.block.Size, node, of.outstanding)
 		}
@@ -476,13 +478,13 @@ func (p *puller) handleEmptyBlock(b bqBlock) {
 	}

 	if f.Flags&protocol.FlagDeleted != 0 {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: delete %q", f.Name)
 		}
 		os.Remove(of.temp)
 		os.Remove(of.filepath)
 	} else {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repo, f.Name)
 		}
 		t := time.Unix(f.Modified, 0)
@@ -500,7 +502,7 @@ func (p *puller) queueNeededBlocks() {
 	for _, f := range p.model.NeedFilesRepo(p.repo) {
 		lf := p.model.CurrentRepoFile(p.repo, f.Name)
 		have, need := scanner.BlockDiff(lf.Blocks, f.Blocks)
-		if debugNeed {
+		if debug {
 			l.Debugf("need:\n  local: %v\n  global: %v\n  haveBlocks: %v\n  needBlocks: %v", lf, f, have, need)
 		}
 		queued++
@@ -510,13 +512,13 @@ func (p *puller) queueNeededBlocks() {
 			need: need,
 		})
 	}
-	if debugPull && queued > 0 {
+	if debug && queued > 0 {
 		l.Debugf("%q: queued %d blocks", p.repo, queued)
 	}
 }

 func (p *puller) closeFile(f scanner.File) {
-	if debugPull {
+	if debug {
 		l.Debugf("pull: closing %q / %q", p.repo, f.Name)
 	}
@@ -528,16 +530,16 @@ func (p *puller) closeFile(f scanner.File) {
 	fd, err := os.Open(of.temp)
 	if err != nil {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: error: %q / %q: %v", p.repo, f.Name, err)
 		}
 		return
 	}
-	hb, _ := scanner.Blocks(fd, BlockSize)
+	hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize)
 	fd.Close()

 	if l0, l1 := len(hb), len(f.Blocks); l0 != l1 {
-		if debugPull {
+		if debug {
 			l.Debugf("pull: %q / %q: nblocks %d != %d", p.repo, f.Name, l0, l1)
 		}
 		return
@@ -554,7 +556,7 @@ func (p *puller) closeFile(f scanner.File) {
 	os.Chtimes(of.temp, t, t)
 	os.Chmod(of.temp, os.FileMode(f.Flags&0777))
 	defTempNamer.Show(of.temp)
-	if debugPull {
+	if debug {
 		l.Debugf("pull: rename %q / %q: %q", p.repo, f.Name, of.filepath)
 	}
 	if err := Rename(of.temp, of.filepath); err == nil {
@@ -564,7 +566,7 @@ func (p *puller) closeFile(f scanner.File) {
 	}
 }

-func invalidateRepo(cfg config.Configuration, repoID string, err error) {
+func invalidateRepo(cfg *config.Configuration, repoID string, err error) {
 	for i := range cfg.Repositories {
 		repo := &cfg.Repositories[i]
 		if repo.ID == repoID {
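
The recurring edit through this file swaps the old package-global cfg for the cfg field handed to newPuller; passing a *config.Configuration also lets invalidateRepo mark repositories invalid on the caller's configuration. A reduced sketch of the pattern with stub types:

package main

import "fmt"

type Options struct{ RescanIntervalS int }
type Configuration struct{ Options Options }

type puller struct{ cfg *Configuration }

// newPuller receives its configuration explicitly instead of reading a
// package-level variable, as in the diff above.
func newPuller(cfg *Configuration) *puller { return &puller{cfg: cfg} }

func main() {
	cfg := &Configuration{Options: Options{RescanIntervalS: 60}}
	p := newPuller(cfg)
	fmt.Println(p.cfg.Options.RescanIntervalS) // 60, no global needed
}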

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"os"
@@ -52,9 +52,8 @@ func (h *changeHistory) append(size int64, t time.Time) {
 	h.changes = append(h.changes, c)
 }

-func (s *suppressor) Suppress(name string, fi os.FileInfo) bool {
-	sup, _ := s.suppress(name, fi.Size(), time.Now())
-	return sup
+func (s *suppressor) Suppress(name string, fi os.FileInfo) (cur, prev bool) {
+	return s.suppress(name, fi.Size(), time.Now())
 }

 func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) {
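
Suppress now returns both the current and the previous decision, letting callers act only on transitions rather than tracking state themselves; the walker change at the bottom of this commit relies on exactly that. A standalone illustration of the four cases:

package main

import "fmt"

// describe interprets a (cur, prev) pair as returned by Suppress.
func describe(cur, prev bool) string {
	switch {
	case cur && !prev:
		return "newly suppressed"
	case !cur && prev:
		return "suppression lifted"
	case cur && prev:
		return "still suppressed"
	default:
		return "not suppressed"
	}
}

func main() {
	fmt.Println(describe(true, false)) // newly suppressed
	fmt.Println(describe(false, true)) // suppression lifted
}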

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"testing"

View File

@@ -1,6 +1,6 @@
 // +build !windows

-package main
+package model

 import (
 	"fmt"
View File

@@ -1,6 +1,6 @@
 // +build windows

-package main
+package model

 import (
 	"fmt"

View File

@@ -1,4 +1,4 @@
-package main
+package model

 import (
 	"fmt"

View File

@@ -6,6 +6,8 @@ import (
 	"io"
 )

+const StandardBlockSize = 128 * 1024
+
 type Block struct {
 	Offset int64
 	Size   uint32
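
The 128 KiB block size that used to be the main package's BlockSize constant now lives beside the hashing code as scanner.StandardBlockSize. A standalone worked example of what it implies (local copy of the constant):

package main

import "fmt"

const StandardBlockSize = 128 * 1024 // 131072 bytes, as above

func main() {
	fileSize := int64(1 << 20) // a 1 MiB file...
	nblocks := (fileSize + StandardBlockSize - 1) / StandardBlockSize
	fmt.Println(nblocks) // ...hashes into 8 blocks
}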

View File

@@ -28,8 +28,6 @@ type Walker struct {
 	// Suppressed files will be returned with empty metadata and the Suppressed flag set.
 	// Requires CurrentFiler to be set.
 	Suppressor Suppressor
-
-	suppressed map[string]bool // file name -> suppression status
 }

 type TempNamer interface {
@@ -41,7 +39,7 @@ type TempNamer interface {
 type Suppressor interface {
 	// Supress returns true if the update to the named file should be ignored.
-	Suppress(name string, fi os.FileInfo) bool
+	Suppress(name string, fi os.FileInfo) (bool, bool)
 }

 type CurrentFiler interface {
@@ -52,8 +50,6 @@ type CurrentFiler interface {
 // Walk returns the list of files found in the local repository by scanning the
 // file system. Files are blockwise hashed.
 func (w *Walker) Walk() (files []File, ignore map[string][]string, err error) {
-	w.lazyInit()
-
 	if debug {
 		l.Debugln("Walk", w.Dir, w.BlockSize, w.IgnoreFile)
 	}
@@ -86,12 +82,6 @@ func (w *Walker) CleanTempFiles() {
 	filepath.Walk(w.Dir, w.cleanTempFile)
 }

-func (w *Walker) lazyInit() {
-	if w.suppressed == nil {
-		w.suppressed = make(map[string]bool)
-	}
-}
-
 func (w *Walker) loadIgnoreFiles(dir string, ign map[string][]string) filepath.WalkFunc {
 	return func(p string, info os.FileInfo, err error) error {
 		if err != nil {
@@ -203,21 +193,19 @@ func (w *Walker) walkAndHashFiles(res *[]File, ign map[string][]string) filepath
 				return nil
 			}

-			if w.Suppressor != nil && w.Suppressor.Suppress(rn, info) {
-				if !w.suppressed[rn] {
-					w.suppressed[rn] = true
+			if w.Suppressor != nil {
+				if cur, prev := w.Suppressor.Suppress(rn, info); cur && !prev {
 					l.Infof("Changes to %q are being temporarily suppressed because it changes too frequently.", p)
 					cf.Suppressed = true
 					cf.Version++
-				}
-				if debug {
-					l.Debugln("suppressed:", cf)
-				}
-				*res = append(*res, cf)
-				return nil
-			} else if w.suppressed[rn] {
-				l.Infof("Changes to %q are no longer suppressed.", p)
-				delete(w.suppressed, rn)
+					if debug {
+						l.Debugln("suppressed:", cf)
+					}
+					*res = append(*res, cf)
+					return nil
+				} else if prev && !cur {
+					l.Infof("Changes to %q are no longer suppressed.", p)
+				}
 			}
} }