Index Updates

This commit is contained in:
Jakob Borg 2013-12-28 08:10:36 -05:00
parent cf04e101b9
commit 74c27ad4e2
4 changed files with 149 additions and 35 deletions

View File

@ -9,11 +9,10 @@ The model has read and write locks. These must be acquired as appropriate by
public methods. To prevent deadlock situations, private methods should never public methods. To prevent deadlock situations, private methods should never
acquire locks, but document what locks they require. acquire locks, but document what locks they require.
TODO(jb): Keep global and per node transfer and performance statistics.
*/ */
import ( import (
"errors"
"fmt" "fmt"
"os" "os"
"path" "path"
@ -38,15 +37,16 @@ type Model struct {
lastIdxBcastRequest time.Time lastIdxBcastRequest time.Time
} }
var (
errNoSuchNode = errors.New("no such node")
)
const ( const (
RemoteFetchers = 4 RemoteFetchers = 4
FlagDeleted = 1 << 12 FlagDeleted = 1 << 12
// Index is broadcasted when a broadcast has been requested and the index idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
// has been quiscent for idxBcastHoldtime, or it was at least idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
// idxBcastMaxDelay since we last sent an index.
idxBcastHoldtime = 15 * time.Second
idxBcastMaxDelay = 120 * time.Second
) )
func NewModel(dir string) *Model { func NewModel(dir string) *Model {
@ -111,21 +111,32 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
// Files marked as deleted do not even enter the model // Files marked as deleted do not even enter the model
continue continue
} }
mf := File{ m.remote[nodeID][f.Name] = fileFromProtocol(f)
Name: f.Name, }
Flags: f.Flags,
Modified: int64(f.Modified), m.recomputeGlobal()
m.recomputeNeed()
}
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
if opts.Debug.TraceNet {
debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
}
repo, ok := m.remote[nodeID]
if !ok {
return
}
for _, f := range fs {
if f.Flags&FlagDeleted != 0 && !opts.Delete {
// Files marked as deleted do not even enter the model
continue
} }
var offset uint64 repo[f.Name] = fileFromProtocol(f)
for _, b := range f.Blocks {
mf.Blocks = append(mf.Blocks, Block{
Offset: offset,
Length: b.Length,
Hash: b.Hash,
})
offset += uint64(b.Length)
}
m.remote[nodeID][f.Name] = mf
} }
m.recomputeGlobal() m.recomputeGlobal()
@ -196,8 +207,11 @@ func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []
func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock() m.RLock()
nc := m.nodes[nodeID] nc, ok := m.nodes[nodeID]
m.RUnlock() m.RUnlock()
if !ok {
return nil, errNoSuchNode
}
if opts.Debug.TraceNet { if opts.Debug.TraceNet {
debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
@ -242,22 +256,29 @@ func (m *Model) broadcastIndexLoop() {
m.RLock() m.RLock()
bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast) bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
m.RUnlock()
maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) { if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
m.Lock()
var indexWg sync.WaitGroup
indexWg.Add(len(m.nodes))
idx := m.protocolIndex() idx := m.protocolIndex()
m.lastIdxBcast = time.Now()
for _, node := range m.nodes { for _, node := range m.nodes {
node := node node := node
if opts.Debug.TraceNet { if opts.Debug.TraceNet {
debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
} }
go node.Index(idx) go func() {
node.Index(idx)
indexWg.Done()
}()
} }
// We write here without the write lock because we are the only m.Unlock()
// goroutine that accesses lastIdxBcast indexWg.Wait()
m.lastIdxBcast = time.Now()
} }
m.RUnlock() time.Sleep(idxBcastHoldtime)
time.Sleep(idxBcastHoldtime / 2)
} }
} }
@ -428,3 +449,21 @@ func (m *Model) AddNode(node *protocol.Connection) {
// uplink. Return from AddNode in the meantime. // uplink. Return from AddNode in the meantime.
go node.Index(idx) go node.Index(idx)
} }
func fileFromProtocol(f protocol.FileInfo) File {
mf := File{
Name: f.Name,
Flags: f.Flags,
Modified: int64(f.Modified),
}
var offset uint64
for _, b := range f.Blocks {
mf.Blocks = append(mf.Blocks, Block{
Offset: offset,
Length: b.Length,
Hash: b.Hash,
})
offset += uint64(b.Length)
}
return mf
}

View File

