diff --git a/logger.go b/logger.go index 54b3c1aba..1608b3bc8 100644 --- a/logger.go +++ b/logger.go @@ -6,7 +6,8 @@ import ( "os" ) -var logger = log.New(os.Stderr, "", log.Ltime) +// set in main() +var logger *log.Logger func debugln(vals ...interface{}) { s := fmt.Sprintln(vals...) diff --git a/main.go b/main.go index f5c845ebc..2b5e12bf0 100644 --- a/main.go +++ b/main.go @@ -97,8 +97,11 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) } + log.SetOutput(os.Stderr) + logger = log.New(os.Stderr, "", log.Flags()) if len(opts.Debug.TraceModel) > 0 || opts.Debug.LogSource { - logger = log.New(os.Stderr, "", log.Lshortfile|log.Ldate|log.Ltime|log.Lmicroseconds) + log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds) + logger.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds) } opts.ConfDir = expandTilde(opts.ConfDir) diff --git a/model/filemonitor.go b/model/filemonitor.go index bf4c7194b..b1b7c5a16 100644 --- a/model/filemonitor.go +++ b/model/filemonitor.go @@ -29,7 +29,10 @@ func (m *fileMonitor) FileBegins(cc <-chan content) error { dir := path.Dir(tmp) _, err := os.Stat(dir) if err != nil && os.IsNotExist(err) { - os.MkdirAll(dir, 0777) + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } } outFile, err := os.Create(tmp) @@ -106,6 +109,13 @@ func (m *fileMonitor) FileDone() error { tmp := tempName(m.path, m.global.Modified) defer os.Remove(tmp) + if m.copyError != nil { + return m.copyError + } + if m.writeError != nil { + return m.writeError + } + err := hashCheck(tmp, m.global.Blocks) if err != nil { return fmt.Errorf("%s: %s (tmp) (deleting)", path.Base(m.name), err.Error()) diff --git a/model/filequeue.go b/model/filequeue.go index 8a787f418..08588b15a 100644 --- a/model/filequeue.go +++ b/model/filequeue.go @@ -7,20 +7,16 @@ import ( "time" ) -type Resolver interface { - WhoHas(string) []string -} - type Monitor interface { FileBegins(<-chan content) error FileDone() error } type FileQueue struct { - resolver Resolver - files queuedFileList - lock sync.Mutex - sorted bool + files queuedFileList + lock sync.Mutex + sorted bool + availability map[string][]string } type queuedFile struct { @@ -92,32 +88,28 @@ func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) { } for i := range q.files { - if time.Since(q.files[i].nodesChecked) > 5*time.Second { - // Refresh node list every now and then - q.files[i].nodes = q.resolver.WhoHas(q.files[i].name) - } + qf := &q.files[i] - if len(q.files[i].nodes) == 0 { + if len(q.availability[qf.name]) == 0 { // Noone has the file we want; abort. - if q.files[i].remaining != len(q.files[i].blocks) { + if qf.remaining != len(qf.blocks) { // We have already started on this file; close it down - close(q.files[i].channel) - if mon := q.files[i].monitor; mon != nil { + close(qf.channel) + if mon := qf.monitor; mon != nil { mon.FileDone() } } - q.deleteIndex(i) + q.deleteAt(i) return queuedBlock{}, false } - qf := q.files[i] - for _, ni := range qf.nodes { + for _, ni := range q.availability[qf.name] { // Find and return the next block in the queue if ni == nodeID { for j, b := range qf.blocks { if !qf.activeBlocks[j] { - q.files[i].activeBlocks[j] = true - q.files[i].given++ + qf.activeBlocks[j] = true + qf.given++ return queuedBlock{ name: qf.name, block: b, @@ -142,29 +134,31 @@ func (q *FileQueue) Done(file string, offset int64, data []byte) { offset: offset, data: data, } - for i, qf := range q.files { + for i := range q.files { + qf := &q.files[i] + if qf.name == file { if qf.monitor != nil && qf.remaining == len(qf.blocks) { err := qf.monitor.FileBegins(qf.channel) if err != nil { log.Printf("WARNING: %s: %v (not synced)", qf.name, err) - q.deleteIndex(i) + q.deleteAt(i) return } } qf.channel <- c - q.files[i].remaining-- + qf.remaining-- - if q.files[i].remaining == 0 { + if qf.remaining == 0 { close(qf.channel) - q.deleteIndex(i) if qf.monitor != nil { err := qf.monitor.FileDone() if err != nil { log.Printf("WARNING: %s: %v", qf.name, err) } } + q.deleteAt(i) } return } @@ -194,6 +188,24 @@ func (q *FileQueue) QueuedFiles() (files []string) { return } -func (q *FileQueue) deleteIndex(i int) { +func (q *FileQueue) deleteAt(i int) { q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])] } + +func (q *FileQueue) SetAvailable(file, node string) { + q.lock.Lock() + defer q.lock.Unlock() + if q.availability == nil { + q.availability = make(map[string][]string) + } + q.availability[file] = []string{node} +} + +func (q *FileQueue) AddAvailable(file, node string) { + q.lock.Lock() + defer q.lock.Unlock() + if q.availability == nil { + q.availability = make(map[string][]string) + } + q.availability[file] = append(q.availability[file], node) +} diff --git a/model/model.go b/model/model.go index ee5279a23..092277931 100644 --- a/model/model.go +++ b/model/model.go @@ -94,7 +94,6 @@ func NewModel(dir string) *Model { fileWasSuppressed: make(map[string]int), dq: make(chan File), } - m.fq.resolver = m go m.broadcastIndexLoop() return m @@ -618,13 +617,16 @@ func (m *Model) recomputeGlobal() { } var highestMod int64 - for _, fs := range m.remote { + for nodeID, fs := range m.remote { for n, nf := range fs { if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) { newGlobal[n] = nf + m.fq.SetAvailable(n, nodeID) if nf.Modified > highestMod { highestMod = nf.Modified } + } else if lf.Equals(nf) { + m.fq.AddAvailable(n, nodeID) } } } @@ -690,8 +692,14 @@ func (m *Model) recomputeNeed() { } } -// Must be called with the read lock held. func (m *Model) WhoHas(name string) []string { + m.RLock() + defer m.RUnlock() + return m.whoHas(name) +} + +// Must be called with the read lock held. +func (m *Model) whoHas(name string) []string { var remote []string gf := m.global[name]