From 704e0fa6b80f65f9d89fd490b18666198156122e Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 29 Dec 2013 12:18:59 -0500 Subject: [PATCH] Improve puller somewhat --- main.go | 28 +++++---- model_puller.go | 147 ++++++++++++++++++++++++++++-------------------- 2 files changed, 104 insertions(+), 71 deletions(-) diff --git a/main.go b/main.go index 5a243f7d8..d211ae001 100644 --- a/main.go +++ b/main.go @@ -21,15 +21,14 @@ import ( ) type Options struct { - ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"` - Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"` - ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"` - Delete bool `short:"d" long:"delete" description:"Delete files from repo when deleted from cluster"` - NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"` - ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"` - ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"` - Discovery DiscoveryOptions `group:"Discovery Options"` - Debug DebugOptions `group:"Debugging Options"` + ConfDir string `short:"c" long:"cfg" description:"Configuration directory" default:"~/.syncthing" value-name:"DIR"` + Listen string `short:"l" long:"listen" description:"Listen address" default:":22000" value-name:"ADDR"` + ReadOnly bool `short:"r" long:"ro" description:"Repository is read only"` + Delete bool `short:"d" long:"delete" description:"Delete files deleted from cluster"` + NoSymlinks bool `long:"no-symlinks" description:"Don't follow first level symlinks in the repo"` + Discovery DiscoveryOptions `group:"Discovery Options"` + Advanced AdvancedOptions `group:"Advanced Options"` + Debug DebugOptions `group:"Debugging Options"` } type DebugOptions struct { @@ -46,6 +45,13 @@ type DiscoveryOptions struct { NoLocalDiscovery bool `short:"N" long:"no-local-announce" description:"Do not announce presence locally"` } +type AdvancedOptions struct { + RequestsInFlight int `long:"reqs-in-flight" description:"Parallell in flight requests per file" default:"8" value-name:"REQS"` + FilesInFlight int `long:"files-in-flight" description:"Parallell in flight file pulls" default:"4" value-name:"FILES"` + ScanInterval time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"` + ConnInterval time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"` +} + var opts Options var Version string @@ -162,7 +168,7 @@ func main() { // XXX: Should use some fsnotify mechanism. go func() { for { - time.Sleep(opts.ScanInterval) + time.Sleep(opts.Advanced.ScanInterval) updateLocalModel(m) } }() @@ -286,7 +292,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, } } - time.Sleep(opts.ConnInterval) + time.Sleep(opts.Advanced.ConnInterval) } } diff --git a/model_puller.go b/model_puller.go index 90a9415ae..af9ac1cc7 100644 --- a/model_puller.go +++ b/model_puller.go @@ -6,10 +6,10 @@ Locking ======= These methods are never called from the outside so don't follow the locking -policy in model.go. Instead, appropriate locks are acquired when needed and -held for as short a time as possible. +policy in model.go. TODO(jb): Refactor this into smaller and cleaner pieces. +TODO(jb): Increase performance by taking apparent peer bandwidth into account. */ @@ -25,8 +25,6 @@ import ( "github.com/calmh/syncthing/buffers" ) -const RemoteFetchers = 8 - func (m *Model) pullFile(name string) error { m.RLock() var localFile = m.local[name] @@ -81,58 +79,38 @@ func (m *Model) pullFile(name string) error { m.RLock() var nodeIDs = m.whoHas(name) m.RUnlock() - var remoteBlocksChan = make(chan Block) - go func() { - for _, block := range remote { - remoteBlocksChan <- block - } - close(remoteBlocksChan) - }() + var remoteBlocks = blockIterator{blocks: remote} + for i := 0; i < opts.Advanced.RequestsInFlight; i++ { + curNode := nodeIDs[i%len(nodeIDs)] + fetchDone.Add(1) - // XXX: This should be rewritten into something nicer that takes differing - // peer performance into account. - - for i := 0; i < RemoteFetchers; i++ { - for _, nodeID := range nodeIDs { - fetchDone.Add(1) - go func(nodeID string) { - for block := range remoteBlocksChan { - data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) - if err != nil { - break - } - contentChan <- content{ - offset: int64(block.Offset), - data: data, - } + go func(nodeID string) { + for { + block, ok := remoteBlocks.Next() + if !ok { + break } - fetchDone.Done() - }(nodeID) - } + data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) + if err != nil { + break + } + contentChan <- content{ + offset: int64(block.Offset), + data: data, + } + } + fetchDone.Done() + }(curNode) } fetchDone.Wait() close(contentChan) applyDone.Wait() - rf, err := os.Open(tmpFilename) + err = hashCheck(tmpFilename, globalFile.Blocks) if err != nil { return err } - defer rf.Close() - - writtenBlocks, err := Blocks(rf, BlockSize) - if err != nil { - return err - } - if len(writtenBlocks) != len(globalFile.Blocks) { - return fmt.Errorf("%s: incorrect number of blocks after sync", tmpFilename) - } - for i := range writtenBlocks { - if bytes.Compare(writtenBlocks[i].Hash, globalFile.Blocks[i].Hash) != 0 { - return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", tmpFilename, writtenBlocks[i], globalFile.Blocks[i]) - } - } err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0)) if err != nil { @@ -148,23 +126,29 @@ func (m *Model) pullFile(name string) error { } func (m *Model) puller() { + for { - for { - var n string - var f File + time.Sleep(time.Second) - m.RLock() - for n = range m.need { - break // just pick first name - } - if len(n) != 0 { - f = m.global[n] - } - m.RUnlock() + var ns []string + m.RLock() + for n := range m.need { + ns = append(ns, n) + } + m.RUnlock() - if len(n) == 0 { - // we got nothing - break + if len(ns) == 0 { + continue + } + + var limiter = make(chan bool, opts.Advanced.FilesInFlight) + + for _, n := range ns { + limiter <- true + + f, ok := m.GlobalFile(n) + if !ok { + continue } var err error @@ -185,8 +169,9 @@ func (m *Model) puller() { } else { warnln(err) } + + <-limiter } - time.Sleep(time.Second) } } @@ -208,3 +193,45 @@ func applyContent(cc <-chan content, dst io.WriterAt) error { return nil } + +func hashCheck(name string, correct []Block) error { + rf, err := os.Open(name) + if err != nil { + return err + } + defer rf.Close() + + current, err := Blocks(rf, BlockSize) + if err != nil { + return err + } + if len(current) != len(correct) { + return fmt.Errorf("%s: incorrect number of blocks after sync", name) + } + for i := range current { + if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 { + return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", name, current[i], correct[i]) + } + } + + return nil +} + +type blockIterator struct { + sync.Mutex + blocks []Block +} + +func (i *blockIterator) Next() (b Block, ok bool) { + i.Lock() + defer i.Unlock() + + if len(i.blocks) == 0 { + return + } + + b, i.blocks = i.blocks[0], i.blocks[1:] + ok = true + + return +}