Serialize incoming indexes (fixes #344)

This commit is contained in:
Jakob Borg 2014-06-10 16:03:22 +02:00
parent 70fc8a3064
commit 38ac4e8f79

View File

@ -128,6 +128,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
closed: make(chan struct{}),
}
go c.indexSerializerLoop()
go c.readerLoop()
go c.writerLoop()
go c.pingerLoop()
@ -285,6 +286,31 @@ func (c *rawConnection) readerLoop() (err error) {
}
}
type incomingIndex struct {
update bool
id string
repo string
files []FileInfo
}
var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right?
func (c *rawConnection) indexSerializerLoop() {
// We must avoid blocking the reader loop when processing large indexes.
// There is otherwise a potential deadlock where both sides has the model
// locked because it's sending a large index update and can't receive the
// large index update from the other side. But we must also ensure to
// process the indexes in the order they are received, hence the separate
// routine and buffered channel.
for ii := range incomingIndexes {
if ii.update {
c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
} else {
c.receiver.Index(ii.id, ii.repo, ii.files)
}
}
}
func (c *rawConnection) handleIndex() error {
var im IndexMessage
im.decodeXDR(c.xr)
@ -299,7 +325,7 @@ func (c *rawConnection) handleIndex() error {
// update and can't receive the large index update from the
// other side.
go c.receiver.Index(c.id, im.Repository, im.Files)
incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
}
return nil
}
@ -310,7 +336,7 @@ func (c *rawConnection) handleIndexUpdate() error {
if err := c.xr.Error(); err != nil {
return err
} else {
go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
}
return nil
}