From 45fcf4bc849105c9a2e07fe6a648eeed94350d03 Mon Sep 17 00:00:00 2001
From: Jakob Borg
Date: Thu, 9 Jan 2014 16:35:49 +0100
Subject: [PATCH] Implement new puller routine (fixes #33)

---
 main.go                   |   7 +-
 model/blocks.go           |  10 +-
 model/blocks_test.go      |  10 +-
 model/filemonitor.go      | 154 +++++++++++++++++++++
 model/filequeue.go        | 199 +++++++++++++++++++++++++++
 model/filequeue_test.go   | 276 ++++++++++++++++++++++++++++++++++++++
 model/model.go            | 126 +++++++++++++----
 model/model_puller.go     | 258 -----------------------------------
 model/model_test.go       |  49 ++++---
 model/walk.go             |   7 +-
 protocol/common_test.go   |   4 +-
 protocol/messages.go      |  10 +-
 protocol/messages_test.go |   2 +-
 protocol/protocol.go      |   8 +-
 14 files changed, 787 insertions(+), 333 deletions(-)
 create mode 100644 model/filemonitor.go
 create mode 100644 model/filequeue.go
 create mode 100644 model/filequeue_test.go
 delete mode 100644 model/model_puller.go

diff --git a/main.go b/main.go
index 084ca02da..0cc34f6ed 100644
--- a/main.go
+++ b/main.go
@@ -41,7 +41,7 @@ type Options struct {
 
 type DebugOptions struct {
     LogSource    bool     `long:"log-source"`
-    TraceModel   []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need"`
+    TraceModel   []string `long:"trace-model" value-name:"TRACE" description:"idx, net, file, need, pull"`
     TraceConnect bool     `long:"trace-connect"`
     Profiler     string   `long:"profiler" value-name:"ADDR"`
 }
@@ -54,8 +54,7 @@ type DiscoveryOptions struct {
 }
 
 type AdvancedOptions struct {
-    RequestsInFlight int           `long:"reqs-in-flight" description:"Parallell in flight requests per file" default:"4" value-name:"REQS"`
-    FilesInFlight    int           `long:"files-in-flight" description:"Parallell in flight file pulls" default:"8" value-name:"FILES"`
+    RequestsInFlight int           `long:"reqs-in-flight" description:"Parallel in flight requests per node" default:"8" value-name:"REQS"`
     LimitRate        int           `long:"send-rate" description:"Rate limit for outgoing data" default:"0" value-name:"KBPS"`
     ScanInterval     time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
     ConnInterval     time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
@@ -205,7 +204,7 @@ func main() {
             infoln("Deletes from peer nodes are allowed")
         }
         okln("Ready to synchronize (read-write)")
-        m.StartRW(!opts.NoDelete, opts.Advanced.FilesInFlight, opts.Advanced.RequestsInFlight)
+        m.StartRW(!opts.NoDelete, opts.Advanced.RequestsInFlight)
     } else {
         okln("Ready to synchronize (read only; no external updates accepted)")
     }
diff --git a/model/blocks.go b/model/blocks.go
index ecc0bb163..5c14eaa19 100644
--- a/model/blocks.go
+++ b/model/blocks.go
@@ -7,15 +7,15 @@ import (
 )
 
 type Block struct {
-    Offset uint64
-    Length uint32
+    Offset int64
+    Size   uint32
     Hash   []byte
 }
 
 // Blocks returns the blockwise hash of the reader.
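 //
 // A minimal usage sketch (illustrative only, not part of this change;
 // `data` is any in-memory byte slice). After the field changes above,
 // offsets are int64 and block lengths are called Size:
 //
 //	blocks, err := Blocks(bytes.NewReader(data), BlockSize)
 //	if err != nil {
 //		log.Fatal(err)
 //	}
 //	for _, b := range blocks {
 //		log.Printf("block at %d, %d bytes, hash %x", b.Offset, b.Size, b.Hash)
 //	}
 //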
func Blocks(r io.Reader, blocksize int) ([]Block, error) { var blocks []Block - var offset uint64 + var offset int64 for { lr := &io.LimitedReader{r, int64(blocksize)} hf := sha256.New() @@ -30,11 +30,11 @@ func Blocks(r io.Reader, blocksize int) ([]Block, error) { b := Block{ Offset: offset, - Length: uint32(n), + Size: uint32(n), Hash: hf.Sum(nil), } blocks = append(blocks, b) - offset += uint64(n) + offset += int64(n) } return blocks, nil diff --git a/model/blocks_test.go b/model/blocks_test.go index cc56549c9..f3ed6599f 100644 --- a/model/blocks_test.go +++ b/model/blocks_test.go @@ -52,7 +52,7 @@ func TestBlocks(t *testing.T) { t.Fatalf("Incorrect number of blocks %d != %d", l, len(test.hash)) } else { i := 0 - for off := uint64(0); off < uint64(len(test.data)); off += uint64(test.blocksize) { + for off := int64(0); off < int64(len(test.data)); off += int64(test.blocksize) { if blocks[i].Offset != off { t.Errorf("Incorrect offset for block %d: %d != %d", i, blocks[i].Offset, off) } @@ -61,8 +61,8 @@ func TestBlocks(t *testing.T) { if rem := len(test.data) - int(off); bs > rem { bs = rem } - if int(blocks[i].Length) != bs { - t.Errorf("Incorrect length for block %d: %d != %d", i, blocks[i].Length, bs) + if int(blocks[i].Size) != bs { + t.Errorf("Incorrect length for block %d: %d != %d", i, blocks[i].Size, bs) } if h := fmt.Sprintf("%x", blocks[i].Hash); h != test.hash[i] { t.Errorf("Incorrect block hash %q != %q", h, test.hash[i]) @@ -106,8 +106,8 @@ func TestDiff(t *testing.T) { if d[j].Offset != test.d[j].Offset { t.Errorf("Incorrect offset for diff %d block %d; %d != %d", i, j, d[j].Offset, test.d[j].Offset) } - if d[j].Length != test.d[j].Length { - t.Errorf("Incorrect length for diff %d block %d; %d != %d", i, j, d[j].Length, test.d[j].Length) + if d[j].Size != test.d[j].Size { + t.Errorf("Incorrect length for diff %d block %d; %d != %d", i, j, d[j].Size, test.d[j].Size) } } } diff --git a/model/filemonitor.go b/model/filemonitor.go new file mode 100644 index 000000000..bf4c7194b --- /dev/null +++ b/model/filemonitor.go @@ -0,0 +1,154 @@ +package model + +import ( + "bytes" + "errors" + "fmt" + "os" + "path" + "sync" + "time" + + "github.com/calmh/syncthing/buffers" +) + +type fileMonitor struct { + name string // in-repo name + path string // full path + writeDone sync.WaitGroup + model *Model + global File + localBlocks []Block + copyError error + writeError error +} + +func (m *fileMonitor) FileBegins(cc <-chan content) error { + tmp := tempName(m.path, m.global.Modified) + + dir := path.Dir(tmp) + _, err := os.Stat(dir) + if err != nil && os.IsNotExist(err) { + os.MkdirAll(dir, 0777) + } + + outFile, err := os.Create(tmp) + if err != nil { + return err + } + + m.writeDone.Add(1) + + var writeWg sync.WaitGroup + if len(m.localBlocks) > 0 { + writeWg.Add(1) + inFile, err := os.Open(m.path) + if err != nil { + return err + } + + // Copy local blocks, close infile when done + go m.copyLocalBlocks(inFile, outFile, &writeWg) + } + + // Write remote blocks, + writeWg.Add(1) + go m.copyRemoteBlocks(cc, outFile, &writeWg) + + // Wait for both writing routines, then close the outfile + go func() { + writeWg.Wait() + outFile.Close() + m.writeDone.Done() + }() + + return nil +} + +func (m *fileMonitor) copyLocalBlocks(inFile, outFile *os.File, writeWg *sync.WaitGroup) { + defer inFile.Close() + defer writeWg.Done() + + var buf = buffers.Get(BlockSize) + defer buffers.Put(buf) + + for _, lb := range m.localBlocks { + buf = buf[:lb.Size] + _, err := inFile.ReadAt(buf, lb.Offset) + if err != 
nil {
+            m.copyError = err
+            return
+        }
+        _, err = outFile.WriteAt(buf, lb.Offset)
+        if err != nil {
+            m.copyError = err
+            return
+        }
+    }
+}
+
+func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writeWg *sync.WaitGroup) {
+    defer writeWg.Done()
+
+    for content := range cc {
+        _, err := outFile.WriteAt(content.data, content.offset)
+        buffers.Put(content.data)
+        if err != nil {
+            m.writeError = err
+            return
+        }
+    }
+}
+
+func (m *fileMonitor) FileDone() error {
+    m.writeDone.Wait()
+
+    tmp := tempName(m.path, m.global.Modified)
+    defer os.Remove(tmp)
+
+    err := hashCheck(tmp, m.global.Blocks)
+    if err != nil {
+        return fmt.Errorf("%s: %s (tmp) (deleting)", path.Base(m.name), err.Error())
+    }
+
+    err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0))
+    if err != nil {
+        return err
+    }
+
+    err = os.Chmod(tmp, os.FileMode(m.global.Flags&0777))
+    if err != nil {
+        return err
+    }
+
+    err = os.Rename(tmp, m.path)
+    if err != nil {
+        return err
+    }
+
+    go m.model.updateLocalLocked(m.global)
+    return nil
+}
+
+func hashCheck(name string, correct []Block) error {
+    rf, err := os.Open(name)
+    if err != nil {
+        return err
+    }
+    defer rf.Close()
+
+    current, err := Blocks(rf, BlockSize)
+    if err != nil {
+        return err
+    }
+    if len(current) != len(correct) {
+        return errors.New("incorrect number of blocks")
+    }
+    for i := range current {
+        if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 {
+            return fmt.Errorf("hash mismatch: %x != %x", current[i].Hash, correct[i].Hash)
+        }
+    }
+
+    return nil
+}
diff --git a/model/filequeue.go b/model/filequeue.go
new file mode 100644
index 000000000..8a787f418
--- /dev/null
+++ b/model/filequeue.go
@@ -0,0 +1,199 @@
+package model
+
+import (
+    "log"
+    "sort"
+    "sync"
+    "time"
+)
+
+type Resolver interface {
+    WhoHas(string) []string
+}
+
+type Monitor interface {
+    FileBegins(<-chan content) error
+    FileDone() error
+}
+
+type FileQueue struct {
+    resolver Resolver
+    files    queuedFileList
+    lock     sync.Mutex
+    sorted   bool
+}
+
+type queuedFile struct {
+    name         string
+    blocks       []Block
+    activeBlocks []bool
+    given        int
+    remaining    int
+    channel      chan content
+    nodes        []string
+    nodesChecked time.Time
+    monitor      Monitor
+}
+
+type content struct {
+    offset int64
+    data   []byte
+}
+
+type queuedFileList []queuedFile
+
+func (l queuedFileList) Len() int { return len(l) }
+
+func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
+
+func (l queuedFileList) Less(a, b int) bool {
+    // Sort by most blocks already given out, then alphabetically
+    if l[a].given != l[b].given {
+        return l[a].given > l[b].given
+    }
+    return l[a].name < l[b].name
+}
+
+type queuedBlock struct {
+    name  string
+    block Block
+    index int
+}
+
+func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    q.files = append(q.files, queuedFile{
+        name:         name,
+        blocks:       blocks,
+        activeBlocks: make([]bool, len(blocks)),
+        remaining:    len(blocks),
+        channel:      make(chan content),
+        monitor:      monitor,
+    })
+    q.sorted = false
+}
+
+func (q *FileQueue) Len() int {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    return len(q.files)
+}
+
+func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    if !q.sorted {
+        sort.Sort(q.files)
+        q.sorted = true
+    }
+
+    for i := range q.files {
+        if time.Since(q.files[i].nodesChecked) > 5*time.Second {
+            // Refresh node list every now and then
+            q.files[i].nodes = q.resolver.WhoHas(q.files[i].name)
+        }
+
+        if len(q.files[i].nodes) == 0 {
+            // No one has the file we want; abort.
+            if q.files[i].remaining != len(q.files[i].blocks) {
+                // We have already started on this file; close it down
+                close(q.files[i].channel)
+                if mon := q.files[i].monitor; mon != nil {
+                    mon.FileDone()
+                }
+            }
+            q.deleteIndex(i)
+            return queuedBlock{}, false
+        }
+
+        qf := q.files[i]
+        for _, ni := range qf.nodes {
+            // Find and return the next block in the queue
+            if ni == nodeID {
+                for j, b := range qf.blocks {
+                    if !qf.activeBlocks[j] {
+                        q.files[i].activeBlocks[j] = true
+                        q.files[i].given++
+                        return queuedBlock{
+                            name:  qf.name,
+                            block: b,
+                            index: j,
+                        }, true
+                    }
+                }
+                break
+            }
+        }
+    }
+
+    // We found nothing to do
+    return queuedBlock{}, false
+}
+
+func (q *FileQueue) Done(file string, offset int64, data []byte) {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    c := content{
+        offset: offset,
+        data:   data,
+    }
+    for i, qf := range q.files {
+        if qf.name == file {
+            if qf.monitor != nil && qf.remaining == len(qf.blocks) {
+                err := qf.monitor.FileBegins(qf.channel)
+                if err != nil {
+                    log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
+                    q.deleteIndex(i)
+                    return
+                }
+            }
+
+            qf.channel <- c
+            q.files[i].remaining--
+
+            if q.files[i].remaining == 0 {
+                close(qf.channel)
+                q.deleteIndex(i)
+                if qf.monitor != nil {
+                    err := qf.monitor.FileDone()
+                    if err != nil {
+                        log.Printf("WARNING: %s: %v", qf.name, err)
+                    }
+                }
+            }
+            return
+        }
+    }
+    panic("unreachable")
+}
+
+func (q *FileQueue) Queued(file string) bool {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    for _, qf := range q.files {
+        if qf.name == file {
+            return true
+        }
+    }
+    return false
+}
+
+func (q *FileQueue) QueuedFiles() (files []string) {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    for _, qf := range q.files {
+        files = append(files, qf.name)
+    }
+    return
+}
+
+func (q *FileQueue) deleteIndex(i int) {
+    q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
+}
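
A sketch of how the queue is meant to be driven (illustrative only;
drainMonitor, the "node" ID, and conn are made up for the example, with conn
standing in for any model.Connection). Each connected node runs request loops
that ask the queue for a block assignment, fetch it, and hand the data back
via Done. A Monitor must drain the content channel passed to FileBegins, or
Done will block on the unbuffered channel:

	type drainMonitor struct{}

	func (drainMonitor) FileBegins(cc <-chan content) error {
		go func() {
			for range cc {
				// A real monitor writes each block to the temp
				// file; fileMonitor above does exactly that.
			}
		}()
		return nil
	}

	func (drainMonitor) FileDone() error { return nil }

	// One puller loop per node:
	//
	//	for {
	//		qb, ok := q.Get("node")
	//		if !ok {
	//			break
	//		}
	//		data, _ := conn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
	//		q.Done(qb.name, qb.block.Offset, data)
	//	}
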
diff --git a/model/filequeue_test.go b/model/filequeue_test.go
new file mode 100644
index 000000000..fcb44870a
--- /dev/null
+++ b/model/filequeue_test.go
@@ -0,0 +1,276 @@
+package model
+
+import (
+    "reflect"
+    "strings"
+    "sync"
+    "sync/atomic"
+    "testing"
+)
+
+type fakeResolver struct{}
+
+func (fakeResolver) WhoHas(n string) []string {
+    if strings.HasPrefix(n, "a-") {
+        return []string{"a", "nodeID"}
+    } else if strings.HasPrefix(n, "b-") {
+        return []string{"b", "nodeID"}
+    }
+    return []string{"a", "b", "nodeID"}
+}
+
+func TestFileQueueAdd(t *testing.T) {
+    q := FileQueue{}
+    q.Add("foo", nil, nil)
+}
+
+func TestFileQueueAddSorting(t *testing.T) {
+    q := FileQueue{resolver: fakeResolver{}}
+    q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+    q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+    b, _ := q.Get("nodeID")
+    if b.name != "aaa" {
+        t.Errorf("Incorrectly sorted get: %+v", b)
+    }
+
+    q = FileQueue{resolver: fakeResolver{}}
+    q.Add("zzz", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+    b, _ = q.Get("nodeID") // Start on zzz
+    if b.name != "zzz" {
+        t.Errorf("Incorrectly sorted get: %+v", b)
+    }
+    q.Add("aaa", []Block{{Offset: 0, Size: 128}, {Offset: 128, Size: 128}}, nil)
+    b, _ = q.Get("nodeID")
+    if b.name != "zzz" {
+        // Continue rather than starting a new file
+        t.Errorf("Incorrectly sorted get: %+v", b)
+    }
+}
+
+func TestFileQueueLen(t *testing.T) {
+    q := FileQueue{}
+    q.Add("foo", nil, nil)
+    q.Add("bar", nil, nil)
+
+    if l := q.Len(); l != 2 {
+        t.Errorf("Incorrect len %d != 2 after adds", l)
+    }
+}
+
+func 
TestFileQueueGet(t *testing.T) { + q := FileQueue{resolver: fakeResolver{}} + q.Add("foo", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")}, + {Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")}, + }, nil) + q.Add("bar", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")}, + }, nil) + + // First get should return the first block of the first file + + expected := queuedBlock{ + name: "bar", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some bar hash bytes"), + }, + } + actual, ok := q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (first)\n E: %+v\n A: %+v", expected, actual) + } + + // Second get should return the next block of the first file + + expected = queuedBlock{ + name: "bar", + block: Block{ + Offset: 128, + Size: 128, + Hash: []byte("some other bar hash bytes"), + }, + index: 1, + } + actual, ok = q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (second)\n E: %+v\n A: %+v", expected, actual) + } + + // Third get should return the first block of the second file + + expected = queuedBlock{ + name: "foo", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some foo hash bytes"), + }, + } + actual, ok = q.Get("nodeID") + + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned (third)\n E: %+v\n A: %+v", expected, actual) + } +} + +/* +func TestFileQueueDone(t *testing.T) { + ch := make(chan content) + var recv sync.WaitGroup + recv.Add(1) + go func() { + content := <-ch + if bytes.Compare(content.data, []byte("first block bytes")) != 0 { + t.Error("Incorrect data in first content block") + } + + content = <-ch + if bytes.Compare(content.data, []byte("second block bytes")) != 0 { + t.Error("Incorrect data in second content block") + } + + _, ok := <-ch + if ok { + t.Error("Content channel not closed") + } + + recv.Done() + }() + + q := FileQueue{resolver: fakeResolver{}} + q.Add("foo", []Block{ + {Offset: 0, Length: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Length: 128, Hash: []byte("some other foo hash bytes")}, + }, ch) + + b0, _ := q.Get("nodeID") + b1, _ := q.Get("nodeID") + + q.Done(b0.name, b0.block.Offset, []byte("first block bytes")) + q.Done(b1.name, b1.block.Offset, []byte("second block bytes")) + + recv.Wait() + + // Queue should now have one file less + + if l := q.Len(); l != 0 { + t.Error("Queue not empty") + } + + _, ok := q.Get("nodeID") + if ok { + t.Error("Unexpected OK Get()") + } +} +*/ + +func TestFileQueueGetNodeIDs(t *testing.T) { + q := FileQueue{resolver: fakeResolver{}} + q.Add("a-foo", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some foo hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other foo hash bytes")}, + {Offset: 256, Size: 128, Hash: []byte("more foo hash bytes")}, + }, nil) + q.Add("b-bar", []Block{ + {Offset: 0, Size: 128, Hash: []byte("some bar hash bytes")}, + {Offset: 128, Size: 128, Hash: []byte("some other bar hash bytes")}, + }, nil) + + expected := queuedBlock{ + name: "b-bar", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some bar hash bytes"), + }, + } + actual, ok := q.Get("b") + if !ok { + t.Error("Unexpected 
non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } + + expected = queuedBlock{ + name: "a-foo", + block: Block{ + Offset: 0, + Size: 128, + Hash: []byte("some foo hash bytes"), + }, + } + actual, ok = q.Get("a") + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } + + expected = queuedBlock{ + name: "a-foo", + block: Block{ + Offset: 128, + Size: 128, + Hash: []byte("some other foo hash bytes"), + }, + index: 1, + } + actual, ok = q.Get("nodeID") + if !ok { + t.Error("Unexpected non-OK Get()") + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Incorrect block returned\n E: %+v\n A: %+v", expected, actual) + } +} + +func TestFileQueueThreadHandling(t *testing.T) { + // This should pass with go test -race + + const n = 100 + var total int + var blocks []Block + for i := 1; i <= n; i++ { + blocks = append(blocks, Block{Offset: int64(i), Size: 1}) + total += i + } + + q := FileQueue{resolver: fakeResolver{}} + q.Add("foo", blocks, nil) + + var start = make(chan bool) + var gotTot uint32 + var wg sync.WaitGroup + wg.Add(n) + for i := 1; i <= n; i++ { + go func() { + <-start + b, _ := q.Get("nodeID") + atomic.AddUint32(&gotTot, uint32(b.block.Offset)) + wg.Done() + }() + } + + close(start) + wg.Wait() + if int(gotTot) != total { + t.Error("Total mismatch; %d != %d", gotTot, total) + } +} diff --git a/model/model.go b/model/model.go index 050bf19bc..ee5279a23 100644 --- a/model/model.go +++ b/model/model.go @@ -34,9 +34,10 @@ type Model struct { global map[string]File // the latest version of each file as it exists in the cluster local map[string]File // the files we currently have locally on disk remote map[string]map[string]File - need map[string]bool // the files we need to update protoConn map[string]Connection rawConn map[string]io.Closer + fq FileQueue // queue for files to fetch + dq chan File // queue for files to delete updatedLocal int64 // timestamp of last update to local updateGlobal int64 // timestamp of last update to remote @@ -44,23 +45,22 @@ type Model struct { lastIdxBcast time.Time lastIdxBcastRequest time.Time - rwRunning bool - parallellFiles int - paralllelReqs int - delete bool + rwRunning bool + delete bool trace map[string]bool fileLastChanged map[string]time.Time fileWasSuppressed map[string]int - limitRequestRate chan struct{} + parallellRequests int + limitRequestRate chan struct{} } type Connection interface { ID() string Index([]protocol.FileInfo) - Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) + Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) Statistics() protocol.Statistics } @@ -86,14 +86,15 @@ func NewModel(dir string) *Model { global: make(map[string]File), local: make(map[string]File), remote: make(map[string]map[string]File), - need: make(map[string]bool), protoConn: make(map[string]Connection), rawConn: make(map[string]io.Closer), lastIdxBcast: time.Now(), trace: make(map[string]bool), fileLastChanged: make(map[string]time.Time), fileWasSuppressed: make(map[string]int), + dq: make(chan File), } + m.fq.resolver = m go m.broadcastIndexLoop() return m @@ -124,7 +125,7 @@ func (m *Model) Trace(t string) { // StartRW starts read/write processing on the current model. 
When in // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. -func (m *Model) StartRW(del bool, pfiles, preqs int) { +func (m *Model) StartRW(del bool, threads int) { m.Lock() defer m.Unlock() @@ -134,11 +135,12 @@ func (m *Model) StartRW(del bool, pfiles, preqs int) { m.rwRunning = true m.delete = del - m.parallellFiles = pfiles - m.paralllelReqs = preqs + m.parallellRequests = threads go m.cleanTempFiles() - go m.puller() + if del { + go m.deleteFiles() + } } // Generation returns an opaque integer that is guaranteed to increment on @@ -231,7 +233,7 @@ func (m *Model) NeedFiles() (files []File, bytes int) { m.RLock() defer m.RUnlock() - for n := range m.need { + for _, n := range m.fq.QueuedFiles() { f := m.global[n] files = append(files, f) bytes += f.Size() @@ -321,7 +323,7 @@ func (m *Model) Close(node string, err error) { // Request returns the specified data segment by reading it from local disk. // Implements the protocol.Model interface. -func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { +func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { // Verify that the requested file exists in the local and global model. m.RLock() lf, localOk := m.local[name] @@ -346,7 +348,7 @@ func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash [] defer fd.Close() buf := buffers.Get(int(size)) - _, err = fd.ReadAt(buf, int64(offset)) + _, err = fd.ReadAt(buf, offset) if err != nil { return nil, err } @@ -446,6 +448,39 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { go func() { protoConn.Index(idx) }() + + if m.rwRunning { + for i := 0; i < m.parallellRequests; i++ { + i := i + go func() { + if m.trace["pull"] { + log.Println("PULL: Starting", nodeID, i) + } + for { + m.RLock() + if _, ok := m.protoConn[nodeID]; !ok { + if m.trace["pull"] { + log.Println("PULL: Exiting", nodeID, i) + } + m.RUnlock() + return + } + m.RUnlock() + + qb, ok := m.fq.Get(nodeID) + if ok { + if m.trace["pull"] { + log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset) + } + data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash) + m.fq.Done(qb.name, qb.block.Offset, data) + } else { + time.Sleep(1 * time.Second) + } + } + }() + } + } } func (m *Model) shouldSuppressChange(name string) bool { @@ -488,7 +523,7 @@ func (m *Model) protocolIndex() []protocol.FileInfo { return index } -func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { +func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { m.RLock() nc, ok := m.protoConn[nodeID] m.RUnlock() @@ -520,7 +555,7 @@ func (m *Model) broadcastIndexLoop() { for _, node := range m.protoConn { node := node if m.trace["net"] { - log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx)) + log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx)) } go func() { node.Index(idx) @@ -558,6 +593,12 @@ func (m *Model) markDeletedLocals(newLocal map[string]File) bool { return updated } +func (m *Model) updateLocalLocked(f File) { + m.Lock() + m.updateLocal(f) + m.Unlock() +} + func (m *Model) updateLocal(f File) { if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) { m.local[f.Name] = f @@ -610,8 +651,10 @@ func (m *Model) recomputeGlobal() { // Must be called with the write lock held. 
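 //
 // Illustrative note, assuming BlockDiff splits the global block list into
 // blocks reusable from the existing local file and blocks that must be
 // fetched: a local file with blocks [A B C] against a global [A B' C]
 // yields local=[A C] and remote=[B']. The remote set is what gets queued
 // on m.fq below, while deleted files go to m.dq instead.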
func (m *Model) recomputeNeed() { - m.need = make(map[string]bool) for n, gf := range m.global { + if m.fq.Queued(n) { + continue + } lf, ok := m.local[n] if !ok || gf.NewerThan(lf) { if gf.Flags&protocol.FlagInvalid != 0 { @@ -627,15 +670,28 @@ func (m *Model) recomputeNeed() { continue } if m.trace["need"] { - log.Println("NEED:", ok, lf, gf) + log.Printf("NEED: lf:%v gf:%v", lf, gf) + } + + if gf.Flags&protocol.FlagDeleted != 0 { + m.dq <- gf + } else { + local, remote := BlockDiff(lf.Blocks, gf.Blocks) + fm := fileMonitor{ + name: n, + path: path.Clean(path.Join(m.dir, n)), + global: gf, + model: m, + localBlocks: local, + } + m.fq.Add(n, remote, &fm) } - m.need[n] = true } } } // Must be called with the read lock held. -func (m *Model) whoHas(name string) []string { +func (m *Model) WhoHas(name string) []string { var remote []string gf := m.global[name] @@ -648,21 +704,35 @@ func (m *Model) whoHas(name string) []string { return remote } +func (m *Model) deleteFiles() { + for file := range m.dq { + if m.trace["file"] { + log.Println("FILE: Delete", file.Name) + } + path := path.Clean(path.Join(m.dir, file.Name)) + err := os.Remove(path) + if err != nil { + log.Printf("WARNING: %s: %v", file.Name, err) + } + m.updateLocalLocked(file) + } +} + func fileFromFileInfo(f protocol.FileInfo) File { var blocks = make([]Block, len(f.Blocks)) - var offset uint64 + var offset int64 for i, b := range f.Blocks { blocks[i] = Block{ Offset: offset, - Length: b.Length, + Size: b.Size, Hash: b.Hash, } - offset += uint64(b.Length) + offset += int64(b.Size) } return File{ Name: f.Name, Flags: f.Flags, - Modified: int64(f.Modified), + Modified: f.Modified, Version: f.Version, Blocks: blocks, } @@ -672,14 +742,14 @@ func fileInfoFromFile(f File) protocol.FileInfo { var blocks = make([]protocol.BlockInfo, len(f.Blocks)) for i, b := range f.Blocks { blocks[i] = protocol.BlockInfo{ - Length: b.Length, - Hash: b.Hash, + Size: b.Size, + Hash: b.Hash, } } return protocol.FileInfo{ Name: f.Name, Flags: f.Flags, - Modified: int64(f.Modified), + Modified: f.Modified, Version: f.Version, Blocks: blocks, } diff --git a/model/model_puller.go b/model/model_puller.go deleted file mode 100644 index 42fc8bbe0..000000000 --- a/model/model_puller.go +++ /dev/null @@ -1,258 +0,0 @@ -package model - -/* - -Locking -======= - -These methods are never called from the outside so don't follow the locking -policy in model.go. - -TODO(jb): Refactor this into smaller and cleaner pieces. -TODO(jb): Increase performance by taking apparent peer bandwidth into account. 
- -*/ - -import ( - "bytes" - "errors" - "fmt" - "io" - "log" - "os" - "path" - "sync" - "time" - - "github.com/calmh/syncthing/buffers" - "github.com/calmh/syncthing/protocol" -) - -func (m *Model) pullFile(name string) error { - m.RLock() - var localFile = m.local[name] - var globalFile = m.global[name] - var nodeIDs = m.whoHas(name) - m.RUnlock() - - if len(nodeIDs) == 0 { - return fmt.Errorf("%s: no connected nodes with file available", name) - } - - filename := path.Join(m.dir, name) - sdir := path.Dir(filename) - - _, err := os.Stat(sdir) - if err != nil && os.IsNotExist(err) { - os.MkdirAll(sdir, 0777) - } - - tmpFilename := tempName(filename, globalFile.Modified) - tmpFile, err := os.Create(tmpFilename) - if err != nil { - return err - } - - contentChan := make(chan content, 32) - var applyDone sync.WaitGroup - applyDone.Add(1) - go func() { - applyContent(contentChan, tmpFile) - tmpFile.Close() - applyDone.Done() - }() - - local, remote := BlockDiff(localFile.Blocks, globalFile.Blocks) - var fetchDone sync.WaitGroup - - // One local copy routine - - fetchDone.Add(1) - go func() { - for _, block := range local { - data, err := m.Request("", name, block.Offset, block.Length, block.Hash) - if err != nil { - break - } - contentChan <- content{ - offset: int64(block.Offset), - data: data, - } - } - fetchDone.Done() - }() - - // N remote copy routines - - var remoteBlocks = blockIterator{blocks: remote} - for i := 0; i < m.paralllelReqs; i++ { - curNode := nodeIDs[i%len(nodeIDs)] - fetchDone.Add(1) - - go func(nodeID string) { - for { - block, ok := remoteBlocks.Next() - if !ok { - break - } - data, err := m.requestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) - if err != nil { - break - } - contentChan <- content{ - offset: int64(block.Offset), - data: data, - } - } - fetchDone.Done() - }(curNode) - } - - fetchDone.Wait() - close(contentChan) - applyDone.Wait() - - err = hashCheck(tmpFilename, globalFile.Blocks) - if err != nil { - return fmt.Errorf("%s: %s (deleting)", path.Base(name), err.Error()) - } - - err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0)) - if err != nil { - return err - } - - err = os.Chmod(tmpFilename, os.FileMode(globalFile.Flags&0777)) - if err != nil { - return err - } - - err = os.Rename(tmpFilename, filename) - if err != nil { - return err - } - - return nil -} - -func (m *Model) puller() { - for { - time.Sleep(time.Second) - - var ns []string - m.RLock() - for n := range m.need { - ns = append(ns, n) - } - m.RUnlock() - - if len(ns) == 0 { - continue - } - - var limiter = make(chan bool, m.parallellFiles) - var allDone sync.WaitGroup - - for _, n := range ns { - limiter <- true - allDone.Add(1) - - go func(n string) { - defer func() { - allDone.Done() - <-limiter - }() - - m.RLock() - f, ok := m.global[n] - m.RUnlock() - - if !ok { - return - } - - var err error - if f.Flags&protocol.FlagDeleted == 0 { - if m.trace["file"] { - log.Printf("FILE: Pull %q", n) - } - err = m.pullFile(n) - } else { - if m.trace["file"] { - log.Printf("FILE: Remove %q", n) - } - // Cheerfully ignore errors here - _ = os.Remove(path.Join(m.dir, n)) - } - if err == nil { - m.Lock() - m.updateLocal(f) - m.Unlock() - } - }(n) - } - - allDone.Wait() - } -} - -type content struct { - offset int64 - data []byte -} - -func applyContent(cc <-chan content, dst io.WriterAt) error { - var err error - - for c := range cc { - _, err = dst.WriteAt(c.data, c.offset) - buffers.Put(c.data) - if err != nil { - return err - } - } - - 
return nil -} - -func hashCheck(name string, correct []Block) error { - rf, err := os.Open(name) - if err != nil { - return err - } - defer rf.Close() - - current, err := Blocks(rf, BlockSize) - if err != nil { - return err - } - if len(current) != len(correct) { - return errors.New("incorrect number of blocks") - } - for i := range current { - if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 { - return fmt.Errorf("hash mismatch: %x != %x", current[i], correct[i]) - } - } - - return nil -} - -type blockIterator struct { - sync.Mutex - blocks []Block -} - -func (i *blockIterator) Next() (b Block, ok bool) { - i.Lock() - defer i.Unlock() - - if len(i.blocks) == 0 { - return - } - - b, i.blocks = i.blocks[0], i.blocks[1:] - ok = true - - return -} diff --git a/model/model_test.go b/model/model_test.go index 3323baf19..0dd0c54d1 100644 --- a/model/model_test.go +++ b/model/model_test.go @@ -18,7 +18,7 @@ func TestNewModel(t *testing.T) { t.Fatalf("NewModel returned nil") } - if len(m.need) > 0 { + if fs, _ := m.NeedFiles(); len(fs) > 0 { t.Errorf("New model should have no Need") } @@ -32,13 +32,13 @@ var testDataExpected = map[string]File{ Name: "foo", Flags: 0, Modified: 0, - Blocks: []Block{{Offset: 0x0, Length: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}}, + Blocks: []Block{{Offset: 0x0, Size: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}}, }, "bar": File{ Name: "bar", Flags: 0, Modified: 0, - Blocks: []Block{{Offset: 0x0, Length: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}}, + Blocks: []Block{{Offset: 0x0, Size: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}}, }, } @@ -57,7 +57,7 @@ func TestUpdateLocal(t *testing.T) { fs, _ := m.Walk(false) m.ReplaceLocal(fs) - if len(m.need) > 0 { + if fs, _ := m.NeedFiles(); len(fs) > 0 { t.Fatalf("Model with only local data should have no need") } @@ -106,8 +106,8 @@ func TestRemoteUpdateExisting(t *testing.T) { } m.Index("42", []protocol.FileInfo{newFile}) - if l := len(m.need); l != 1 { - t.Errorf("Model missing Need for one file (%d != 1)", l) + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model missing Need for one file (%d != 1)", len(fs)) } } @@ -123,8 +123,8 @@ func TestRemoteAddNew(t *testing.T) { } m.Index("42", []protocol.FileInfo{newFile}) - if l1, l2 := len(m.need), 1; l1 != l2 { - t.Errorf("Model len(m.need) incorrect (%d != %d)", l1, l2) + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model len(m.need) incorrect (%d != 1)", len(fs)) } } @@ -141,8 +141,8 @@ func TestRemoteUpdateOld(t *testing.T) { } m.Index("42", []protocol.FileInfo{newFile}) - if l1, l2 := len(m.need), 0; l1 != l2 { - t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + if fs, _ := m.NeedFiles(); len(fs) != 0 { + t.Errorf("Model len(need) incorrect (%d != 0)", len(fs)) } } @@ -165,16 +165,16 @@ func TestRemoteIndexUpdate(t *testing.T) { m.Index("42", []protocol.FileInfo{foo}) - if _, ok := 
m.need["foo"]; !ok { + if fs, _ := m.NeedFiles(); fs[0].Name != "foo" { t.Error("Model doesn't need 'foo'") } m.IndexUpdate("42", []protocol.FileInfo{bar}) - if _, ok := m.need["foo"]; !ok { + if fs, _ := m.NeedFiles(); fs[0].Name != "foo" { t.Error("Model doesn't need 'foo'") } - if _, ok := m.need["bar"]; !ok { + if fs, _ := m.NeedFiles(); fs[1].Name != "bar" { t.Error("Model doesn't need 'bar'") } } @@ -292,8 +292,8 @@ func TestForgetNode(t *testing.T) { if l1, l2 := len(m.global), len(fs); l1 != l2 { t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) } - if l1, l2 := len(m.need), 0; l1 != l2 { - t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + if fs, _ := m.NeedFiles(); len(fs) != 0 { + t.Errorf("Model len(need) incorrect (%d != 0)", len(fs)) } newFile := protocol.FileInfo{ @@ -309,8 +309,8 @@ func TestForgetNode(t *testing.T) { if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) } - if l1, l2 := len(m.need), 1; l1 != l2 { - t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model len(need) incorrect (%d != 1)", len(fs)) } m.Close("42", nil) @@ -321,8 +321,17 @@ func TestForgetNode(t *testing.T) { if l1, l2 := len(m.global), len(fs); l1 != l2 { t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) } - if l1, l2 := len(m.need), 0; l1 != l2 { - t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + + if fs, _ := m.NeedFiles(); len(fs) != 1 { + t.Errorf("Model len(need) incorrect (%d != 1)", len(fs)) + } + // The file will be removed from the need list when we notice there are no nodes that can provide it + _, ok := m.fq.Get("42") + if ok { + t.Errorf("Unexpected successfull Get()") + } + if fs, _ := m.NeedFiles(); len(fs) != 0 { + t.Errorf("Model len(need) incorrect (%d != 0)", len(fs)) } } @@ -465,7 +474,7 @@ func (f FakeConnection) ID() string { func (FakeConnection) Index([]protocol.FileInfo) {} -func (f FakeConnection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) { +func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) { return f.requestData, nil } diff --git a/model/walk.go b/model/walk.go index 5f0f56410..088669f70 100644 --- a/model/walk.go +++ b/model/walk.go @@ -26,11 +26,16 @@ type File struct { func (f File) Size() (bytes int) { for _, b := range f.Blocks { - bytes += int(b.Length) + bytes += int(b.Size) } return } +func (f File) String() string { + return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d:, NumBlocks:%d}", + f.Name, f.Flags, f.Modified, f.Version, len(f.Blocks)) +} + func (f File) Equals(o File) bool { return f.Modified == o.Modified && f.Version == o.Version } diff --git a/protocol/common_test.go b/protocol/common_test.go index d5b885dbd..a76e02633 100644 --- a/protocol/common_test.go +++ b/protocol/common_test.go @@ -5,7 +5,7 @@ import "io" type TestModel struct { data []byte name string - offset uint64 + offset int64 size uint32 hash []byte closed bool @@ -17,7 +17,7 @@ func (t *TestModel) Index(nodeID string, files []FileInfo) { func (t *TestModel) IndexUpdate(nodeID string, files []FileInfo) { } -func (t *TestModel) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { +func (t *TestModel) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { t.name = name t.offset = offset t.size = size diff --git a/protocol/messages.go b/protocol/messages.go index 
663711309..9b6fd6da5 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -4,7 +4,7 @@ import "io" type request struct { name string - offset uint64 + offset int64 size uint32 hash []byte } @@ -42,7 +42,7 @@ func (w *marshalWriter) writeIndex(idx []FileInfo) { w.writeUint32(f.Version) w.writeUint32(uint32(len(f.Blocks))) for _, b := range f.Blocks { - w.writeUint32(b.Length) + w.writeUint32(b.Size) w.writeBytes(b.Hash) } } @@ -56,7 +56,7 @@ func WriteIndex(w io.Writer, idx []FileInfo) (int, error) { func (w *marshalWriter) writeRequest(r request) { w.writeString(r.name) - w.writeUint64(r.offset) + w.writeUint64(uint64(r.offset)) w.writeUint32(r.size) w.writeBytes(r.hash) } @@ -82,7 +82,7 @@ func (r *marshalReader) readIndex() []FileInfo { nblocks := r.readUint32() blocks := make([]BlockInfo, nblocks) for j := range blocks { - blocks[j].Length = r.readUint32() + blocks[j].Size = r.readUint32() blocks[j].Hash = r.readBytes() } files[i].Blocks = blocks @@ -100,7 +100,7 @@ func ReadIndex(r io.Reader) ([]FileInfo, error) { func (r *marshalReader) readRequest() request { var req request req.name = r.readString() - req.offset = r.readUint64() + req.offset = int64(r.readUint64()) req.size = r.readUint32() req.hash = r.readBytes() return req diff --git a/protocol/messages_test.go b/protocol/messages_test.go index 46e4fecac..d1c4ee83c 100644 --- a/protocol/messages_test.go +++ b/protocol/messages_test.go @@ -46,7 +46,7 @@ func TestIndex(t *testing.T) { } func TestRequest(t *testing.T) { - f := func(name string, offset uint64, size uint32, hash []byte) bool { + f := func(name string, offset int64, size uint32, hash []byte) bool { var buf = new(bytes.Buffer) var req = request{name, offset, size, hash} var wr = marshalWriter{w: buf} diff --git a/protocol/protocol.go b/protocol/protocol.go index d059d9283..8eb9d0676 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -34,8 +34,8 @@ type FileInfo struct { } type BlockInfo struct { - Length uint32 - Hash []byte + Size uint32 + Hash []byte } type Model interface { @@ -44,7 +44,7 @@ type Model interface { // An index update was received from the peer node IndexUpdate(nodeID string, files []FileInfo) // A request was made by the peer node - Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) + Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) // The peer node closed the connection Close(nodeID string, err error) } @@ -150,7 +150,7 @@ func (c *Connection) Index(idx []FileInfo) { } // Request returns the bytes for the specified block after fetching them from the connected peer. -func (c *Connection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) { +func (c *Connection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) { c.Lock() if c.closed { c.Unlock()
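    // (Note on the offset type change, as it applies to this request path:
    // os.File.ReadAt and WriteAt take int64 offsets, so offsets are carried
    // as int64 end to end. On the wire the offset is still written as an
    // unsigned 64-bit field, hence the uint64/int64 conversions in
    // writeRequest and readRequest above.)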