diff --git a/model.go b/model.go index 1e832010c..1ab0aa2a1 100644 --- a/model.go +++ b/model.go @@ -9,11 +9,10 @@ The model has read and write locks. These must be acquired as appropriate by public methods. To prevent deadlock situations, private methods should never acquire locks, but document what locks they require. -TODO(jb): Keep global and per node transfer and performance statistics. - */ import ( + "errors" "fmt" "os" "path" @@ -38,15 +37,16 @@ type Model struct { lastIdxBcastRequest time.Time } +var ( + errNoSuchNode = errors.New("no such node") +) + const ( RemoteFetchers = 4 FlagDeleted = 1 << 12 - // Index is broadcasted when a broadcast has been requested and the index - // has been quiscent for idxBcastHoldtime, or it was at least - // idxBcastMaxDelay since we last sent an index. - idxBcastHoldtime = 15 * time.Second - idxBcastMaxDelay = 120 * time.Second + idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification + idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long ) func NewModel(dir string) *Model { @@ -111,21 +111,32 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { // Files marked as deleted do not even enter the model continue } - mf := File{ - Name: f.Name, - Flags: f.Flags, - Modified: int64(f.Modified), + m.remote[nodeID][f.Name] = fileFromProtocol(f) + } + + m.recomputeGlobal() + m.recomputeNeed() +} + +func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { + m.Lock() + defer m.Unlock() + + if opts.Debug.TraceNet { + debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) + } + + repo, ok := m.remote[nodeID] + if !ok { + return + } + + for _, f := range fs { + if f.Flags&FlagDeleted != 0 && !opts.Delete { + // Files marked as deleted do not even enter the model + continue } - var offset uint64 - for _, b := range f.Blocks { - mf.Blocks = append(mf.Blocks, Block{ - Offset: offset, - Length: b.Length, - Hash: b.Hash, - }) - offset += uint64(b.Length) - } - m.remote[nodeID][f.Name] = mf + repo[f.Name] = fileFromProtocol(f) } m.recomputeGlobal() @@ -196,8 +207,11 @@ func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash [] func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { m.RLock() - nc := m.nodes[nodeID] + nc, ok := m.nodes[nodeID] m.RUnlock() + if !ok { + return nil, errNoSuchNode + } if opts.Debug.TraceNet { debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) @@ -242,22 +256,29 @@ func (m *Model) broadcastIndexLoop() { m.RLock() bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast) holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime + m.RUnlock() + maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay if bcastRequested && (holdtimeExceeded || maxDelayExceeded) { + m.Lock() + var indexWg sync.WaitGroup + indexWg.Add(len(m.nodes)) idx := m.protocolIndex() + m.lastIdxBcast = time.Now() for _, node := range m.nodes { node := node if opts.Debug.TraceNet { debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) } - go node.Index(idx) + go func() { + node.Index(idx) + indexWg.Done() + }() } - // We write here without the write lock because we are the only - // goroutine that accesses lastIdxBcast - m.lastIdxBcast = time.Now() + m.Unlock() + indexWg.Wait() } - m.RUnlock() - time.Sleep(idxBcastHoldtime / 2) + time.Sleep(idxBcastHoldtime) } } @@ -428,3 +449,21 @@ func (m *Model) AddNode(node *protocol.Connection) { // uplink. Return from AddNode in the meantime. go node.Index(idx) } + +func fileFromProtocol(f protocol.FileInfo) File { + mf := File{ + Name: f.Name, + Flags: f.Flags, + Modified: int64(f.Modified), + } + var offset uint64 + for _, b := range f.Blocks { + mf.Blocks = append(mf.Blocks, Block{ + Offset: offset, + Length: b.Length, + Hash: b.Hash, + }) + offset += uint64(b.Length) + } + return mf +} diff --git a/model_test.go b/model_test.go index 1d41cfca5..7b8982254 100644 --- a/model_test.go +++ b/model_test.go @@ -139,6 +139,39 @@ func TestRemoteUpdateOld(t *testing.T) { } } +func TestRemoteIndexUpdate(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m, false) + m.ReplaceLocal(fs) + + foo := protocol.FileInfo{ + Name: "foo", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + + bar := protocol.FileInfo{ + Name: "bar", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + + m.Index("42", []protocol.FileInfo{foo}) + + if _, ok := m.need["foo"]; !ok { + t.Error("Model doesn't need 'foo'") + } + + m.IndexUpdate("42", []protocol.FileInfo{bar}) + + if _, ok := m.need["foo"]; !ok { + t.Error("Model doesn't need 'foo'") + } + if _, ok := m.need["bar"]; !ok { + t.Error("Model doesn't need 'bar'") + } +} + func TestDelete(t *testing.T) { m := NewModel("foo") fs := Walk("testdata", m, false) diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index c8bc19198..c00f448d4 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -177,6 +177,14 @@ contents, but copies the Message ID from the Ping. struct PongMessage { } +### IndexUpdate (Type = 6) + +This message has exactly the same structure as the Index message. +However instead of replacing the contents of the repository in the +model, the Index Update merely amends it with new or updated file +information. Any files not mentioned in an Index Update are left +unchanged. + Example Exchange ---------------- diff --git a/protocol/protocol.go b/protocol/protocol.go index a34f5eb21..e510e9fec 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -12,12 +12,12 @@ import ( ) const ( - messageTypeReserved = iota - messageTypeIndex - messageTypeRequest - messageTypeResponse - messageTypePing - messageTypePong + messageTypeIndex = 1 + messageTypeRequest = 2 + messageTypeResponse = 3 + messageTypePing = 4 + messageTypePong = 5 + messageTypeIndexUpdate = 6 ) type FileInfo struct { @@ -35,6 +35,8 @@ type BlockInfo struct { type Model interface { // An index was received from the peer node Index(nodeID string, files []FileInfo) + // An index update was received from the peer node + IndexUpdate(nodeID string, files []FileInfo) // A request was made by the peer node Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) // The peer node closed the connection @@ -55,6 +57,7 @@ type Connection struct { lastReceive time.Time peerLatency time.Duration lastStatistics Statistics + lastIndexSent map[string]FileInfo } var ErrClosed = errors.New("Connection closed") @@ -95,7 +98,30 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M // Index writes the list of file information to the connected peer node func (c *Connection) Index(idx []FileInfo) { c.Lock() - c.mwriter.writeHeader(header{0, c.nextId, messageTypeIndex}) + + var msgType int + if c.lastIndexSent == nil { + // This is the first time we send an index. + msgType = messageTypeIndex + + c.lastIndexSent = make(map[string]FileInfo) + for _, f := range idx { + c.lastIndexSent[f.Name] = f + } + } else { + // We have sent one full index. Only send updates now. + msgType = messageTypeIndexUpdate + var diff []FileInfo + for _, f := range idx { + if ef, ok := c.lastIndexSent[f.Name]; !ok || ef.Modified != f.Modified { + diff = append(diff, f) + c.lastIndexSent[f.Name] = f + } + } + idx = diff + } + + c.mwriter.writeHeader(header{0, c.nextId, msgType}) c.mwriter.writeIndex(idx) err := c.flush() c.nextId = (c.nextId + 1) & 0xfff @@ -215,6 +241,14 @@ func (c *Connection) readerLoop() { c.receiver.Index(c.ID, files) } + case messageTypeIndexUpdate: + files := c.mreader.readIndex() + if c.mreader.err != nil { + c.close() + } else { + c.receiver.IndexUpdate(c.ID, files) + } + case messageTypeRequest: c.processRequest(hdr.msgID) if c.mreader.err != nil || c.mwriter.err != nil {