From 2935aebe53435f3e7dd36dd739b6e7e601ba18d3 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 9 Jan 2014 13:58:35 +0100 Subject: [PATCH] Benchmarking --- main.go | 6 ++- model/model.go | 68 +++++++++++++---------- model/model_test.go | 111 ++++++++++++++++++++++++++++++++++++++ protocol/protocol.go | 63 +++++++++++----------- protocol/protocol_test.go | 10 ++-- 5 files changed, 191 insertions(+), 67 deletions(-) diff --git a/main.go b/main.go index 376902794..34411a04b 100644 --- a/main.go +++ b/main.go @@ -282,7 +282,8 @@ listen: for nodeID := range nodeAddrs { if nodeID == remoteID { - m.AddConnection(conn, remoteID) + protoConn := protocol.NewConnection(remoteID, conn, conn, m) + m.AddConnection(conn, protoConn) continue listen } } @@ -351,7 +352,8 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M continue } - m.AddConnection(conn, remoteID) + protoConn := protocol.NewConnection(remoteID, conn, conn, m) + m.AddConnection(conn, protoConn) continue nextNode } } diff --git a/model/model.go b/model/model.go index d23949c1f..24fca3d4d 100644 --- a/model/model.go +++ b/model/model.go @@ -31,12 +31,12 @@ type Model struct { sync.RWMutex dir string - global map[string]File // the latest version of each file as it exists in the cluster - local map[string]File // the files we currently have locally on disk - remote map[string]map[string]File - need map[string]bool // the files we need to update - nodes map[string]*protocol.Connection - rawConn map[string]io.ReadWriteCloser + global map[string]File // the latest version of each file as it exists in the cluster + local map[string]File // the files we currently have locally on disk + remote map[string]map[string]File + need map[string]bool // the files we need to update + protoConn map[string]Connection + rawConn map[string]io.Closer updatedLocal int64 // timestamp of last update to local updateGlobal int64 // timestamp of last update to remote @@ -55,6 +55,13 @@ type Model struct { fileWasSuppressed map[string]int } +type Connection interface { + ID() string + Index([]protocol.FileInfo) + Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) + Statistics() protocol.Statistics +} + const ( idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long @@ -78,8 +85,8 @@ func NewModel(dir string) *Model { local: make(map[string]File), remote: make(map[string]map[string]File), need: make(map[string]bool), - nodes: make(map[string]*protocol.Connection), - rawConn: make(map[string]io.ReadWriteCloser), + protoConn: make(map[string]Connection), + rawConn: make(map[string]io.Closer), lastIdxBcast: time.Now(), trace: make(map[string]bool), fileLastChanged: make(map[string]time.Time), @@ -141,7 +148,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { defer m.RUnlock() var res = make(map[string]ConnectionInfo) - for node, conn := range m.nodes { + for node, conn := range m.protoConn { ci := ConnectionInfo{ Statistics: conn.Statistics(), } @@ -288,7 +295,7 @@ func (m *Model) Close(node string, err error) { } delete(m.remote, node) - delete(m.nodes, node) + delete(m.protoConn, node) delete(m.rawConn, node) m.recomputeGlobal() @@ -383,7 +390,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) { func (m *Model) ConnectedTo(nodeID string) bool { m.RLock() defer m.RUnlock() - _, ok := m.nodes[nodeID] + _, ok := m.protoConn[nodeID] return ok } @@ -402,12 +409,11 @@ func (m *Model) RepoID() string { // AddConnection adds a new peer connection to the model. An initial index will // be sent to the connected peer, thereafter index updates whenever the local // repository changes. -func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) { - node := protocol.NewConnection(nodeID, conn, conn, m) - +func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { + nodeID := protoConn.ID() m.Lock() - m.nodes[nodeID] = node - m.rawConn[nodeID] = conn + m.protoConn[nodeID] = protoConn + m.rawConn[nodeID] = rawConn m.Unlock() m.RLock() @@ -415,7 +421,7 @@ func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) { m.RUnlock() go func() { - node.Index(idx) + protoConn.Index(idx) }() } @@ -461,7 +467,7 @@ func (m *Model) protocolIndex() []protocol.FileInfo { func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { m.RLock() - nc, ok := m.nodes[nodeID] + nc, ok := m.protoConn[nodeID] m.RUnlock() if !ok { return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID) @@ -485,10 +491,10 @@ func (m *Model) broadcastIndexLoop() { if bcastRequested && (holdtimeExceeded || maxDelayExceeded) { m.Lock() var indexWg sync.WaitGroup - indexWg.Add(len(m.nodes)) + indexWg.Add(len(m.protoConn)) idx := m.protocolIndex() m.lastIdxBcast = time.Now() - for _, node := range m.nodes { + for _, node := range m.protoConn { node := node if m.trace["net"] { log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx)) @@ -547,10 +553,14 @@ func (m *Model) recomputeGlobal() { newGlobal[n] = f } + var highestMod int64 for _, fs := range m.remote { for n, nf := range fs { if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) { newGlobal[n] = nf + if nf.Modified > highestMod { + highestMod = nf.Modified + } } } } @@ -558,7 +568,7 @@ func (m *Model) recomputeGlobal() { // Figure out if anything actually changed var updated bool - if len(newGlobal) != len(m.global) { + if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) { updated = true } else { for n, f0 := range newGlobal { @@ -616,14 +626,14 @@ func (m *Model) whoHas(name string) []string { } func fileFromFileInfo(f protocol.FileInfo) File { - var blocks []Block + var blocks = make([]Block, len(f.Blocks)) var offset uint64 - for _, b := range f.Blocks { - blocks = append(blocks, Block{ + for i, b := range f.Blocks { + blocks[i] = Block{ Offset: offset, Length: b.Length, Hash: b.Hash, - }) + } offset += uint64(b.Length) } return File{ @@ -636,12 +646,12 @@ func fileFromFileInfo(f protocol.FileInfo) File { } func fileInfoFromFile(f File) protocol.FileInfo { - var blocks []protocol.BlockInfo - for _, b := range f.Blocks { - blocks = append(blocks, protocol.BlockInfo{ + var blocks = make([]protocol.BlockInfo, len(f.Blocks)) + for i, b := range f.Blocks { + blocks[i] = protocol.BlockInfo{ Length: b.Length, Hash: b.Hash, - }) + } } return protocol.FileInfo{ Name: f.Name, diff --git a/model/model_test.go b/model/model_test.go index cb2cae838..472b92317 100644 --- a/model/model_test.go +++ b/model/model_test.go @@ -2,6 +2,7 @@ package model import ( "bytes" + "fmt" "os" "reflect" "testing" @@ -406,3 +407,113 @@ func TestIgnoreWithUnknownFlags(t *testing.T) { t.Error("Model not should include", invalid) } } + +func prepareModel(n int, m *Model) []protocol.FileInfo { + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + files := make([]protocol.FileInfo, n) + t := time.Now().Unix() + for i := 0; i < n; i++ { + files[i] = protocol.FileInfo{ + Name: fmt.Sprintf("file%d", i), + Modified: t, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + } + + m.Index("42", files) + return files +} + +func BenchmarkRecomputeGlobal10k(b *testing.B) { + m := NewModel("testdata") + prepareModel(10000, m) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.recomputeGlobal() + } +} + +func BenchmarkRecomputeNeed10K(b *testing.B) { + m := NewModel("testdata") + prepareModel(10000, m) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.recomputeNeed() + } +} + +func BenchmarkIndexUpdate10000(b *testing.B) { + m := NewModel("testdata") + files := prepareModel(10000, m) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.IndexUpdate("42", files) + } +} + +type FakeConnection struct { + id string + requestData []byte +} + +func (FakeConnection) Close() error { + return nil +} + +func (f FakeConnection) ID() string { + return string(f.id) +} + +func (FakeConnection) Index([]protocol.FileInfo) {} + +func (f FakeConnection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) { + return f.requestData, nil +} + +func (FakeConnection) Ping() bool { + return true +} + +func (FakeConnection) Statistics() protocol.Statistics { + return protocol.Statistics{} +} + +func BenchmarkRequest(b *testing.B) { + m := NewModel("testdata") + fs, _ := m.Walk(false) + m.ReplaceLocal(fs) + + const n = 1000 + files := make([]protocol.FileInfo, n) + t := time.Now().Unix() + for i := 0; i < n; i++ { + files[i] = protocol.FileInfo{ + Name: fmt.Sprintf("file%d", i), + Modified: t, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + } + + fc := FakeConnection{ + id: "42", + requestData: []byte("some data to return"), + } + m.AddConnection(fc, fc) + m.Index("42", files) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + data, err := m.requestGlobal("42", files[i%n].Name, 0, 32, nil) + if err != nil { + b.Error(err) + } + if data == nil { + b.Error("nil data") + } + } +} diff --git a/protocol/protocol.go b/protocol/protocol.go index 146a16784..d059d9283 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -52,7 +52,7 @@ type Model interface { type Connection struct { sync.RWMutex - ID string + id string receiver Model reader io.Reader mreader *marshalReader @@ -89,13 +89,13 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M } c := Connection{ + id: nodeID, receiver: receiver, reader: flrd, mreader: &marshalReader{r: flrd}, writer: flwr, mwriter: &marshalWriter{w: flwr}, awaiting: make(map[int]chan asyncResult), - ID: nodeID, } go c.readerLoop() @@ -104,6 +104,10 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M return &c } +func (c *Connection) ID() string { + return c.id +} + // Index writes the list of file information to the connected peer node func (c *Connection) Index(idx []FileInfo) { c.Lock() @@ -137,10 +141,10 @@ func (c *Connection) Index(idx []FileInfo) { c.Unlock() if err != nil { - c.Close(err) + c.close(err) return } else if c.mwriter.err != nil { - c.Close(c.mwriter.err) + c.close(c.mwriter.err) return } } @@ -158,13 +162,13 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt c.mwriter.writeRequest(request{name, offset, size, hash}) if c.mwriter.err != nil { c.Unlock() - c.Close(c.mwriter.err) + c.close(c.mwriter.err) return nil, c.mwriter.err } err := c.flush() if err != nil { c.Unlock() - c.Close(err) + c.close(err) return nil, err } c.nextId = (c.nextId + 1) & 0xfff @@ -177,7 +181,7 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt return res.val, res.err } -func (c *Connection) Ping() bool { +func (c *Connection) ping() bool { c.Lock() if c.closed { c.Unlock() @@ -189,11 +193,11 @@ func (c *Connection) Ping() bool { err := c.flush() if err != nil { c.Unlock() - c.Close(err) + c.close(err) return false } else if c.mwriter.err != nil { c.Unlock() - c.Close(c.mwriter.err) + c.close(c.mwriter.err) return false } c.nextId = (c.nextId + 1) & 0xfff @@ -203,9 +207,6 @@ func (c *Connection) Ping() bool { return ok && res.err == nil } -func (c *Connection) Stop() { -} - type flusher interface { Flush() error } @@ -217,7 +218,7 @@ func (c *Connection) flush() error { return nil } -func (c *Connection) Close(err error) { +func (c *Connection) close(err error) { c.Lock() if c.closed { c.Unlock() @@ -230,7 +231,7 @@ func (c *Connection) Close(err error) { c.awaiting = nil c.Unlock() - c.receiver.Close(c.ID, err) + c.receiver.Close(c.id, err) } func (c *Connection) isClosed() bool { @@ -244,11 +245,11 @@ loop: for { hdr := c.mreader.readHeader() if c.mreader.err != nil { - c.Close(c.mreader.err) + c.close(c.mreader.err) break loop } if hdr.version != 0 { - c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version)) + c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version)) break loop } @@ -256,10 +257,10 @@ loop: case messageTypeIndex: files := c.mreader.readIndex() if c.mreader.err != nil { - c.Close(c.mreader.err) + c.close(c.mreader.err) break loop } else { - c.receiver.Index(c.ID, files) + c.receiver.Index(c.id, files) } c.Lock() c.hasRecvdIndex = true @@ -268,16 +269,16 @@ loop: case messageTypeIndexUpdate: files := c.mreader.readIndex() if c.mreader.err != nil { - c.Close(c.mreader.err) + c.close(c.mreader.err) break loop } else { - c.receiver.IndexUpdate(c.ID, files) + c.receiver.IndexUpdate(c.id, files) } case messageTypeRequest: req := c.mreader.readRequest() if c.mreader.err != nil { - c.Close(c.mreader.err) + c.close(c.mreader.err) break loop } go c.processRequest(hdr.msgID, req) @@ -286,7 +287,7 @@ loop: data := c.mreader.readResponse() if c.mreader.err != nil { - c.Close(c.mreader.err) + c.close(c.mreader.err) break loop } else { c.Lock() @@ -306,10 +307,10 @@ loop: err := c.flush() c.Unlock() if err != nil { - c.Close(err) + c.close(err) break loop } else if c.mwriter.err != nil { - c.Close(c.mwriter.err) + c.close(c.mwriter.err) break loop } @@ -328,14 +329,14 @@ loop: } default: - c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) + c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) break loop } } } func (c *Connection) processRequest(msgID int, req request) { - data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) + data, _ := c.receiver.Request(c.id, req.name, req.offset, req.size, req.hash) c.Lock() c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) @@ -345,9 +346,9 @@ func (c *Connection) processRequest(msgID int, req request) { buffers.Put(data) if err != nil { - c.Close(err) + c.close(err) } else if c.mwriter.err != nil { - c.Close(c.mwriter.err) + c.close(c.mwriter.err) } } @@ -362,15 +363,15 @@ func (c *Connection) pingerLoop() { if ready { go func() { - rc <- c.Ping() + rc <- c.ping() }() select { case ok := <-rc: if !ok { - c.Close(fmt.Errorf("Ping failure")) + c.close(fmt.Errorf("Ping failure")) } case <-time.After(pingTimeout): - c.Close(fmt.Errorf("Ping timeout")) + c.close(fmt.Errorf("Ping timeout")) } } } diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 9a037ff3a..3cd0f7203 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -46,10 +46,10 @@ func TestPing(t *testing.T) { c0 := NewConnection("c0", ar, bw, nil) c1 := NewConnection("c1", br, aw, nil) - if ok := c0.Ping(); !ok { + if ok := c0.ping(); !ok { t.Error("c0 ping failed") } - if ok := c1.Ping(); !ok { + if ok := c1.ping(); !ok { t.Error("c1 ping failed") } } @@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) { c0 := NewConnection("c0", ar, ebw, m0) NewConnection("c1", br, eaw, m1) - res := c0.Ping() + res := c0.ping() if (i < 4 || j < 4) && res { t.Errorf("Unexpected ping success; i=%d, j=%d", i, j) } else if (i >= 8 && j >= 8) && !res { @@ -190,7 +190,7 @@ func TestClose(t *testing.T) { c0 := NewConnection("c0", ar, bw, m0) NewConnection("c1", br, aw, m1) - c0.Close(nil) + c0.close(nil) ok := c0.isClosed() if !ok { @@ -199,7 +199,7 @@ func TestClose(t *testing.T) { // None of these should panic, some should return an error - ok = c0.Ping() + ok = c0.ping() if ok { t.Error("Ping should not return true") }