@ -139,6 +139,39 @@ func TestRemoteUpdateOld(t *testing.T) {
} }
} }
func TestRemoteIndexUpdate(t *testing.T) {
m := NewModel("foo")
fs := Walk("testdata", m, false)
m.ReplaceLocal(fs)
foo := protocol.FileInfo{
Name: "foo",
Modified: time.Now().Unix(),
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
bar := protocol.FileInfo{
Name: "bar",
Modified: time.Now().Unix(),
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
m.Index("42", []protocol.FileInfo{foo})
if _, ok := m.need["foo"]; !ok {
t.Error("Model doesn't need 'foo'")
}
m.IndexUpdate("42", []protocol.FileInfo{bar})
if _, ok := m.need["foo"]; !ok {
t.Error("Model doesn't need 'foo'")
}
if _, ok := m.need["bar"]; !ok {
t.Error("Model doesn't need 'bar'")
}
}
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
m := NewModel("foo") m := NewModel("foo")
fs := Walk("testdata", m, false) fs := Walk("testdata", m, false)

View File

@ -177,6 +177,14 @@ contents, but copies the Message ID from the Ping.
struct PongMessage { struct PongMessage {
} }
### IndexUpdate (Type = 6)
This message has exactly the same structure as the Index message.
However instead of replacing the contents of the repository in the
model, the Index Update merely amends it with new or updated file
information. Any files not mentioned in an Index Update are left
unchanged.
Example Exchange Example Exchange
---------------- ----------------

View File

@ -12,12 +12,12 @@ import (
) )
const ( const (
messageTypeReserved = iota messageTypeIndex = 1
messageTypeIndex messageTypeRequest = 2
messageTypeRequest messageTypeResponse = 3
messageTypeResponse messageTypePing = 4
messageTypePing messageTypePong = 5
messageTypePong messageTypeIndexUpdate = 6
) )
type FileInfo struct { type FileInfo struct {
@ -35,6 +35,8 @@ type BlockInfo struct {
type Model interface { type Model interface {
// An index was received from the peer node // An index was received from the peer node
Index(nodeID string, files []FileInfo) Index(nodeID string, files []FileInfo)
// An index update was received from the peer node
IndexUpdate(nodeID string, files []FileInfo)
// A request was made by the peer node // A request was made by the peer node
Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error)
// The peer node closed the connection // The peer node closed the connection
@ -55,6 +57,7 @@ type Connection struct {
lastReceive time.Time lastReceive time.Time
peerLatency time.Duration peerLatency time.Duration
lastStatistics Statistics lastStatistics Statistics
lastIndexSent map[string]FileInfo
} }
var ErrClosed = errors.New("Connection closed") var ErrClosed = errors.New("Connection closed")
@ -95,7 +98,30 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
// Index writes the list of file information to the connected peer node // Index writes the list of file information to the connected peer node
func (c *Connection) Index(idx []FileInfo) { func (c *Connection) Index(idx []FileInfo) {
c.Lock() c.Lock()
c.mwriter.writeHeader(header{0, c.nextId, messageTypeIndex})
var msgType int
if c.lastIndexSent == nil {
// This is the first time we send an index.
msgType = messageTypeIndex
c.lastIndexSent = make(map[string]FileInfo)
for _, f := range idx {
c.lastIndexSent[f.Name] = f
}
} else {
// We have sent one full index. Only send updates now.
msgType = messageTypeIndexUpdate
var diff []FileInfo
for _, f := range idx {
if ef, ok := c.lastIndexSent[f.Name]; !ok || ef.Modified != f.Modified {
diff = append(diff, f)
c.lastIndexSent[f.Name] = f
}
}
idx = diff
}
c.mwriter.writeHeader(header{0, c.nextId, msgType})
c.mwriter.writeIndex(idx) c.mwriter.writeIndex(idx)
err := c.flush() err := c.flush()
c.nextId = (c.nextId + 1) & 0xfff c.nextId = (c.nextId + 1) & 0xfff
@ -215,6 +241,14 @@ func (c *Connection) readerLoop() {
c.receiver.Index(c.ID, files) c.receiver.Index(c.ID, files)
} }
case messageTypeIndexUpdate:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.close()
} else {
c.receiver.IndexUpdate(c.ID, files)
}
case messageTypeRequest: case messageTypeRequest:
c.processRequest(hdr.msgID) c.processRequest(hdr.msgID)
if c.mreader.err != nil || c.mwriter.err != nil { if c.mreader.err != nil || c.mwriter.err != nil {