mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
Use sync.Pool for response buffers
This commit is contained in:
parent
22e24fc387
commit
ebcdea63c0
@ -31,7 +31,7 @@ func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, fl
|
||||
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
|
||||
}
|
||||
|
||||
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
|
||||
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option, buf []byte) error {
|
||||
t.folder = folder
|
||||
t.name = name
|
||||
t.offset = offset
|
||||
@ -39,7 +39,8 @@ func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64
|
||||
t.hash = hash
|
||||
t.flags = flags
|
||||
t.options = options
|
||||
return t.data, nil
|
||||
copy(buf, t.data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestModel) Close(deviceID DeviceID, err error) {
|
||||
|
@ -26,9 +26,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
|
||||
m.next.IndexUpdate(deviceID, folder, files, flags, options)
|
||||
}
|
||||
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
|
||||
name = norm.NFD.String(name)
|
||||
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
|
||||
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
|
||||
}
|
||||
|
||||
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
|
||||
|
@ -18,8 +18,8 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
|
||||
m.next.IndexUpdate(deviceID, folder, files, flags, options)
|
||||
}
|
||||
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
|
||||
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
|
||||
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
|
||||
}
|
||||
|
||||
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
|
||||
|
@ -34,9 +34,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
|
||||
m.next.IndexUpdate(deviceID, folder, files, flags, options)
|
||||
}
|
||||
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
|
||||
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
|
||||
name = filepath.FromSlash(name)
|
||||
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
|
||||
return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
|
||||
}
|
||||
|
||||
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
|
||||
|
91
protocol.go
91
protocol.go
@ -81,7 +81,7 @@ type Model interface {
|
||||
// An index update was received from the peer device
|
||||
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
|
||||
// A request was made by the peer device
|
||||
Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
|
||||
Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
|
||||
// A cluster configuration message was received
|
||||
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
|
||||
// The peer device closed the connection
|
||||
@ -112,11 +112,11 @@ type rawConnection struct {
|
||||
|
||||
idxMut sync.Mutex // ensures serialization of Index calls
|
||||
|
||||
nextID chan int
|
||||
outbox chan hdrMsg
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
|
||||
nextID chan int
|
||||
outbox chan hdrMsg
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
pool sync.Pool
|
||||
compression Compression
|
||||
|
||||
rdbuf0 []byte // used & reused by readMessage
|
||||
@ -129,8 +129,9 @@ type asyncResult struct {
|
||||
}
|
||||
|
||||
type hdrMsg struct {
|
||||
hdr header
|
||||
msg encodable
|
||||
hdr header
|
||||
msg encodable
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type encodable interface {
|
||||
@ -151,14 +152,19 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
||||
cw := &countingWriter{Writer: writer}
|
||||
|
||||
c := rawConnection{
|
||||
id: deviceID,
|
||||
name: name,
|
||||
receiver: nativeModel{receiver},
|
||||
cr: cr,
|
||||
cw: cw,
|
||||
outbox: make(chan hdrMsg),
|
||||
nextID: make(chan int),
|
||||
closed: make(chan struct{}),
|
||||
id: deviceID,
|
||||
name: name,
|
||||
receiver: nativeModel{receiver},
|
||||
cr: cr,
|
||||
cw: cw,
|
||||
outbox: make(chan hdrMsg),
|
||||
nextID: make(chan int),
|
||||
closed: make(chan struct{}),
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, BlockSize)
|
||||
},
|
||||
},
|
||||
compression: compress,
|
||||
}
|
||||
|
||||
@ -195,7 +201,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, optio
|
||||
Files: idx,
|
||||
Flags: flags,
|
||||
Options: options,
|
||||
})
|
||||
}, nil)
|
||||
c.idxMut.Unlock()
|
||||
return nil
|
||||
}
|
||||
@ -213,7 +219,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32,
|
||||
Files: idx,
|
||||
Flags: flags,
|
||||
Options: options,
|
||||
})
|
||||
}, nil)
|
||||
c.idxMut.Unlock()
|
||||
return nil
|
||||
}
|
||||
@ -243,7 +249,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
|
||||
Hash: hash,
|
||||
Flags: flags,
|
||||
Options: options,
|
||||
})
|
||||
}, nil)
|
||||
if !ok {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
@ -257,7 +263,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
|
||||
|
||||
// ClusterConfig send the cluster configuration message to the peer and returns any error
|
||||
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
|
||||
c.send(-1, messageTypeClusterConfig, config)
|
||||
c.send(-1, messageTypeClusterConfig, config, nil)
|
||||
}
|
||||
|
||||
func (c *rawConnection) ping() bool {
|
||||
@ -273,7 +279,7 @@ func (c *rawConnection) ping() bool {
|
||||
c.awaiting[id] = rc
|
||||
c.awaitingMut.Unlock()
|
||||
|
||||
ok := c.send(id, messageTypePing, nil)
|
||||
ok := c.send(id, messageTypePing, nil, nil)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
@ -342,7 +348,7 @@ func (c *rawConnection) readerLoop() (err error) {
|
||||
if state != stateReady {
|
||||
return fmt.Errorf("protocol error: ping message in state %d", state)
|
||||
}
|
||||
c.send(hdr.msgID, messageTypePong, pongMessage{})
|
||||
c.send(hdr.msgID, messageTypePong, pongMessage{}, nil)
|
||||
|
||||
case pongMessage:
|
||||
if state != stateReady {
|
||||
@ -519,12 +525,36 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
|
||||
}
|
||||
|
||||
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
|
||||
data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options)
|
||||
size := int(req.Size)
|
||||
usePool := size <= BlockSize
|
||||
|
||||
c.send(msgID, messageTypeResponse, ResponseMessage{
|
||||
Data: data,
|
||||
Code: errorToCode(err),
|
||||
})
|
||||
var buf []byte
|
||||
var done chan struct{}
|
||||
|
||||
if usePool {
|
||||
buf = c.pool.Get().([]byte)[:size]
|
||||
done = make(chan struct{})
|
||||
} else {
|
||||
buf = make([]byte, size)
|
||||
}
|
||||
|
||||
err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
|
||||
if err != nil {
|
||||
c.send(msgID, messageTypeResponse, ResponseMessage{
|
||||
Data: nil,
|
||||
Code: errorToCode(err),
|
||||
}, done)
|
||||
} else {
|
||||
c.send(msgID, messageTypeResponse, ResponseMessage{
|
||||
Data: buf,
|
||||
Code: errorToCode(err),
|
||||
}, done)
|
||||
}
|
||||
|
||||
if usePool {
|
||||
<-done
|
||||
c.pool.Put(buf)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
|
||||
@ -547,7 +577,7 @@ func (c *rawConnection) handlePong(msgID int) {
|
||||
c.awaitingMut.Unlock()
|
||||
}
|
||||
|
||||
func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
|
||||
func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
|
||||
if msgID < 0 {
|
||||
select {
|
||||
case id := <-c.nextID:
|
||||
@ -564,7 +594,7 @@ func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
|
||||
}
|
||||
|
||||
select {
|
||||
case c.outbox <- hdrMsg{hdr, msg}:
|
||||
case c.outbox <- hdrMsg{hdr, msg, done}:
|
||||
return true
|
||||
case <-c.closed:
|
||||
return false
|
||||
@ -583,6 +613,9 @@ func (c *rawConnection) writerLoop() {
|
||||
if hm.msg != nil {
|
||||
// Uncompressed message in uncBuf
|
||||
uncBuf, err = hm.msg.AppendXDR(uncBuf[:0])
|
||||
if hm.done != nil {
|
||||
close(hm.done)
|
||||
}
|
||||
if err != nil {
|
||||
c.close(err)
|
||||
return
|
||||
|
Loading…
x
Reference in New Issue
Block a user