diff --git a/protocol/protocol.go b/protocol/protocol.go
index 5dbc20cbd..ebcda5258 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -11,6 +11,7 @@ import (
 	"io"
 	"sync"
 	"time"
+
 	"github.com/calmh/syncthing/xdr"
 )
 
@@ -95,15 +96,6 @@ type rawConnection struct {
 	outbox chan []encodable
 	closed chan struct{}
 	once   sync.Once
-
-	incomingIndexes chan incomingIndex
-}
-
-type incomingIndex struct {
-	update bool
-	id     NodeID
-	repo   string
-	files  []FileInfo
 }
 
 type asyncResult struct {
@@ -124,23 +116,21 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M
 	wb := bufio.NewWriterSize(cw, 65536)
 
 	c := rawConnection{
-		id:              nodeID,
-		name:            name,
-		receiver:        nativeModel{receiver},
-		state:           stateInitial,
-		cr:              cr,
-		xr:              xdr.NewReader(rb),
-		cw:              cw,
-		wb:              wb,
-		xw:              xdr.NewWriter(wb),
-		awaiting:        make([]chan asyncResult, 0x1000),
-		outbox:          make(chan []encodable),
-		nextID:          make(chan int),
-		closed:          make(chan struct{}),
-		incomingIndexes: make(chan incomingIndex, 100), // should be enough for anyone, right?
+		id:       nodeID,
+		name:     name,
+		receiver: nativeModel{receiver},
+		state:    stateInitial,
+		cr:       cr,
+		xr:       xdr.NewReader(rb),
+		cw:       cw,
+		wb:       wb,
+		xw:       xdr.NewWriter(wb),
+		awaiting: make([]chan asyncResult, 0x1000),
+		outbox:   make(chan []encodable),
+		nextID:   make(chan int),
+		closed:   make(chan struct{}),
 	}
 
-	go c.indexSerializerLoop()
 	go c.readerLoop()
 	go c.writerLoop()
 	go c.pingerLoop()
@@ -316,51 +306,16 @@ func (c *rawConnection) readerLoop() (err error) {
 	}
 }
 
-func (c *rawConnection) indexSerializerLoop() {
-	// We must avoid blocking the reader loop when processing large indexes.
-	// There is otherwise a potential deadlock where both sides has the model
-	// locked because it's sending a large index update and can't receive the
-	// large index update from the other side. But we must also ensure to
-	// process the indexes in the order they are received, hence the separate
-	// routine and buffered channel.
-	for {
-		select {
-		case ii := <-c.incomingIndexes:
-			if ii.update {
-				if debug {
-					l.Debugf("calling IndexUpdate(%v, %v, %d files)", ii.id, ii.repo, len(ii.files))
-				}
-				c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
-			} else {
-				if debug {
-					l.Debugf("calling Index(%v, %v, %d files)", ii.id, ii.repo, len(ii.files))
-				}
-				c.receiver.Index(ii.id, ii.repo, ii.files)
-			}
-		case <-c.closed:
-			return
-		}
-	}
-}
-
 func (c *rawConnection) handleIndex() error {
 	var im IndexMessage
 	im.decodeXDR(c.xr)
 	if err := c.xr.Error(); err != nil {
 		return err
 	} else {
-
-		// We run this (and the corresponding one for update, below)
-		// in a separate goroutine to avoid blocking the read loop.
-		// There is otherwise a potential deadlock where both sides
-		// has the model locked because it's sending a large index
-		// update and can't receive the large index update from the
-		// other side.
-
 		if debug {
-			l.Debugf("queueing Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+			l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 		}
-		c.incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
+		c.receiver.Index(c.id, im.Repository, im.Files)
 	}
 	return nil
 }
@@ -374,7 +329,7 @@ func (c *rawConnection) handleIndexUpdate() error {
 		if debug {
-			l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+			l.Debugf("IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
 		}
-		c.incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
+		c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
 	}
 	return nil
 }
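
Note: for readers unfamiliar with the pattern this change removes, the sketch below shows, in isolation, how incoming messages can be serialized through a buffered channel so a read loop never blocks on a slow receiver while still preserving arrival order. The names and types here (indexEvent, serializer, queue) are illustrative only and are not part of the syncthing protocol package; the real code used incomingIndex and indexSerializerLoop as shown in the removed hunks.

package main

import (
	"fmt"
	"time"
)

// indexEvent stands in for the removed incomingIndex struct: one queued
// call to either Index or IndexUpdate, kept in arrival order.
type indexEvent struct {
	update bool
	repo   string
	files  []string
}

// serializer drains the queue in order and invokes the receiver on the
// enqueuer's behalf, so the goroutine that enqueues (the read loop in the
// real code) never blocks while the receiver is busy or holding locks.
func serializer(queue <-chan indexEvent, closed <-chan struct{}) {
	for {
		select {
		case ev := <-queue:
			if ev.update {
				fmt.Printf("IndexUpdate(%q, %d files)\n", ev.repo, len(ev.files))
			} else {
				fmt.Printf("Index(%q, %d files)\n", ev.repo, len(ev.files))
			}
		case <-closed:
			return
		}
	}
}

func main() {
	queue := make(chan indexEvent, 100) // buffered, like the removed channel
	closed := make(chan struct{})
	go serializer(queue, closed)

	// A read loop would enqueue here instead of calling the receiver directly.
	queue <- indexEvent{repo: "default", files: []string{"a", "b"}}
	queue <- indexEvent{update: true, repo: "default", files: []string{"c"}}

	time.Sleep(100 * time.Millisecond) // crude: let the queue drain before shutdown
	close(closed)
}

The diff above drops this indirection in favour of calling the receiver directly from the read loop; the trade-off is simplicity versus the blocking and ordering concerns described in the removed comments.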