From 4a6b43bcaec3f6deaf823e96746f725ef3e77a6b Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 3 Jul 2014 13:37:20 +0200 Subject: [PATCH] Clean up protocol locking and closing --- protocol/protocol.go | 74 ++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 47 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 564220695..9977737d3 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -81,15 +81,14 @@ type rawConnection struct { reader io.ReadCloser cr *countingReader xr *xdr.Reader + writer io.WriteCloser + cw *countingWriter + wb *bufio.Writer + xw *xdr.Writer - cw *countingWriter - wb *bufio.Writer - xw *xdr.Writer - wmut sync.Mutex - - awaiting []chan asyncResult - imut sync.Mutex + awaiting []chan asyncResult + awaitingMut sync.Mutex idxSent map[string]map[string]uint64 idxMut sync.Mutex // ensures serialization of Index calls @@ -97,6 +96,7 @@ type rawConnection struct { nextID chan int outbox chan []encodable closed chan struct{} + once sync.Once } type asyncResult struct { @@ -192,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int return nil, ErrClosed } - c.imut.Lock() + c.awaitingMut.Lock() if ch := c.awaiting[id]; ch != nil { panic("id taken") } - rc := make(chan asyncResult) + rc := make(chan asyncResult, 1) c.awaiting[id] = rc - c.imut.Unlock() + c.awaitingMut.Unlock() ok := c.send(header{0, id, messageTypeRequest}, RequestMessage{repo, name, uint64(offset), uint32(size)}) @@ -227,9 +227,9 @@ func (c *rawConnection) ping() bool { } rc := make(chan asyncResult, 1) - c.imut.Lock() + c.awaitingMut.Lock() c.awaiting[id] = rc - c.imut.Unlock() + c.awaitingMut.Unlock() ok := c.send(header{0, id, messageTypePing}) if !ok { @@ -388,32 +388,25 @@ func (c *rawConnection) handleResponse(hdr header) error { return err } - go func(hdr header, err error) { - c.imut.Lock() - rc := c.awaiting[hdr.msgID] + c.awaitingMut.Lock() + if rc := c.awaiting[hdr.msgID]; rc != nil { c.awaiting[hdr.msgID] = nil - c.imut.Unlock() - - if rc != nil { - rc <- asyncResult{data, err} - close(rc) - } - }(hdr, c.xr.Error()) + rc <- asyncResult{data, nil} + close(rc) + } + c.awaitingMut.Unlock() return nil } func (c *rawConnection) handlePong(hdr header) { - c.imut.Lock() + c.awaitingMut.Lock() if rc := c.awaiting[hdr.msgID]; rc != nil { - go func() { - rc <- asyncResult{} - close(rc) - }() - c.awaiting[hdr.msgID] = nil + rc <- asyncResult{} + close(rc) } - c.imut.Unlock() + c.awaitingMut.Unlock() } func (c *rawConnection) handleClusterConfig() error { @@ -458,17 +451,14 @@ func (c *rawConnection) send(h header, es ...encodable) bool { func (c *rawConnection) writerLoop() { var err error for es := range c.outbox { - c.wmut.Lock() for _, e := range es { e.encodeXDR(c.xw) } if err = c.flush(); err != nil { - c.wmut.Unlock() c.close(err) return } - c.wmut.Unlock() } } @@ -493,29 +483,20 @@ func (c *rawConnection) flush() error { } func (c *rawConnection) close(err error) { - c.imut.Lock() - c.wmut.Lock() - defer c.imut.Unlock() - defer c.wmut.Unlock() - - select { - case <-c.closed: - return - default: + c.once.Do(func() { close(c.closed) + c.awaitingMut.Lock() for i, ch := range c.awaiting { if ch != nil { close(ch) c.awaiting[i] = nil } } - - c.writer.Close() - c.reader.Close() + c.awaitingMut.Unlock() go c.receiver.Close(c.id, err) - } + }) } func (c *rawConnection) idGenerator() { @@ -577,8 +558,7 @@ func (c *rawConnection) pingerLoop() { func (c *rawConnection) processRequest(msgID int, req RequestMessage) { data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) - c.send(header{0, msgID, messageTypeResponse}, - encodableBytes(data)) + c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data)) } type Statistics struct {