From d9ffd359e24697e9f8ef2ccaa113d9d2301cc39e Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 20 Jan 2014 22:22:27 +0100 Subject: [PATCH] Tweak locking and integration test. --- integration/genfiles.go | 11 ++-- integration/test.sh | 10 ++-- main.go | 8 ++- model/filequeue.go | 23 +++----- model/filequeue_test.go | 34 ++++++------ model/model.go | 113 ++++++++++++++++++++++------------------ model/walk.go | 33 ++++++------ 7 files changed, 122 insertions(+), 110 deletions(-) diff --git a/integration/genfiles.go b/integration/genfiles.go index 763fb66af..a2abb77ed 100644 --- a/integration/genfiles.go +++ b/integration/genfiles.go @@ -18,17 +18,22 @@ func name() string { func main() { var files int - var maxsize int + var maxexp int flag.IntVar(&files, "files", 1000, "Number of files") - flag.IntVar(&maxsize, "maxsize", 1000, "Maximum file size (KB)") + flag.IntVar(&maxexp, "maxexp", 20, "Maximum file size (max = 2^n + 128*1024 B)") flag.Parse() for i := 0; i < files; i++ { n := name() p0 := path.Join(string(n[0]), n[0:2]) os.MkdirAll(p0, 0755) - s := mr.Intn(maxsize * 1024) + s := 1 << uint(mr.Intn(maxexp)) + a := 128 * 1024 + if a > s { + a = s + } + s += mr.Intn(a) b := make([]byte, s) rand.Reader.Read(b) p1 := path.Join(p0, n) diff --git a/integration/test.sh b/integration/test.sh index 265be27ee..707a613c6 100755 --- a/integration/test.sh +++ b/integration/test.sh @@ -2,6 +2,7 @@ rm -rf files-* conf-* md5-* +extraopts="" p=$(pwd) go build genfiles.go @@ -29,19 +30,22 @@ EOT mkdir files-$i pushd files-$i >/dev/null - ../genfiles -maxsize 780 -files 1500 + ../genfiles -maxexp 21 -files 4000 ../md5r > ../md5-$i popd >/dev/null done echo "Starting..." for i in 1 2 3 ; do - syncthing -c conf-$i --no-gui -l :2200$i & + sleep 1 + syncthing -c conf-$i --no-gui -l :2200$i $extraopts & done cat md5-* | sort > md5-tot while true ; do - sleep 10 + read + echo Verifying... 
+ conv=0 for i in 1 2 3 ; do pushd files-$i >/dev/null diff --git a/main.go b/main.go index 2b5e12bf0..50a81deb2 100644 --- a/main.go +++ b/main.go @@ -119,6 +119,8 @@ func main() { myID = string(certId(cert.Certificate[0])) infoln("My ID:", myID) + log.SetPrefix("[" + myID[0:5] + "] ") + logger.SetPrefix("[" + myID[0:5] + "] ") if opts.Debug.Profiler != "" { go func() { @@ -223,7 +225,9 @@ func main() { go func() { for { time.Sleep(opts.Advanced.ScanInterval) - updateLocalModel(m) + if m.LocalAge() > opts.Advanced.ScanInterval.Seconds()/2 { + updateLocalModel(m) + } } }() @@ -248,7 +252,7 @@ func printStatsLoop(m *model.Model) { outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs) if inbps+outbps > 0 { - infof("%s: %sb/s in, %sb/s out", node, MetricPrefix(inbps), MetricPrefix(outbps)) + infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(inbps), MetricPrefix(outbps)) } lastStats[node] = stats diff --git a/model/filequeue.go b/model/filequeue.go index 8086ee6fe..cce4c6734 100644 --- a/model/filequeue.go +++ b/model/filequeue.go @@ -57,6 +57,12 @@ type queuedBlock struct { index int } +func NewFileQueue() *FileQueue { + return &FileQueue{ + availability: make(map[string][]string), + } +} + func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) { q.fmut.Lock() defer q.fmut.Unlock() @@ -212,24 +218,11 @@ func (q *FileQueue) deleteFile(n string) { } } -func (q *FileQueue) SetAvailable(file, node string) { +func (q *FileQueue) SetAvailable(file string, nodes []string) { q.amut.Lock() defer q.amut.Unlock() - if q.availability == nil { - q.availability = make(map[string][]string) - } - q.availability[file] = []string{node} -} - -func (q *FileQueue) AddAvailable(file, node string) { - q.amut.Lock() - defer q.amut.Unlock() - - if q.availability == nil { - q.availability = make(map[string][]string) - } - q.availability[file] = append(q.availability[file], node) + q.availability[file] = nodes } func (q *FileQueue) RemoveAvailable(toRemove string) { diff --git a/model/filequeue_test.go b/model/filequeue_test.go index e43351a52..7a274ce67 100644 --- a/model/filequeue_test.go +++ b/model/filequeue_test.go @@ -8,14 +8,14 @@ import ( ) func TestFileQueueAdd(t *testing.T) { - q := FileQueue{} + q := NewFileQueue() q.Add("foo", nil, nil) } func TestFileQueueAddSorting(t *testing.T) { - q := FileQueue{} - q.SetAvailable("zzz", "nodeID") - q.SetAvailable("aaa", "nodeID") + q := NewFileQueue() + q.SetAvailable("zzz", []string{"nodeID"}) + q.SetAvailable("aaa", []string{"nodeID"}) q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) @@ -24,9 +24,9 @@ func TestFileQueueAddSorting(t *testing.T) { t.Errorf("Incorrectly sorted get: %+v", b) } - q = FileQueue{} - q.SetAvailable("zzz", "nodeID") - q.SetAvailable("aaa", "nodeID") + q = NewFileQueue() + q.SetAvailable("zzz", []string{"nodeID"}) + q.SetAvailable("aaa", []string{"nodeID"}) q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil) b, _ = q.Get("nodeID") // Start on zzzz @@ -42,7 +42,7 @@ func TestFileQueueAddSorting(t *testing.T) { } func TestFileQueueLen(t *testing.T) { - q := FileQueue{} + q := NewFileQueue() q.Add("foo", nil, nil) q.Add("bar", nil, nil) @@ -52,9 +52,9 @@ func TestFileQueueLen(t *testing.T) { } func TestFileQueueGet(t *testing.T) { - q := FileQueue{} - q.SetAvailable("foo", "nodeID") - q.SetAvailable("bar", "nodeID") + q := NewFileQueue() + q.SetAvailable("foo", 
[]string{"nodeID"}) + q.SetAvailable("bar", []string{"nodeID"}) q.Add("foo", []Block{ {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, @@ -177,11 +177,9 @@ func TestFileQueueDone(t *testing.T) { */ func TestFileQueueGetNodeIDs(t *testing.T) { - q := FileQueue{} - q.SetAvailable("a-foo", "nodeID") - q.AddAvailable("a-foo", "a") - q.SetAvailable("b-bar", "nodeID") - q.AddAvailable("b-bar", "b") + q := NewFileQueue() + q.SetAvailable("a-foo", []string{"nodeID", "a"}) + q.SetAvailable("b-bar", []string{"nodeID", "b"}) q.Add("a-foo", []Block{ {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, @@ -254,9 +252,9 @@ func TestFileQueueThreadHandling(t *testing.T) { total += i } - q := FileQueue{} + q := NewFileQueue() q.Add("foo", blocks, nil) - q.SetAvailable("foo", "nodeID") + q.SetAvailable("foo", []string{"nodeID"}) var start = make(chan bool) var gotTot uint32 diff --git a/model/model.go b/model/model.go index ea83e6b5d..7fafd53f5 100644 --- a/model/model.go +++ b/model/model.go @@ -1,16 +1,5 @@ package model -/* - -Locking -======= - -The model has read and write locks. These must be acquired as appropriate by -public methods. To prevent deadlock situations, private methods should never -acquire locks, but document what locks they require. - -*/ - import ( "crypto/sha1" "errors" @@ -40,7 +29,9 @@ type Model struct { rawConn map[string]io.Closer pmut sync.RWMutex // protects protoConn and rawConn - fq FileQueue // queue for files to fetch + // Queue for files to fetch. fq can call back into the model, so we must ensure + // to hold no locks when calling methods on fq. + fq *FileQueue dq chan File // queue for files to delete updatedLocal int64 // timestamp of last update to local @@ -100,6 +91,7 @@ func NewModel(dir string) *Model { trace: make(map[string]bool), fileLastChanged: make(map[string]time.Time), fileWasSuppressed: make(map[string]int), + fq: NewFileQueue(), dq: make(chan File), } @@ -157,6 +149,13 @@ func (m *Model) Generation() int64 { return m.updatedLocal + m.updateGlobal } +func (m *Model) LocalAge() float64 { + m.umut.RLock() + defer m.umut.RUnlock() + + return time.Since(time.Unix(m.updatedLocal, 0)).Seconds() +} + type ConnectionInfo struct { protocol.Statistics Address string @@ -241,9 +240,11 @@ func (m *Model) InSyncSize() (files, bytes int) { // NeedFiles returns the list of currently needed files and the total size. func (m *Model) NeedFiles() (files []File, bytes int) { + qf := m.fq.QueuedFiles() + m.gmut.RLock() - for _, n := range m.fq.QueuedFiles() { + for _, n := range qf { f := m.global[n] files = append(files, f) bytes += f.Size() @@ -320,9 +321,11 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) { repo[f.Name] = fileFromFileInfo(f) } -// Close removes the peer from the model and closes the underlyign connection if possible. +// Close removes the peer from the model and closes the underlying connection if possible. // Implements the protocol.Model interface. 
func (m *Model) Close(node string, err error) { + m.fq.RemoveAvailable(node) + m.pmut.Lock() m.rmut.Lock() @@ -334,7 +337,6 @@ func (m *Model) Close(node string, err error) { delete(m.remote, node) delete(m.protoConn, node) delete(m.rawConn, node) - m.fq.RemoveAvailable(node) m.rmut.Unlock() m.pmut.Unlock() @@ -471,37 +473,42 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { protoConn.Index(idx) }() - if m.rwRunning { - for i := 0; i < m.parallellRequests; i++ { - i := i - go func() { - if m.trace["pull"] { - log.Println("PULL: Starting", nodeID, i) - } - for { - m.pmut.RLock() - if _, ok := m.protoConn[nodeID]; !ok { - if m.trace["pull"] { - log.Println("PULL: Exiting", nodeID, i) - } - m.pmut.RUnlock() - return + m.initmut.Lock() + rw := m.rwRunning + m.initmut.Unlock() + if !rw { + return + } + + for i := 0; i < m.parallellRequests; i++ { + i := i + go func() { + if m.trace["pull"] { + log.Println("PULL: Starting", nodeID, i) + } + for { + m.pmut.RLock() + if _, ok := m.protoConn[nodeID]; !ok { + if m.trace["pull"] { + log.Println("PULL: Exiting", nodeID, i) } m.pmut.RUnlock() - - qb, ok := m.fq.Get(nodeID) - if ok { - if m.trace["pull"] { - log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset) - } - data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash) - m.fq.Done(qb.name, qb.block.Offset, data) - } else { - time.Sleep(1 * time.Second) - } + return } - }() - } + m.pmut.RUnlock() + + qb, ok := m.fq.Get(nodeID) + if ok { + if m.trace["pull"] { + log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset) + } + data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash) + m.fq.Done(qb.name, qb.block.Offset, data) + } else { + time.Sleep(1 * time.Second) + } + } + }() } } @@ -606,7 +613,6 @@ func (m *Model) broadcastIndexLoop() { } // markDeletedLocals sets the deleted flag on files that have gone missing locally. -// Must be called with the write lock held. func (m *Model) markDeletedLocals(newLocal map[string]File) bool { // For every file in the existing local table, check if they are also // present in the new local table. If they are not, check that we already @@ -661,7 +667,6 @@ func (m *Model) updateLocal(f File) { } } -// Must be called with the write lock held. 
func (m *Model) recomputeGlobal() { var newGlobal = make(map[string]File) @@ -671,23 +676,29 @@ func (m *Model) recomputeGlobal() { } m.lmut.RUnlock() + var available = make(map[string][]string) + m.rmut.RLock() var highestMod int64 for nodeID, fs := range m.remote { for n, nf := range fs { if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) { newGlobal[n] = nf - m.fq.SetAvailable(n, nodeID) + available[n] = []string{nodeID} if nf.Modified > highestMod { highestMod = nf.Modified } } else if lf.Equals(nf) { - m.fq.AddAvailable(n, nodeID) + available[n] = append(available[n], nodeID) } } } m.rmut.RUnlock() + for f, ns := range available { + m.fq.SetAvailable(f, ns) + } + // Figure out if anything actually changed m.gmut.RLock() @@ -727,10 +738,6 @@ func (m *Model) recomputeNeed() { m.gmut.RLock() for n, gf := range m.global { - if m.fq.Queued(n) { - continue - } - m.lmut.RLock() lf, ok := m.local[n] m.lmut.RUnlock() @@ -771,7 +778,9 @@ func (m *Model) recomputeNeed() { m.gmut.RUnlock() for _, ao := range toAdd { - m.fq.Add(ao.n, ao.remote, ao.fm) + if !m.fq.Queued(ao.n) { + m.fq.Add(ao.n, ao.remote, ao.fm) + } } for _, gf := range toDelete { m.dq <- gf diff --git a/model/walk.go b/model/walk.go index 91dfa8c62..1898c8752 100644 --- a/model/walk.go +++ b/model/walk.go @@ -85,6 +85,9 @@ func (m *Model) loadIgnoreFiles(ign map[string][]string) filepath.WalkFunc { func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.WalkFunc { return func(p string, info os.FileInfo, err error) error { if err != nil { + if m.trace["file"] { + log.Printf("FILE: %q: %v", p, err) + } return nil } @@ -110,22 +113,18 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } if info.Mode()&os.ModeType == 0 { - fi, err := os.Stat(p) - if err != nil { - return nil - } - modified := fi.ModTime().Unix() + modified := info.ModTime().Unix() m.lmut.RLock() - hf, ok := m.local[rn] + lf, ok := m.local[rn] m.lmut.RUnlock() - if ok && hf.Modified == modified { - if nf := uint32(info.Mode()); nf != hf.Flags { - hf.Flags = nf - hf.Version++ + if ok && lf.Modified == modified { + if nf := uint32(info.Mode()); nf != lf.Flags { + lf.Flags = nf + lf.Version++ } - *res = append(*res, hf) + *res = append(*res, lf) } else { if m.shouldSuppressChange(rn) { if m.trace["file"] { @@ -133,9 +132,9 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } if ok { - hf.Flags = protocol.FlagInvalid - hf.Version++ - *res = append(*res, hf) + lf.Flags = protocol.FlagInvalid + lf.Version++ + *res = append(*res, lf) } return nil } @@ -195,9 +194,9 @@ func (m *Model) Walk(followSymlinks bool) (files []File, ignore map[string][]str return } - for _, fi := range fis { - if fi.Mode()&os.ModeSymlink != 0 { - dir := path.Join(m.dir, fi.Name()) + "/" + for _, info := range fis { + if info.Mode()&os.ModeSymlink != 0 { + dir := path.Join(m.dir, info.Name()) + "/" filepath.Walk(dir, m.loadIgnoreFiles(ignore)) filepath.Walk(dir, hashFiles) }
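
Note (not part of the patch): the queue changes above replace the SetAvailable/AddAvailable pair with a single SetAvailable that takes the whole node list, and add a NewFileQueue constructor so the availability map no longer needs lazy initialization. The snippet below is a minimal usage sketch of that reworked API in the style of the updated tests, written as if it lived in the model package; the file name, node IDs and hash bytes are invented for illustration.

```go
package model

import "testing"

// Sketch of the reworked FileQueue API from this patch: construct with
// NewFileQueue, record availability as a complete node list in one call,
// then pull blocks for a node that has the file.
func TestFileQueueAvailabilitySketch(t *testing.T) {
	q := NewFileQueue()

	// One call now records every node holding the current version of the file.
	q.SetAvailable("docs/readme.txt", []string{"nodeA", "nodeB"})

	q.Add("docs/readme.txt", []Block{
		{Offset: 0, Size: 128, Hash: []byte("first block hash")},
		{Offset: 128, Size: 128, Hash: []byte("second block hash")},
	}, nil)

	if q.Len() != 1 {
		t.Errorf("expected one queued file, got %d", q.Len())
	}

	// Get hands out the next needed block that this node can serve.
	if b, ok := q.Get("nodeA"); !ok || b.name != "docs/readme.txt" {
		t.Errorf("unexpected block: %+v (ok=%v)", b, ok)
	}
}
```

Compared to the old API, callers such as recomputeGlobal now collect the per-file node lists under their own locks and hand them to the queue in one call afterwards, which is what allows the model to hold no locks while calling into fq.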
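
Note (not part of the patch): main.go now gates the periodic rescan on the new Model.LocalAge, so a filesystem walk is skipped when the local index was refreshed less than half a scan interval ago. Below is a standalone sketch of that gating loop; localAger, fakeModel and scanLoop are invented names for illustration, and only the LocalAge method and the interval/2 comparison come from the patch.

```go
package main

import (
	"fmt"
	"time"
)

// localAger is the one method the scan loop needs from the model: LocalAge
// reports the number of seconds since the local index was last updated
// (the method added to model.go by this patch).
type localAger interface {
	LocalAge() float64
}

// scanLoop mirrors the gating added to main.go: wake up once per interval,
// but only rescan when the local index is older than half an interval, so a
// scan that just happened for some other reason is not immediately repeated.
func scanLoop(m localAger, interval time.Duration, rescan func()) {
	for {
		time.Sleep(interval)
		if m.LocalAge() > interval.Seconds()/2 {
			rescan()
		}
	}
}

// fakeModel stands in for *model.Model in this sketch.
type fakeModel struct{ last time.Time }

func (f *fakeModel) LocalAge() float64 { return time.Since(f.last).Seconds() }

func main() {
	m := &fakeModel{last: time.Now()}
	go scanLoop(m, 2*time.Second, func() {
		fmt.Println("rescan at", time.Now().Format(time.Kitchen))
		m.last = time.Now()
	})
	time.Sleep(7 * time.Second)
}
```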