From 99427d649e2b9cfc2c3e64150a0a901c7cd5ed87 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sat, 27 Sep 2014 14:44:15 +0200 Subject: [PATCH] Complete rewrite of the puller (fixes #638, fixes #715, fixes #701) --- check-contrib.sh | 2 + cmd/syncthing/gui.go | 14 +- cmd/syncthing/main.go | 10 +- internal/config/config.go | 1 - internal/config/config_test.go | 2 - internal/model/model.go | 118 +- internal/model/model_test.go | 19 - internal/model/nodeactivity.go | 51 + internal/model/nodeactivity_test.go | 56 + internal/model/puller.go | 1263 +++++++++------------- internal/model/sharedpullerstate.go | 183 ++++ internal/model/sharedpullerstate_test.go | 52 + internal/scanner/blocks.go | 30 + internal/scanner/walk_test.go | 44 + test/http.go | 1 - test/test-delupd.sh | 8 +- 16 files changed, 1030 insertions(+), 824 deletions(-) create mode 100644 internal/model/nodeactivity.go create mode 100644 internal/model/nodeactivity_test.go create mode 100644 internal/model/sharedpullerstate.go create mode 100644 internal/model/sharedpullerstate_test.go diff --git a/check-contrib.sh b/check-contrib.sh index 094bbf9dd..f33e7b7a7 100755 --- a/check-contrib.sh +++ b/check-contrib.sh @@ -8,6 +8,8 @@ missing-contribs() { no-docs-typos() { # Commits that are known to not change code + grep -v 63bd0136fb40a91efaa279cb4b4159d82e8e6904 |\ + grep -v 4e2feb6fbc791bb8a2daf0ab8efb10775d66343e |\ grep -v f2459ef3319b2f060dbcdacd0c35a1788a94b8bd |\ grep -v b61f418bf2d1f7d5a9d7088a20a2a448e5e66801 |\ grep -v f0621207e3953711f9ab86d99724f1d0faac45b1 |\ diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 98c24d266..74601aca0 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -87,7 +87,6 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro getRestMux.HandleFunc("/rest/ignores", withModel(m, restGetIgnores)) getRestMux.HandleFunc("/rest/lang", restGetLang) getRestMux.HandleFunc("/rest/model", withModel(m, restGetModel)) - 
getRestMux.HandleFunc("/rest/model/version", withModel(m, restGetModelVersion)) getRestMux.HandleFunc("/rest/need", withModel(m, restGetNeed)) getRestMux.HandleFunc("/rest/nodeid", restGetNodeID) getRestMux.HandleFunc("/rest/report", withModel(m, restGetReport)) @@ -238,17 +237,6 @@ func restGetCompletion(m *model.Model, w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(res) } -func restGetModelVersion(m *model.Model, w http.ResponseWriter, r *http.Request) { - var qs = r.URL.Query() - var repo = qs.Get("repo") - var res = make(map[string]interface{}) - - res["version"] = m.LocalVersion(repo) - - w.Header().Set("Content-Type", "application/json; charset=utf-8") - json.NewEncoder(w).Encode(res) -} - func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { var qs = r.URL.Query() var repo = qs.Get("repo") @@ -273,7 +261,7 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { res["inSyncFiles"], res["inSyncBytes"] = globalFiles-needFiles, globalBytes-needBytes res["state"], res["stateChanged"] = m.State(repo) - res["version"] = m.LocalVersion(repo) + res["version"] = m.CurrentLocalVersion(repo) + m.RemoteLocalVersion(repo) w.Header().Set("Content-Type", "application/json; charset=utf-8") json.NewEncoder(w).Encode(res) diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 49dccab2a..0cf5f48b2 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -442,7 +442,7 @@ nextRepo: m.AddRepo(repo) fi, err := os.Stat(repo.Directory) - if m.LocalVersion(repo.ID) > 0 { + if m.CurrentLocalVersion(repo.ID) > 0 { // Safety check. If the cached index contains files but the // repository doesn't exist, we have a problem. We would assume // that all files have been deleted which might not be the case, @@ -453,8 +453,8 @@ nextRepo: continue nextRepo } } else if os.IsNotExist(err) { - // If we don't have ny files in the index, and the directory does - // exist, try creating it. 
+ // If we don't have any files in the index, and the directory + // doesn't exist, try creating it. err = os.MkdirAll(repo.Directory, 0700) } @@ -582,7 +582,7 @@ nextRepo: m.StartRepoRO(repo.ID) } else { l.Okf("Ready to synchronize %s (read-write)", repo.ID) - m.StartRepoRW(repo.ID, cfg.Options.ParallelRequests) + m.StartRepoRW(repo.ID) } } @@ -1159,7 +1159,7 @@ func standbyMonitor() { for { time.Sleep(10 * time.Second) if time.Since(now) > 2*time.Minute { - l.Infoln("Paused state detected, possibly woke up from standby. Restarting in", restartDelay) + l.Infof("Paused state detected, possibly woke up from standby. Restarting in %v.", restartDelay) // We most likely just woke from standby. If we restart // immediately chances are we won't have networking ready. Give diff --git a/internal/config/config.go b/internal/config/config.go index 72080f131..db011251e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -118,7 +118,6 @@ type OptionsConfiguration struct { LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"` LocalAnnPort int `xml:"localAnnouncePort" default:"21025"` LocalAnnMCAddr string `xml:"localAnnounceMCAddr" default:"[ff32::5222]:21026"` - ParallelRequests int `xml:"parallelRequests" default:"16"` MaxSendKbps int `xml:"maxSendKbps"` MaxRecvKbps int `xml:"maxRecvKbps"` ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"` diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 3222da484..c7e99b962 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -29,7 +29,6 @@ func TestDefaultValues(t *testing.T) { LocalAnnEnabled: true, LocalAnnPort: 21025, LocalAnnMCAddr: "[ff32::5222]:21026", - ParallelRequests: 16, MaxSendKbps: 0, MaxRecvKbps: 0, ReconnectIntervalS: 60, @@ -121,7 +120,6 @@ func TestOverriddenValues(t *testing.T) { LocalAnnEnabled: false, LocalAnnPort: 42123, LocalAnnMCAddr: "quux:3232", - ParallelRequests: 32, MaxSendKbps: 1234, MaxRecvKbps: 
2341, ReconnectIntervalS: 6000, diff --git a/internal/model/model.go b/internal/model/model.go index e588059cf..aaa110431 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -28,6 +28,7 @@ import ( "github.com/syncthing/syncthing/internal/protocol" "github.com/syncthing/syncthing/internal/scanner" "github.com/syncthing/syncthing/internal/stats" + "github.com/syncthing/syncthing/internal/versioner" "github.com/syndtr/goleveldb/leveldb" ) @@ -138,22 +139,54 @@ func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, // StartRW starts read/write processing on the current model. When in // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. -func (m *Model) StartRepoRW(repo string, threads int) { - m.rmut.RLock() - defer m.rmut.RUnlock() +func (m *Model) StartRepoRW(repo string) { + m.rmut.Lock() + cfg, ok := m.repoCfgs[repo] + m.rmut.Unlock() - if cfg, ok := m.repoCfgs[repo]; !ok { - panic("cannot start without repo") - } else { - newPuller(cfg, m, threads, m.cfg) + if !ok { + panic("cannot start nonexistent repo " + repo) } + + p := Puller{ + repo: repo, + dir: cfg.Directory, + scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second, + model: m, + } + + if len(cfg.Versioning.Type) > 0 { + factory, ok := versioner.Factories[cfg.Versioning.Type] + if !ok { + l.Fatalf("Requested versioning type %q that does not exist", cfg.Versioning.Type) + } + p.versioner = factory(repo, cfg.Directory, cfg.Versioning.Params) + } + + go p.Serve() } // StartRO starts read only processing on the current model. When in // read only mode the model will announce files to the cluster but not // pull in any external changes. 
func (m *Model) StartRepoRO(repo string) { - m.StartRepoRW(repo, 0) // zero threads => read only + intv := time.Duration(m.repoCfgs[repo].RescanIntervalS) * time.Second + go func() { + for { + time.Sleep(intv) + + if debug { + l.Debugln(m, "rescan", repo) + } + + m.setState(repo, RepoScanning) + if err := m.ScanRepo(repo); err != nil { + invalidateRepo(m.cfg, repo, err) + return + } + m.setState(repo, RepoIdle) + } + }() } type ConnectionInfo struct { @@ -240,7 +273,7 @@ func (m *Model) Completion(node protocol.NodeID, repo string) float64 { res := 100 * (1 - float64(need)/float64(tot)) if debug { - l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot) + l.Debugf("%v Completion(%s, %q): %f (%d / %d)", m, node, repo, res, need, tot) } return res @@ -316,7 +349,7 @@ func (m *Model) NeedSize(repo string) (files int, bytes int64) { }) } if debug { - l.Debugf("NeedSize(%q): %d %d", repo, files, bytes) + l.Debugf("%v NeedSize(%q): %d %d", m, repo, files, bytes) } return } @@ -389,7 +422,7 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf // Implements the protocol.Model interface. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) { if debug { - l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs)) + l.Debugf("%v IDXUP(in): %s / %q: %d files", m, nodeID, repo, len(fs)) } if !m.repoSharedWith(repo, nodeID) { @@ -475,7 +508,7 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, cm protocol.ClusterConfigM var id protocol.NodeID copy(id[:], node.ID) - if m.cfg.GetNodeConfiguration(id)==nil { + if m.cfg.GetNodeConfiguration(id) == nil { // The node is currently unknown. Add it to the config. 
l.Infof("Adding node %v to config (vouched for by introducer %v)", id, nodeID) @@ -574,20 +607,20 @@ func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, lf := r.Get(protocol.LocalNodeID, name) if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) { if debug { - l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf) + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, nodeID, repo, name, offset, size, lf) } return nil, ErrInvalid } if offset > lf.Size() { if debug { - l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size) + l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, nodeID, name, offset, size) } return nil, ErrNoSuchFile } if debug && nodeID != protocol.LocalNodeID { - l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size) + l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, nodeID, repo, name, offset, size) } m.rmut.RLock() fn := filepath.Join(m.repoCfgs[repo].Directory, name) @@ -768,15 +801,9 @@ func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores i var err error if debug { - l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo) + l.Debugf("sendIndexes for %s-%s/%q starting", nodeID, name, repo) } - defer func() { - if debug { - l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err) - } - }() - minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores) for err == nil { @@ -787,6 +814,10 @@ func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores i minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores) } + + if debug { + l.Debugf("sendIndexes for %s-%s/%q exiting: %v", nodeID, name, repo, err) + } } func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) { @@ -877,7 +908,7 @@ func (m *Model) requestGlobal(nodeID 
protocol.NodeID, repo, name string, offset } if debug { - l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash) + l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x", m, nodeID, repo, name, offset, size, hash) } return nc.Request(repo, name, offset, size) @@ -1175,10 +1206,10 @@ func (m *Model) Override(repo string) { m.setState(repo, RepoIdle) } -// Version returns the change version for the given repository. This is -// guaranteed to increment if the contents of the local or global repository -// has changed. -func (m *Model) LocalVersion(repo string) uint64 { +// CurrentLocalVersion returns the change version for the given repository. +// This is guaranteed to increment if the contents of the local repository has +// changed. +func (m *Model) CurrentLocalVersion(repo string) uint64 { m.rmut.Lock() defer m.rmut.Unlock() @@ -1187,10 +1218,41 @@ func (m *Model) LocalVersion(repo string) uint64 { panic("bug: LocalVersion called for nonexistent repo " + repo) } - ver := fs.LocalVersion(protocol.LocalNodeID) + return fs.LocalVersion(protocol.LocalNodeID) +} + +// RemoteLocalVersion returns the change version for the given repository, as +// sent by remote peers. This is guaranteed to increment if the contents of +// the remote or global repository has changed. 
+func (m *Model) RemoteLocalVersion(repo string) uint64 { + m.rmut.Lock() + defer m.rmut.Unlock() + + fs, ok := m.repoFiles[repo] + if !ok { + panic("bug: LocalVersion called for nonexistent repo " + repo) + } + + var ver uint64 for _, n := range m.repoNodes[repo] { ver += fs.LocalVersion(n) } return ver } + +func (m *Model) availability(repo string, file string) []protocol.NodeID { + m.rmut.Lock() + defer m.rmut.Unlock() + + fs, ok := m.repoFiles[repo] + if !ok { + return nil + } + + return fs.Availability(file) +} + +func (m *Model) String() string { + return fmt.Sprintf("model@%p", m) +} diff --git a/internal/model/model_test.go b/internal/model/model_test.go index ed36e5446..23393be5d 100644 --- a/internal/model/model_test.go +++ b/internal/model/model_test.go @@ -241,25 +241,6 @@ func BenchmarkRequest(b *testing.B) { } } -func TestActivityMap(t *testing.T) { - isValid := func(protocol.NodeID) bool { - return true - } - m := make(activityMap) - if node := m.leastBusyNode([]protocol.NodeID{node1}, isValid); node != node1 { - t.Errorf("Incorrect least busy node %q", node) - } - if node := m.leastBusyNode([]protocol.NodeID{node2}, isValid); node != node2 { - t.Errorf("Incorrect least busy node %q", node) - } - if node := m.leastBusyNode([]protocol.NodeID{node1, node2}, isValid); node != node1 { - t.Errorf("Incorrect least busy node %q", node) - } - if node := m.leastBusyNode([]protocol.NodeID{node1, node2}, isValid); node != node2 { - t.Errorf("Incorrect least busy node %q", node) - } -} - func TestNodeRename(t *testing.T) { ccm := protocol.ClusterConfigMessage{ ClientName: "syncthing", diff --git a/internal/model/nodeactivity.go b/internal/model/nodeactivity.go new file mode 100644 index 000000000..4cabcdfa7 --- /dev/null +++ b/internal/model/nodeactivity.go @@ -0,0 +1,51 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. 
Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package model + +import ( + "sync" + + "github.com/syncthing/syncthing/internal/protocol" +) + +// nodeActivity tracks the number of outstanding requests per node and can +// answer which node is least busy. It is safe for use from multiple +// goroutines. +type nodeActivity struct { + act map[protocol.NodeID]int + mut sync.Mutex +} + +func newNodeActivity() *nodeActivity { + return &nodeActivity{ + act: make(map[protocol.NodeID]int), + } +} + +func (m *nodeActivity) leastBusy(availability []protocol.NodeID) protocol.NodeID { + m.mut.Lock() + var low int = 2<<30 - 1 + var selected protocol.NodeID + for _, node := range availability { + if usage := m.act[node]; usage < low { + low = usage + selected = node + } + } + m.mut.Unlock() + return selected +} + +func (m *nodeActivity) using(node protocol.NodeID) { + m.mut.Lock() + defer m.mut.Unlock() + m.act[node]++ +} + +func (m *nodeActivity) done(node protocol.NodeID) { + m.mut.Lock() + defer m.mut.Unlock() + m.act[node]-- +} diff --git a/internal/model/nodeactivity_test.go b/internal/model/nodeactivity_test.go new file mode 100644 index 000000000..5f7e91d1c --- /dev/null +++ b/internal/model/nodeactivity_test.go @@ -0,0 +1,56 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. 
+ +package model + +import ( + "testing" + + "github.com/syncthing/syncthing/internal/protocol" +) + +func TestNodeActivity(t *testing.T) { + n0 := protocol.NodeID{1, 2, 3, 4} + n1 := protocol.NodeID{5, 6, 7, 8} + n2 := protocol.NodeID{9, 10, 11, 12} + nodes := []protocol.NodeID{n0, n1, n2} + na := newNodeActivity() + + if lb := na.leastBusy(nodes); lb != n0 { + t.Errorf("Least busy node should be n0 (%v) not %v", n0, lb) + } + if lb := na.leastBusy(nodes); lb != n0 { + t.Errorf("Least busy node should still be n0 (%v) not %v", n0, lb) + } + + na.using(na.leastBusy(nodes)) + if lb := na.leastBusy(nodes); lb != n1 { + t.Errorf("Least busy node should be n1 (%v) not %v", n1, lb) + } + + na.using(na.leastBusy(nodes)) + if lb := na.leastBusy(nodes); lb != n2 { + t.Errorf("Least busy node should be n2 (%v) not %v", n2, lb) + } + + na.using(na.leastBusy(nodes)) + if lb := na.leastBusy(nodes); lb != n0 { + t.Errorf("Least busy node should be n0 (%v) not %v", n0, lb) + } + + na.done(n1) + if lb := na.leastBusy(nodes); lb != n1 { + t.Errorf("Least busy node should be n1 (%v) not %v", n1, lb) + } + + na.done(n2) + if lb := na.leastBusy(nodes); lb != n1 { + t.Errorf("Least busy node should still be n1 (%v) not %v", n1, lb) + } + + na.done(n0) + if lb := na.leastBusy(nodes); lb != n0 { + t.Errorf("Least busy node should be n0 (%v) not %v", n0, lb) + } +} diff --git a/internal/model/puller.go b/internal/model/puller.go index 9f1ecf681..d1701246f 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -2,34 +2,14 @@ // All rights reserved. Use of this source code is governed by an MIT-style // license that can be found in the LICENSE file. -/* -__ __ _ _ -\ \ / /_ _ _ __ _ __ (_)_ __ __ _| | - \ \ /\ / / _` | '__| '_ \| | '_ \ / _` | | - \ V V / (_| | | | | | | | | | | (_| |_| - \_/\_/ \__,_|_| |_| |_|_|_| |_|\__, (_) - |___/ - -The code in this file is a piece of crap. Don't base anything on it. -Refactorin ongoing in new-puller branch. 
- -__ __ _ _ -\ \ / /_ _ _ __ _ __ (_)_ __ __ _| | - \ \ /\ / / _` | '__| '_ \| | '_ \ / _` | | - \ V V / (_| | | | | | | | | | | (_| |_| - \_/\_/ \__,_|_| |_| |_|_|_| |_|\__, (_) - |___/ -*/ - package model import ( - "bytes" "errors" "fmt" - "math/rand" "os" "path/filepath" + "sync" "time" "github.com/syncthing/syncthing/internal/config" @@ -40,239 +20,551 @@ import ( "github.com/syncthing/syncthing/internal/versioner" ) -type requestResult struct { - node protocol.NodeID - file protocol.FileInfo - filepath string // full filepath name - offset int64 - data []byte - err error +// TODO: Stop on errors + +const ( + copiersPerRepo = 1 + pullersPerRepo = 16 + finishersPerRepo = 2 + pauseIntv = 60 * time.Second + nextPullIntv = 10 * time.Second + checkPullIntv = 1 * time.Second +) + +// A pullBlockState is passed to the puller routine for each block that needs +// to be fetched. +type pullBlockState struct { + *sharedPullerState + block protocol.BlockInfo } -type openFile struct { - filepath string // full filepath name - temp string // temporary filename - availability []protocol.NodeID - file *os.File - err error // error when opening or writing to file, all following operations are cancelled - outstanding int // number of requests we still have outstanding - done bool // we have sent all requests for this file +// A copyBlocksState is passed to copy routine if the file has blocks to be +// copied from the original. +type copyBlocksState struct { + *sharedPullerState + blocks []protocol.BlockInfo } -type activityMap map[protocol.NodeID]int +var ( + activity = newNodeActivity() + errNoNode = errors.New("no available source node") +) -// Queue about this many blocks each puller iteration. More blocks means -// longer iterations and better efficiency; fewer blocks reduce memory -// consumption. 1000 blocks ~= 1000 * 128 KiB ~= 125 MiB of data. 
-const pullIterationBlocks = 1000 - -func (m activityMap) leastBusyNode(availability []protocol.NodeID, isValid func(protocol.NodeID) bool) protocol.NodeID { - var low int = 2<<30 - 1 - var selected protocol.NodeID - for _, node := range availability { - usage := m[node] - if usage < low && isValid(node) { - low = usage - selected = node - } - } - m[selected]++ - return selected +type Puller struct { + repo string + dir string + scanIntv time.Duration + model *Model + stop chan struct{} + versioner versioner.Versioner } -func (m activityMap) decrease(node protocol.NodeID) { - m[node]-- -} - -var errNoNode = errors.New("no available source node") - -type puller struct { - cfg *config.Configuration - repoCfg config.RepositoryConfiguration - bq blockQueue - slots int - model *Model - oustandingPerNode activityMap - openFiles map[string]openFile - requestSlots chan bool - blocks chan bqBlock - requestResults chan requestResult - versioner versioner.Versioner - errors int -} - -func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller { - p := &puller{ - cfg: cfg, - repoCfg: repoCfg, - slots: slots, - model: model, - oustandingPerNode: make(activityMap), - openFiles: make(map[string]openFile), - requestSlots: make(chan bool, slots), - blocks: make(chan bqBlock), - requestResults: make(chan requestResult), +// Serve will run scans and pulls. It will return when Stop()ed or on a +// critical error. 
+func (p *Puller) Serve() { + if debug { + l.Debugln(p, "starting") + defer l.Debugln(p, "exiting") } - if len(repoCfg.Versioning.Type) > 0 { - factory, ok := versioner.Factories[repoCfg.Versioning.Type] - if !ok { - l.Fatalf("Requested versioning type %q that does not exist", repoCfg.Versioning.Type) - } - p.versioner = factory(repoCfg.ID, repoCfg.Directory, repoCfg.Versioning.Params) - } + p.stop = make(chan struct{}) - if slots > 0 { - // Read/write - if debug { - l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots) - } - go p.run() - } else { - // Read only - if debug { - l.Debugf("starting puller; repo %q dir %q (read only)", repoCfg.ID, repoCfg.Directory) - } - go p.runRO() - } - return p -} + pullTimer := time.NewTimer(checkPullIntv) + scanTimer := time.NewTimer(p.scanIntv) + + defer func() { + pullTimer.Stop() + scanTimer.Stop() + // TODO: Should there be an actual RepoStopped state? + p.model.setState(p.repo, RepoIdle) + }() -func (p *puller) run() { - changed := true - scanintv := time.Duration(p.repoCfg.RescanIntervalS) * time.Second - lastscan := time.Now() var prevVer uint64 - var queued int - // Load up the request slots - for i := 0; i < cap(p.requestSlots); i++ { - p.requestSlots <- true + // Clean out old temporaries before we start pulling + p.clean() + +loop: + for { + select { + case <-p.stop: + return + + // TODO: We could easily add a channel here for notifications from + // Index(), so that we immediately start a pull when new index + // information is available. Before that though, I'd like to build a + // repeatable benchmark of how long it takes to sync a change from + // node A to node B, so we have something to work against. + case <-pullTimer.C: + // RemoteLocalVersion() is a fast call, doesn't touch the database. 
+ curVer := p.model.RemoteLocalVersion(p.repo) + if curVer == prevVer { + pullTimer.Reset(checkPullIntv) + continue + } + + if debug { + l.Debugln(p, "pulling", prevVer, curVer) + } + p.model.setState(p.repo, RepoSyncing) + tries := 0 + for { + tries++ + changed := p.pullerIteration(copiersPerRepo, pullersPerRepo, finishersPerRepo) + if debug { + l.Debugln(p, "changed", changed) + } + + if changed == 0 { + // No files were changed by the puller, so we are in + // sync. Remember the local version number and + // schedule a resync a little bit into the future. + prevVer = curVer + pullTimer.Reset(nextPullIntv) + break + } + + if tries > 10 { + // We've tried a bunch of times to get in sync, but + // we're not making it. Probably there are write + // errors preventing us. Flag this with a warning and + // wait a bit longer before retrying. + l.Warnf("Repo %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.repo, pauseIntv) + pullTimer.Reset(pauseIntv) + break + } + } + p.model.setState(p.repo, RepoIdle) + + // The reason for running the scanner from within the puller is that + // this is the easiest way to make sure we are not doing both at the + // same time. + case <-scanTimer.C: + if debug { + l.Debugln(p, "rescan") + } + p.model.setState(p.repo, RepoScanning) + if err := p.model.ScanRepo(p.repo); err != nil { + invalidateRepo(p.model.cfg, p.repo, err) + break loop + } + p.model.setState(p.repo, RepoIdle) + scanTimer.Reset(p.scanIntv) + } + } +} + +func (p *Puller) Stop() { + close(p.stop) +} + +func (p *Puller) String() string { + return fmt.Sprintf("puller/%s@%p", p.repo, p) +} + +// pullerIteration runs a single puller iteration for the given repo and +// returns the number items that should have been synced (even those that +// might have failed). One puller iteration handles all files currently +// flagged as needed in the repo. The specified number of copier, puller and +// finisher routines are used. 
It's seldom efficient to use more than one +// copier routine, while multiple pullers are essential and multiple finishers +// may be useful (they are primarily CPU bound due to hashing). +func (p *Puller) pullerIteration(ncopiers, npullers, nfinishers int) int { + pullChan := make(chan pullBlockState) + copyChan := make(chan copyBlocksState) + finisherChan := make(chan *sharedPullerState) + + var wg sync.WaitGroup + var doneWg sync.WaitGroup + + for i := 0; i < ncopiers; i++ { + wg.Add(1) + go func() { + // copierRoutine finishes when copyChan is closed + p.copierRoutine(copyChan, finisherChan) + wg.Done() + }() } - for { - if sc, sl := cap(p.requestSlots), len(p.requestSlots); sl != sc { - panic(fmt.Sprintf("Incorrect number of slots; %d != %d", sl, sc)) + for i := 0; i < npullers; i++ { + wg.Add(1) + go func() { + // pullerRoutine finishes when pullChan is closed + p.pullerRoutine(pullChan, finisherChan) + wg.Done() + }() + } + + for i := 0; i < nfinishers; i++ { + doneWg.Add(1) + // finisherRoutine finishes when finisherChan is closed + go func() { + p.finisherRoutine(finisherChan) + doneWg.Done() + }() + } + + p.model.rmut.RLock() + files := p.model.repoFiles[p.repo] + p.model.rmut.RUnlock() + + // !!! + // WithNeed takes a database snapshot (by necessity). By the time we've + // handled a bunch of files it might have become out of date and we might + // be attempting to sync with an old version of a file... + // !!! 
+ + changed := 0 + files.WithNeed(protocol.LocalNodeID, func(intf protocol.FileIntf) bool { + file := intf.(protocol.FileInfo) + + events.Default.Log(events.ItemStarted, map[string]string{ + "repo": p.repo, + "item": file.Name, + }) + + if debug { + l.Debugln(p, "handling", file.Name) } - // Run the pulling loop as long as there are blocks to fetch - prevVer, queued = p.queueNeededBlocks(prevVer) - if queued > 0 { - p.errors = 0 + switch { + case protocol.IsDirectory(file.Flags) && protocol.IsDeleted(file.Flags): + // A deleted directory + p.deleteDir(file) + case protocol.IsDirectory(file.Flags): + // A new or changed directory + p.handleDir(file) + case protocol.IsDeleted(file.Flags): + // A deleted file + p.deleteFile(file) + default: + // A new or changed file + p.handleFile(file, copyChan, pullChan) + } - pull: - for { - select { - case res := <-p.requestResults: - p.model.setState(p.repoCfg.ID, RepoSyncing) - changed = true - p.requestSlots <- true - p.handleRequestResult(res) + changed++ + return true + }) - case <-p.requestSlots: - b, ok := p.bq.get() + // Signal copy and puller routines that we are done with the in data for + // this iteration + close(copyChan) + close(pullChan) - if !ok { - if debug { - l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID) - } + // Wait for them to finish, then signal the finisher chan that there will + // be no more input. + wg.Wait() + close(finisherChan) - if p.errors > 0 && p.errors >= queued { - p.requestSlots <- true - break pull - } + // Wait for the finisherChan to finish. 
+ doneWg.Wait() - prevVer, _ = p.queueNeededBlocks(prevVer) - b, ok = p.bq.get() - } + return changed +} - if !ok && len(p.openFiles) == 0 { - // Nothing queued, nothing outstanding - if debug { - l.Debugf("%q: pulling loop done", p.repoCfg.ID) - } - p.requestSlots <- true - break pull - } +// handleDir creates or updates the given directory and calls updateLocal only if that succeeded +func (p *Puller) handleDir(file protocol.FileInfo) { + realName := filepath.Join(p.dir, file.Name) + mode := os.FileMode(file.Flags & 0777) - if !ok { - // Nothing queued, but there are still open files. - // Give the situation a moment to change. - if debug { - l.Debugf("%q: pulling loop paused", p.repoCfg.ID) - } - p.requestSlots <- true - time.Sleep(100 * time.Millisecond) - continue pull - } + if debug { + curFile := p.model.CurrentRepoFile(p.repo, file.Name) + l.Debugf("need dir\n\t%v\n\t%v", file, curFile) + } - if debug { - l.Debugf("queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy)) - } - p.model.setState(p.repoCfg.ID, RepoSyncing) - changed = true - if p.handleBlock(b) { - // Block was fully handled, free up the slot - p.requestSlots <- true - } + info, err := os.Stat(realName) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(realName, mode) + } else if err == nil && !info.IsDir() { + l.Infof("Puller (repo %q, file %q): should be dir, but is not", p.repo, file.Name) + return + } else if err == nil { + err = os.Chmod(realName, mode) + } + + if err == nil { + p.model.updateLocal(p.repo, file) + } +} + +// deleteDir attempts to delete the given directory +func (p *Puller) deleteDir(file protocol.FileInfo) { + realName := filepath.Join(p.dir, file.Name) + err := os.Remove(realName) + if err == nil || os.IsNotExist(err) { + p.model.updateLocal(p.repo, file) + } +} + +// deleteFile attempts to delete the given file +func (p *Puller) deleteFile(file protocol.FileInfo) { + realName := filepath.Join(p.dir, file.Name) + realDir := filepath.Dir(realName) + if info, err := os.Stat(realDir); 
err == nil && info.IsDir() && info.Mode()&0200 == 0 { + // A non-writeable directory (for this user; we assume that's the + // relevant part). Temporarily change the mode so we can delete the + // file inside it. + err = os.Chmod(realDir, 0755) + if err == nil { + defer func() { + err = os.Chmod(realDir, info.Mode()) + if err != nil { + panic(err) + } + }() + } + } + + var err error + if p.versioner != nil { + err = p.versioner.Archive(realName) + } else { + err = os.Remove(realName) + } + + if err != nil { + l.Infof("Puller (repo %q, file %q): delete: %v", p.repo, file.Name, err) + } else { + p.model.updateLocal(p.repo, file) + } +} + +// handleFile queues the copies and pulls as necessary for a single new or +// changed file. +func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, pullChan chan<- pullBlockState) { + curFile := p.model.CurrentRepoFile(p.repo, file.Name) + copyBlocks, pullBlocks := scanner.BlockDiff(curFile.Blocks, file.Blocks) + + if len(copyBlocks) == len(curFile.Blocks) && len(pullBlocks) == 0 { + // We are supposed to copy the entire file, and then fetch nothing. We + // are only updating metadata, so we don't actually *need* to make the + // copy. 
+ if debug { + l.Debugln(p, "taking shortcut on", file.Name) + } + p.shortcutFile(file) + return + } + + // Figure out the absolute filenames we need once and for all + tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) + realName := filepath.Join(p.dir, file.Name) + + s := sharedPullerState{ + file: file, + repo: p.repo, + tempName: tempName, + realName: realName, + pullNeeded: len(pullBlocks), + } + if len(copyBlocks) > 0 { + s.copyNeeded = 1 + } + + if debug { + l.Debugf("%v need file %s; copy %d, pull %d", p, file.Name, len(copyBlocks), len(pullBlocks)) + } + + if len(copyBlocks) > 0 { + cs := copyBlocksState{ + sharedPullerState: &s, + blocks: copyBlocks, + } + copyChan <- cs + } + + if len(pullBlocks) > 0 { + for _, block := range pullBlocks { + ps := pullBlockState{ + sharedPullerState: &s, + block: block, + } + pullChan <- ps + } + } +} + +// shortcutFile sets file mode and modification time, when that's the only +// thing that has changed. +func (p *Puller) shortcutFile(file protocol.FileInfo) { + realName := filepath.Join(p.dir, file.Name) + err := os.Chmod(realName, os.FileMode(file.Flags&0777)) + if err != nil { + l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err) + return + } + + t := time.Unix(file.Modified, 0) + err = os.Chtimes(realName, t, t) + if err != nil { + l.Infof("Puller (repo %q, file %q): shortcut: %v", p.repo, file.Name, err) + return + } + + p.model.updateLocal(p.repo, file) +} + +// copierRoutine reads pullerStates until the in channel closes and performs +// the relevant copy. 
+func (p *Puller) copierRoutine(in <-chan copyBlocksState, out chan<- *sharedPullerState) { + buf := make([]byte, scanner.StandardBlockSize) + +nextFile: + for state := range in { + dstFd, err := state.tempFile() + if err != nil { + // Nothing more to do for this failed file (the error was logged + // when it happened) + continue nextFile + } + + srcFd, err := state.sourceFile() + if err != nil { + // As above + continue nextFile + } + + for _, block := range state.blocks { + buf = buf[:int(block.Size)] + + _, err = srcFd.ReadAt(buf, block.Offset) + if err != nil { + state.earlyClose("src read", err) + srcFd.Close() + continue nextFile + } + + _, err = dstFd.WriteAt(buf, block.Offset) + if err != nil { + state.earlyClose("dst write", err) + srcFd.Close() + continue nextFile + } + } + + srcFd.Close() + state.copyDone() + out <- state.sharedPullerState + } +} + +func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) { +nextBlock: + for state := range in { + if state.failed() != nil { + continue nextBlock + } + + // Select the least busy node to pull the block from. If we found no + // feasible node at all, fail the block (and in the long run, the + // file). + potentialNodes := p.model.availability(p.repo, state.file.Name) + selected := activity.leastBusy(potentialNodes) + if selected == (protocol.NodeID{}) { + state.earlyClose("pull", errNoNode) + continue nextBlock + } + + // Get an fd to the temporary file. Technically we don't need it until + // after fetching the block, but if we run into an error here there is + // no point in issuing the request to the network. + fd, err := state.tempFile() + if err != nil { + continue nextBlock + } + + // Fetch the block, while marking the selected node as in use so that + // leastBusy can select another node when someone else asks.
+ activity.using(selected) + buf, err := p.model.requestGlobal(selected, p.repo, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash) + activity.done(selected) + if err != nil { + state.earlyClose("pull", err) + continue nextBlock + } + + // Save the block data we got from the cluster + _, err = fd.WriteAt(buf, state.block.Offset) + if err != nil { + state.earlyClose("save", err) + continue nextBlock + } + + state.pullDone() + out <- state.sharedPullerState + } +} + +func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) { + for state := range in { + if closed, err := state.finalClose(); closed { + if debug { + l.Debugln(p, "closing", state.file.Name) + } + if err != nil { + l.Warnln("puller: final:", err) + continue + } + + // Verify the file against expected hashes + fd, err := os.Open(state.tempName) + if err != nil { + l.Warnln("puller: final:", err) + continue + } + err = scanner.Verify(fd, scanner.StandardBlockSize, state.file.Blocks) + fd.Close() + if err != nil { + l.Warnln("puller: final:", state.file.Name, err) + continue + } + + // Set the correct permission bits on the new file + err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)) + if err != nil { + os.Remove(state.tempName) + l.Warnln("puller: final:", err) + continue + } + + // Set the correct timestamp on the new file + t := time.Unix(state.file.Modified, 0) + err = os.Chtimes(state.tempName, t, t) + if err != nil { + os.Remove(state.tempName) + l.Warnln("puller: final:", err) + continue + } + + // If we should use versioning, let the versioner archive the old + // file before we replace it. Archiving a non-existent file is not + // an error. + if p.versioner != nil { + err = p.versioner.Archive(state.realName) + if err != nil { + os.Remove(state.tempName) + l.Warnln("puller: final:", err) + continue } } - if p.errors > 0 && p.errors >= queued { - l.Warnf("All remaining files failed to sync. 
Stopping repo %q.", p.repoCfg.ID) - invalidateRepo(p.cfg, p.repoCfg.ID, errors.New("too many errors, check logs")) - return - } - } - - if changed { - p.model.setState(p.repoCfg.ID, RepoCleaning) - p.clean() - changed = false - } - - p.model.setState(p.repoCfg.ID, RepoIdle) - - // Do a rescan if it's time for it - if time.Since(lastscan) > scanintv { - if debug { - l.Debugf("%q: time for rescan", p.repoCfg.ID) - } - - err := p.model.ScanRepo(p.repoCfg.ID) + // Replace the original file with the new one + err = osutil.Rename(state.tempName, state.realName) if err != nil { - invalidateRepo(p.cfg, p.repoCfg.ID, err) - return + os.Remove(state.tempName) + l.Warnln("puller: final:", err) + continue } - lastscan = time.Now() - } - time.Sleep(5 * time.Second) - } -} - -func (p *puller) runRO() { - walkTicker := time.Tick(time.Duration(p.repoCfg.RescanIntervalS) * time.Second) - - for _ = range walkTicker { - if debug { - l.Debugf("%q: time for rescan", p.repoCfg.ID) - } - err := p.model.ScanRepo(p.repoCfg.ID) - if err != nil { - invalidateRepo(p.cfg, p.repoCfg.ID, err) - return + // Record the updated file in the index + p.model.updateLocal(p.repo, state.file) } } } -// clean deletes orphaned temporary files and directories that should no -// longer exist. -func (p *puller) clean() { - var deleteDirs []string - var changed = 0 - - var walkFn = func(path string, info os.FileInfo, err error) error { +// clean deletes orphaned temporary files +func (p *Puller) clean() { + filepath.Walk(p.dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } @@ -281,539 +573,8 @@ func (p *puller) clean() { os.Remove(path) } - if !info.IsDir() { - return nil - } - - rn, err := filepath.Rel(p.repoCfg.Directory, path) - if err != nil { - return nil - } - - if rn == "." 
{ - return nil - } - - if filepath.Base(rn) == ".stversions" { - return filepath.SkipDir - } - - cur := p.model.CurrentRepoFile(p.repoCfg.ID, rn) - if cur.Name != rn { - // No matching dir in current list; weird - if debug { - l.Debugf("missing dir: %s; %v", rn, cur) - } - return nil - } - - if protocol.IsDeleted(cur.Flags) { - if debug { - l.Debugf("queue delete dir: %v", cur) - } - - // We queue the directories to delete since we walk the - // tree in depth first order and need to remove the - // directories in the opposite order. - - deleteDirs = append(deleteDirs, path) - return nil - } - - if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(cur.Flags) && !scanner.PermsEqual(cur.Flags, uint32(info.Mode())) { - err := os.Chmod(path, os.FileMode(cur.Flags)&os.ModePerm) - if err != nil { - l.Warnf("Restoring folder flags: %q: %v", path, err) - } else { - changed++ - if debug { - l.Debugf("restored dir flags: %o -> %v", info.Mode()&os.ModePerm, cur) - } - } - } - return nil - } - - for { - deleteDirs = nil - changed = 0 - filepath.Walk(p.repoCfg.Directory, walkFn) - - var deleted = 0 - // Delete any queued directories - for i := len(deleteDirs) - 1; i >= 0; i-- { - dir := deleteDirs[i] - if debug { - l.Debugln("delete dir:", dir) - } - err := os.Remove(dir) - if err == nil { - deleted++ - } else { - l.Warnln("Delete dir:", err) - } - } - - if debug { - l.Debugf("changed %d, deleted %d dirs", changed, deleted) - } - - if changed+deleted == 0 { - return - } - } -} - -func (p *puller) handleRequestResult(res requestResult) { - p.oustandingPerNode.decrease(res.node) - f := res.file - - of, ok := p.openFiles[f.Name] - if !ok { - // no entry in openFiles means there was an error and we've cancelled the operation - return - } - - if res.err != nil { - // This request resulted in an error - of.err = res.err - if debug { - l.Debugf("pull: not writing %q / %q offset %d: %v; (done=%v, outstanding=%d)", p.repoCfg.ID, f.Name, res.offset, res.err, of.done, of.outstanding) - 
} - } else if of.err == nil { - // This request was sucessfull and nothing has failed previously either - _, of.err = of.file.WriteAt(res.data, res.offset) - if debug { - l.Debugf("pull: wrote %q / %q offset %d len %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, len(res.data), of.outstanding, of.done) - } - } - - of.outstanding-- - p.openFiles[f.Name] = of - - if of.done && of.outstanding == 0 { - p.closeFile(f) - } -} - -// handleBlock fulfills the block request by copying, ignoring or fetching -// from the network. Returns true if the block was fully handled -// synchronously, i.e. if the slot can be reused. -func (p *puller) handleBlock(b bqBlock) bool { - f := b.file - - // For directories, making sure they exist is enough. - // Deleted directories we mark as handled and delete later. - if protocol.IsDirectory(f.Flags) { - if !protocol.IsDeleted(f.Flags) { - path := filepath.Join(p.repoCfg.Directory, f.Name) - _, err := os.Stat(path) - if err != nil && os.IsNotExist(err) { - if debug { - l.Debugf("create dir: %v", f) - } - err = os.MkdirAll(path, os.FileMode(f.Flags&0777)) - if err != nil { - p.errors++ - l.Infof("mkdir: error: %q: %v", path, err) - } - } - } else if debug { - l.Debugf("ignore delete dir: %v", f) - } - p.model.updateLocal(p.repoCfg.ID, f) - return true - } - - if len(b.copy) > 0 && len(b.copy) == len(b.file.Blocks) && b.last { - // We are supposed to copy the entire file, and then fetch nothing. - // We don't actually need to make the copy. 
- if debug { - l.Debugln("taking shortcut:", f) - } - fp := filepath.Join(p.repoCfg.Directory, f.Name) - t := time.Unix(f.Modified, 0) - err := os.Chtimes(fp, t, t) - if err != nil { - l.Infof("chtimes: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) { - err = os.Chmod(fp, os.FileMode(f.Flags&0777)) - if err != nil { - l.Infof("chmod: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - } - - events.Default.Log(events.ItemStarted, map[string]string{ - "repo": p.repoCfg.ID, - "item": f.Name, - }) - - p.model.updateLocal(p.repoCfg.ID, f) - return true - } - - of, ok := p.openFiles[f.Name] - of.done = b.last - - if !ok { - if debug { - l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name) - } - - events.Default.Log(events.ItemStarted, map[string]string{ - "repo": p.repoCfg.ID, - "item": f.Name, - }) - - of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name) - of.filepath = filepath.Join(p.repoCfg.Directory, f.Name) - of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name)) - - dirName := filepath.Dir(of.filepath) - info, err := os.Stat(dirName) - if err != nil { - err = os.MkdirAll(dirName, 0777) - if debug && err != nil { - l.Debugf("mkdir: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - } else { - // We need to make sure the directory is writeable so we can create files in it - if dirName != p.repoCfg.Directory { - err = os.Chmod(dirName, 0777) - if debug && err != nil { - l.Debugf("make writeable: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - } - // Change it back after creating the file, to minimize the time window with incorrect permissions - defer os.Chmod(dirName, info.Mode()) - } - - of.file, of.err = os.Create(of.temp) - if of.err != nil { - p.errors++ - l.Infof("create: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) - if !b.last { - p.openFiles[f.Name] = of - } - return true - } - osutil.HideFile(of.temp) - } - - if of.err != nil 
{ - // We have already failed this file. - if debug { - l.Debugf("pull: error: %q / %q has already failed: %v", p.repoCfg.ID, f.Name, of.err) - } - if b.last { - delete(p.openFiles, f.Name) - } - - return true - } - - p.openFiles[f.Name] = of - - switch { - case len(b.copy) > 0: - p.handleCopyBlock(b) - return true - - case b.block.Size > 0: - return p.handleRequestBlock(b) - - default: - p.handleEmptyBlock(b) - return true - } -} - -func (p *puller) handleCopyBlock(b bqBlock) { - // We have blocks to copy from the existing file - f := b.file - of := p.openFiles[f.Name] - - if debug { - l.Debugf("pull: copying %d blocks for %q / %q", len(b.copy), p.repoCfg.ID, f.Name) - } - - var exfd *os.File - exfd, of.err = os.Open(of.filepath) - if of.err != nil { - p.errors++ - l.Infof("open: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) - of.file.Close() - of.file = nil - - p.openFiles[f.Name] = of - return - } - defer exfd.Close() - - for _, b := range b.copy { - bs := make([]byte, b.Size) - _, of.err = exfd.ReadAt(bs, b.Offset) - if of.err == nil { - _, of.err = of.file.WriteAt(bs, b.Offset) - } - if of.err != nil { - p.errors++ - l.Infof("write: error: %q / %q: %v", p.repoCfg.ID, f.Name, of.err) - exfd.Close() - of.file.Close() - of.file = nil - - p.openFiles[f.Name] = of - return - } - } -} - -// handleRequestBlock tries to pull a block from the network. Returns true if -// the block could _not_ be fetched (i.e. 
it was fully handled, matching the -// return criteria of handleBlock) -func (p *puller) handleRequestBlock(b bqBlock) bool { - f := b.file - of, ok := p.openFiles[f.Name] - if !ok { - panic("bug: request for non-open file") - } - - node := p.oustandingPerNode.leastBusyNode(of.availability, p.model.ConnectedTo) - if node == (protocol.NodeID{}) { - of.err = errNoNode - if of.file != nil { - of.file.Close() - of.file = nil - os.Remove(of.temp) - if debug { - l.Debugf("pull: no source for %q / %q; closed", p.repoCfg.ID, f.Name) - } - } - if b.last { - if debug { - l.Debugf("pull: no source for %q / %q; deleting", p.repoCfg.ID, f.Name) - } - delete(p.openFiles, f.Name) - } else { - if debug { - l.Debugf("pull: no source for %q / %q; await more blocks", p.repoCfg.ID, f.Name) - } - p.openFiles[f.Name] = of - } - return true - } - - of.outstanding++ - p.openFiles[f.Name] = of - - go func(node protocol.NodeID, b bqBlock) { - if debug { - l.Debugf("pull: requesting %q / %q offset %d size %d from %q outstanding %d", p.repoCfg.ID, f.Name, b.block.Offset, b.block.Size, node, of.outstanding) - } - - bs, err := p.model.requestGlobal(node, p.repoCfg.ID, f.Name, b.block.Offset, int(b.block.Size), nil) - p.requestResults <- requestResult{ - node: node, - file: f, - filepath: of.filepath, - offset: b.block.Offset, - data: bs, - err: err, - } - }(node, b) - - return false -} - -func (p *puller) handleEmptyBlock(b bqBlock) { - f := b.file - of := p.openFiles[f.Name] - - if b.last { - if of.err == nil { - of.file.Close() - } - } - - if protocol.IsDeleted(f.Flags) { - if debug { - l.Debugf("pull: delete %q", f.Name) - } - os.Remove(of.temp) - - // Ensure the file and the directory it is in is writeable so we can remove the file - dirName := filepath.Dir(of.filepath) - err := os.Chmod(of.filepath, 0666) - if debug && err != nil { - l.Debugf("make writeable: error: %q: %v", of.filepath, err) - } - if dirName != p.repoCfg.Directory { - info, err := os.Stat(dirName) - if err != nil { - 
l.Debugln("weird! can't happen?", err) - } - err = os.Chmod(dirName, 0777) - if debug && err != nil { - l.Debugf("make writeable: error: %q: %v", dirName, err) - } - // Change it back after deleting the file, to minimize the time window with incorrect permissions - defer os.Chmod(dirName, info.Mode()) - } - if p.versioner != nil { - if debug { - l.Debugln("pull: deleting with versioner") - } - if err := p.versioner.Archive(of.filepath); err == nil { - p.model.updateLocal(p.repoCfg.ID, f) - } else if debug { - l.Debugln("pull: error:", err) - } - } else if err := os.Remove(of.filepath); err == nil || os.IsNotExist(err) { - p.model.updateLocal(p.repoCfg.ID, f) - } - } else { - if debug { - l.Debugf("pull: no blocks to fetch and nothing to copy for %q / %q", p.repoCfg.ID, f.Name) - } - t := time.Unix(f.Modified, 0) - if os.Chtimes(of.temp, t, t) != nil { - delete(p.openFiles, f.Name) - return - } - if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) && os.Chmod(of.temp, os.FileMode(f.Flags&0777)) != nil { - delete(p.openFiles, f.Name) - return - } - osutil.ShowFile(of.temp) - if osutil.Rename(of.temp, of.filepath) == nil { - p.model.updateLocal(p.repoCfg.ID, f) - } - } - delete(p.openFiles, f.Name) -} - -func (p *puller) queueNeededBlocks(prevVer uint64) (uint64, int) { - curVer := p.model.LocalVersion(p.repoCfg.ID) - if curVer == prevVer { - return curVer, 0 - } - - if debug { - l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID) - } - - queued := 0 - files := make([]protocol.FileInfo, 0, indexBatchSize) - for _, f := range p.model.NeedFilesRepoLimited(p.repoCfg.ID, indexBatchSize, pullIterationBlocks) { - if _, ok := p.openFiles[f.Name]; ok { - continue - } - files = append(files, f) - } - - perm := rand.Perm(len(files)) - for _, idx := range perm { - f := files[idx] - lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name) - have, need := scanner.BlockDiff(lf.Blocks, f.Blocks) - if debug { - l.Debugf("need:\n local: %v\n global: %v\n haveBlocks: 
%v\n needBlocks: %v", lf, f, have, need) - } - queued++ - p.bq.put(bqAdd{ - file: f, - have: have, - need: need, - }) - } - - if debug && queued > 0 { - l.Debugf("%q: queued %d items", p.repoCfg.ID, queued) - } - - if queued > 0 { - return prevVer, queued - } else { - return curVer, 0 - } -} - -func (p *puller) closeFile(f protocol.FileInfo) { - if debug { - l.Debugf("pull: closing %q / %q", p.repoCfg.ID, f.Name) - } - - of := p.openFiles[f.Name] - err := of.file.Close() - if err != nil { - p.errors++ - l.Infof("close: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - defer os.Remove(of.temp) - - delete(p.openFiles, f.Name) - - fd, err := os.Open(of.temp) - if err != nil { - p.errors++ - l.Infof("open: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - return - } - hb, _ := scanner.Blocks(fd, scanner.StandardBlockSize, f.Size()) - fd.Close() - - if l0, l1 := len(hb), len(f.Blocks); l0 != l1 { - if debug { - l.Debugf("pull: %q / %q: nblocks %d != %d", p.repoCfg.ID, f.Name, l0, l1) - } - return - } - - for i := range hb { - if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 { - if debug { - l.Debugf("pull: %q / %q: block %d hash mismatch\n have: %x\n want: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash) - } - return - } - } - - t := time.Unix(f.Modified, 0) - err = os.Chtimes(of.temp, t, t) - if err != nil { - l.Infof("chtimes: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - if !p.repoCfg.IgnorePerms && protocol.HasPermissionBits(f.Flags) { - err = os.Chmod(of.temp, os.FileMode(f.Flags&0777)) - if err != nil { - l.Infof("chmod: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - } - - osutil.ShowFile(of.temp) - - if p.versioner != nil { - err := p.versioner.Archive(of.filepath) - if err != nil { - if debug { - l.Debugf("pull: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } - return - } - } - - if debug { - l.Debugf("pull: rename %q / %q: %q", p.repoCfg.ID, f.Name, of.filepath) - } - if err := osutil.Rename(of.temp, of.filepath); err == nil 
{ - p.model.updateLocal(p.repoCfg.ID, f) - } else { - p.errors++ - l.Infof("rename: error: %q / %q: %v", p.repoCfg.ID, f.Name, err) - } + }) } func invalidateRepo(cfg *config.Configuration, repoID string, err error) { diff --git a/internal/model/sharedpullerstate.go b/internal/model/sharedpullerstate.go new file mode 100644 index 000000000..e2dfafb51 --- /dev/null +++ b/internal/model/sharedpullerstate.go @@ -0,0 +1,183 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package model + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/syncthing/syncthing/internal/protocol" +) + +// A sharedPullerState is kept for each file that is being synced and is kept +// updated along the way. +type sharedPullerState struct { + // Immutable, does not require locking + file protocol.FileInfo + repo string + tempName string + realName string + + // Mutable, must be locked for access + err error // The first error we hit + fd *os.File // The fd of the temp file + copyNeeded int // Number of copy actions we expect to happen + pullNeeded int // Number of block pulls we expect to happen + closed bool // Set when the file has been closed + mut sync.Mutex // Protects the above +} + +// tempFile returns the fd for the temporary file, reusing an open fd +// or creating the file as necessary. 
+func (s *sharedPullerState) tempFile() (*os.File, error) { + s.mut.Lock() + defer s.mut.Unlock() + + // If we've already hit an error, return early + if s.err != nil { + return nil, s.err + } + + // If the temp file is already open, return the file descriptor + if s.fd != nil { + return s.fd, nil + } + + // Ensure that the parent directory exists or can be created + dir := filepath.Dir(s.tempName) + if info, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dir, 0755) + if err != nil { + s.earlyCloseLocked("dst mkdir", err) + return nil, err + } + } else if err != nil { + s.earlyCloseLocked("dst stat dir", err) + return nil, err + } else if !info.IsDir() { + err = fmt.Errorf("%q: not a directory", dir) + s.earlyCloseLocked("dst mkdir", err) + return nil, err + } else if info.Mode()&0200 == 0 { // owner cannot write here; open the directory up until we return + err := os.Chmod(dir, 0755) + if err == nil { + defer func() { + err := os.Chmod(dir, info.Mode().Perm()) + if err != nil { + panic(err) + } + }() + } + } + + // Attempt to create the temp file + fd, err := os.OpenFile(s.tempName, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0644) + if err != nil { + s.earlyCloseLocked("dst create", err) + return nil, err + } + + // Same fd will be used by all writers + s.fd = fd + + return fd, nil +} + +// sourceFile opens the existing source file for reading +func (s *sharedPullerState) sourceFile() (*os.File, error) { + s.mut.Lock() + defer s.mut.Unlock() + + // If we've already hit an error, return early + if s.err != nil { + return nil, s.err + } + + // Attempt to open the existing file + fd, err := os.Open(s.realName) + if err != nil { + s.earlyCloseLocked("src open", err) + return nil, err + } + + return fd, nil +} + +// earlyClose prints a warning message composed of the context and +// error, and marks the sharedPullerState as failed. Is a no-op when called on +// an already failed state.
+func (s *sharedPullerState) earlyClose(context string, err error) { + s.mut.Lock() + defer s.mut.Unlock() + + s.earlyCloseLocked(context, err) +} + +func (s *sharedPullerState) earlyCloseLocked(context string, err error) { + if s.err != nil { + return + } + + l.Infof("Puller (repo %q, file %q): %s: %v", s.repo, s.file.Name, context, err) + s.err = err + if s.fd != nil { + s.fd.Close() + os.Remove(s.tempName) + } + s.closed = true +} + +func (s *sharedPullerState) failed() error { + s.mut.Lock() + defer s.mut.Unlock() + + return s.err +} + +func (s *sharedPullerState) copyDone() { + s.mut.Lock() + s.copyNeeded-- + if debug { + l.Debugln("sharedPullerState", s.repo, s.file.Name, "copyNeeded ->", s.copyNeeded) + } + s.mut.Unlock() +} + +func (s *sharedPullerState) pullDone() { + s.mut.Lock() + s.pullNeeded-- + if debug { + l.Debugln("sharedPullerState", s.repo, s.file.Name, "pullNeeded ->", s.pullNeeded) + } + s.mut.Unlock() +} + +// finalClose atomically closes and returns closed status of a file. A true +// first return value means the file was closed and should be finished, with +// the error indicating the success or failure of the close. A false first +// return value indicates the file is not ready to be closed, or is already +// closed and should in either case not be finished off now. +func (s *sharedPullerState) finalClose() (bool, error) { + s.mut.Lock() + defer s.mut.Unlock() + + if s.pullNeeded+s.copyNeeded != 0 { + // Not done yet. + return false, nil + } + if s.closed { + // Already handled. + return false, nil + } + + s.closed = true + if fd := s.fd; fd != nil { + s.fd = nil + return true, fd.Close() + } + return true, nil +} diff --git a/internal/model/sharedpullerstate_test.go b/internal/model/sharedpullerstate_test.go new file mode 100644 index 000000000..6147f741a --- /dev/null +++ b/internal/model/sharedpullerstate_test.go @@ -0,0 +1,52 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved.
Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package model + +import "testing" + +func TestSourceFileOK(t *testing.T) { + s := sharedPullerState{ + realName: "testdata/foo", + } + + fd, err := s.sourceFile() + if err != nil { + t.Fatal(err) + } + if fd == nil { + t.Fatal("Unexpected nil fd") + } + + bs := make([]byte, 6) + n, err := fd.Read(bs) + + if n != len(bs) { + t.Fatalf("Wrong read length %d != %d", n, len(bs)) + } + if string(bs) != "foobar" { + t.Fatalf("Wrong contents %s != foobar", bs) + } + + if err := s.failed(); err != nil { + t.Fatal(err) + } +} + +func TestSourceFileBad(t *testing.T) { + s := sharedPullerState{ + realName: "nonexistent", + } + + fd, err := s.sourceFile() + if err == nil { + t.Fatal("Unexpected nil error") + } + if fd != nil { + t.Fatal("Unexpected non-nil fd") + } + if err := s.failed(); err == nil { + t.Fatal("Unexpected nil failed()") + } +} diff --git a/internal/scanner/blocks.go b/internal/scanner/blocks.go index 9400e10e0..22aa6596f 100644 --- a/internal/scanner/blocks.go +++ b/internal/scanner/blocks.go @@ -7,6 +7,7 @@ package scanner import ( "bytes" "crypto/sha256" + "fmt" "io" "github.com/syncthing/syncthing/internal/protocol" @@ -88,3 +89,32 @@ func BlockDiff(src, tgt []protocol.BlockInfo) (have, need []protocol.BlockInfo) return have, need } + +// Verify returns nil or an error describing the mismatch between the block +// list and actual reader contents +func Verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error { + hf := sha256.New() + for i, block := range blocks { + lr := &io.LimitedReader{R: r, N: int64(blocksize)} + _, err := io.Copy(hf, lr) + if err != nil { + return err + } + + hash := hf.Sum(nil) + hf.Reset() + + if bytes.Compare(hash, block.Hash) != 0 { + return fmt.Errorf("hash mismatch %x != %x for block %d", hash, block.Hash, i) + } + } + + // We should have reached the end now + bs := make([]byte, 1) + n, err := r.Read(bs) + if n != 0
|| err != io.EOF { + return fmt.Errorf("file continues past end of blocks") + } + + return nil +} diff --git a/internal/scanner/walk_test.go b/internal/scanner/walk_test.go index 6a15e989d..114e2b6c2 100644 --- a/internal/scanner/walk_test.go +++ b/internal/scanner/walk_test.go @@ -133,6 +133,50 @@ func TestWalkError(t *testing.T) { } } +func TestVerify(t *testing.T) { + blocksize := 16 + // data should be an even multiple of blocksize long + data := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut e") + buf := bytes.NewBuffer(data) + + blocks, err := Blocks(buf, blocksize, 0) + if err != nil { + t.Fatal(err) + } + if exp := len(data) / blocksize; len(blocks) != exp { + t.Fatalf("Incorrect number of blocks %d != %d", len(blocks), exp) + } + + buf = bytes.NewBuffer(data) + err = Verify(buf, blocksize, blocks) + t.Log(err) + if err != nil { + t.Fatal("Unexpected verify failure", err) + } + + buf = bytes.NewBuffer(append(data, '\n')) + err = Verify(buf, blocksize, blocks) + t.Log(err) + if err == nil { + t.Fatal("Unexpected verify success") + } + + buf = bytes.NewBuffer(data[:len(data)-1]) + err = Verify(buf, blocksize, blocks) + t.Log(err) + if err == nil { + t.Fatal("Unexpected verify success") + } + + data[42] = 42 + buf = bytes.NewBuffer(data) + err = Verify(buf, blocksize, blocks) + t.Log(err) + if err == nil { + t.Fatal("Unexpected verify success") + } +} + type fileList []protocol.FileInfo func (f fileList) Len() int { diff --git a/test/http.go b/test/http.go index e96b944b9..53f5f63bb 100644 --- a/test/http.go +++ b/test/http.go @@ -36,7 +36,6 @@ var jsonEndpoints = []string{ "/rest/errors", "/rest/events", "/rest/lang", - "/rest/model/version?repo=default", "/rest/model?repo=default", "/rest/need", "/rest/nodeid?id=I6KAH7666SLLLB5PFXSOAUFJCDZCYAOMLEKCP2GB32BV5RQST3PSROAU", diff --git a/test/test-delupd.sh b/test/test-delupd.sh index d1481fe21..9e0e30e49 100755 --- 
a/test/test-delupd.sh +++ b/test/test-delupd.sh @@ -19,7 +19,7 @@ go build json.go start() { echo "Starting..." for i in 1 2 3 ; do - STTRACE=files,model,puller,versioner,protocol STPROFILER=":909$i" syncthing -home "h$i" > "$i.out" 2>&1 & + STTRACE=model,scanner STPROFILER=":909$i" syncthing -home "h$i" > "$i.out" 2>&1 & done } @@ -100,7 +100,7 @@ alterFiles() { echo " $i: deleting $todelete files..." set +o pipefail find . -type f \ - | grep -v large \ + | grep -v timechanged \ | sort -k 1.16 \ | head -n "$todelete" \ | xargs rm -f @@ -110,11 +110,10 @@ alterFiles() { # Create some new files and alter existing ones echo " $i: random nonoverlapping" ../genfiles -maxexp 22 -files 200 - echo " $i: append to large file" - dd if=large-$i bs=1024k count=4 >> large-$i 2>/dev/null echo " $i: new files in ro directory" uuidgen > ro-test/$(uuidgen) chmod 500 ro-test + touch "timechanged-$i" ../md5r -l | sort | grep -v .stversions > ../md5-$i popd >/dev/null @@ -140,6 +139,7 @@ for i in 1 12-2 23-3; do mkdir ro-test uuidgen > ro-test/$(uuidgen) chmod 500 ro-test + dd if=/dev/urandom of="timechanged-$i" bs=1024k count=1 popd >/dev/null done