From 38ac4e8f79f9faea469f405d060c7002ae0b58e8 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 10 Jun 2014 16:03:22 +0200 Subject: [PATCH] Serialize incoming indexes (fixes #344) --- protocol/protocol.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index e93f51a42..8eb6789f5 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -128,6 +128,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M closed: make(chan struct{}), } + go c.indexSerializerLoop() go c.readerLoop() go c.writerLoop() go c.pingerLoop() @@ -285,6 +286,31 @@ func (c *rawConnection) readerLoop() (err error) { } } +type incomingIndex struct { + update bool + id string + repo string + files []FileInfo +} + +var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right? + +func (c *rawConnection) indexSerializerLoop() { + // We must avoid blocking the reader loop when processing large indexes. + // There is otherwise a potential deadlock where both sides has the model + // locked because it's sending a large index update and can't receive the + // large index update from the other side. But we must also ensure to + // process the indexes in the order they are received, hence the separate + // routine and buffered channel. + for ii := range incomingIndexes { + if ii.update { + c.receiver.IndexUpdate(ii.id, ii.repo, ii.files) + } else { + c.receiver.Index(ii.id, ii.repo, ii.files) + } + } +} + func (c *rawConnection) handleIndex() error { var im IndexMessage im.decodeXDR(c.xr) @@ -299,7 +325,7 @@ func (c *rawConnection) handleIndex() error { // update and can't receive the large index update from the // other side. - go c.receiver.Index(c.id, im.Repository, im.Files) + incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files} } return nil } @@ -310,7 +336,7 @@ func (c *rawConnection) handleIndexUpdate() error { if err := c.xr.Error(); err != nil { return err } else { - go c.receiver.IndexUpdate(c.id, im.Repository, im.Files) + incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files} } return nil }