Merge pull request #2100 from AudriusButkevicius/memory

Use protocol provided buffer for requests (fixes #1157)
This commit is contained in:
Jakob Borg 2015-08-02 08:07:52 +02:00
commit bbe1de3119
8 changed files with 99 additions and 86 deletions

2
Godeps/Godeps.json generated
View File

@ -35,7 +35,7 @@
}, },
{ {
"ImportPath": "github.com/syncthing/protocol", "ImportPath": "github.com/syncthing/protocol",
"Rev": "22e24fc3879b1665077389f96862e222b2cdd8d3" "Rev": "ebcdea63c07327a342f65415bbadc497462b8f1f"
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",

View File

@ -31,7 +31,7 @@ func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, fl
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
} }
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option, buf []byte) error {
t.folder = folder t.folder = folder
t.name = name t.name = name
t.offset = offset t.offset = offset
@ -39,7 +39,8 @@ func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64
t.hash = hash t.hash = hash
t.flags = flags t.flags = flags
t.options = options t.options = options
return t.data, nil copy(buf, t.data)
return nil
} }
func (t *TestModel) Close(deviceID DeviceID, err error) { func (t *TestModel) Close(deviceID DeviceID, err error) {

View File

@ -26,9 +26,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
m.next.IndexUpdate(deviceID, folder, files, flags, options) m.next.IndexUpdate(deviceID, folder, files, flags, options)
} }
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = norm.NFD.String(name) name = norm.NFD.String(name)
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
} }
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {

View File

@ -18,8 +18,8 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
m.next.IndexUpdate(deviceID, folder, files, flags, options) m.next.IndexUpdate(deviceID, folder, files, flags, options)
} }
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
} }
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {

View File

@ -34,9 +34,9 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
m.next.IndexUpdate(deviceID, folder, files, flags, options) m.next.IndexUpdate(deviceID, folder, files, flags, options)
} }
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error {
name = filepath.FromSlash(name) name = filepath.FromSlash(name)
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options) return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf)
} }
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {

View File

@ -81,7 +81,7 @@ type Model interface {
// An index update was received from the peer device // An index update was received from the peer device
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// A request was made by the peer device // A request was made by the peer device
Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
// A cluster configuration message was received // A cluster configuration message was received
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection // The peer device closed the connection
@ -112,11 +112,11 @@ type rawConnection struct {
idxMut sync.Mutex // ensures serialization of Index calls idxMut sync.Mutex // ensures serialization of Index calls
nextID chan int nextID chan int
outbox chan hdrMsg outbox chan hdrMsg
closed chan struct{} closed chan struct{}
once sync.Once once sync.Once
pool sync.Pool
compression Compression compression Compression
rdbuf0 []byte // used & reused by readMessage rdbuf0 []byte // used & reused by readMessage
@ -129,8 +129,9 @@ type asyncResult struct {
} }
type hdrMsg struct { type hdrMsg struct {
hdr header hdr header
msg encodable msg encodable
done chan struct{}
} }
type encodable interface { type encodable interface {
@ -151,14 +152,19 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
cw := &countingWriter{Writer: writer} cw := &countingWriter{Writer: writer}
c := rawConnection{ c := rawConnection{
id: deviceID, id: deviceID,
name: name, name: name,
receiver: nativeModel{receiver}, receiver: nativeModel{receiver},
cr: cr, cr: cr,
cw: cw, cw: cw,
outbox: make(chan hdrMsg), outbox: make(chan hdrMsg),
nextID: make(chan int), nextID: make(chan int),
closed: make(chan struct{}), closed: make(chan struct{}),
pool: sync.Pool{
New: func() interface{} {
return make([]byte, BlockSize)
},
},
compression: compress, compression: compress,
} }
@ -195,7 +201,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, optio
Files: idx, Files: idx,
Flags: flags, Flags: flags,
Options: options, Options: options,
}) }, nil)
c.idxMut.Unlock() c.idxMut.Unlock()
return nil return nil
} }
@ -213,7 +219,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32,
Files: idx, Files: idx,
Flags: flags, Flags: flags,
Options: options, Options: options,
}) }, nil)
c.idxMut.Unlock() c.idxMut.Unlock()
return nil return nil
} }
@ -243,7 +249,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
Hash: hash, Hash: hash,
Flags: flags, Flags: flags,
Options: options, Options: options,
}) }, nil)
if !ok { if !ok {
return nil, ErrClosed return nil, ErrClosed
} }
@ -257,7 +263,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
// ClusterConfig send the cluster configuration message to the peer and returns any error // ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
c.send(-1, messageTypeClusterConfig, config) c.send(-1, messageTypeClusterConfig, config, nil)
} }
func (c *rawConnection) ping() bool { func (c *rawConnection) ping() bool {
@ -273,7 +279,7 @@ func (c *rawConnection) ping() bool {
c.awaiting[id] = rc c.awaiting[id] = rc
c.awaitingMut.Unlock() c.awaitingMut.Unlock()
ok := c.send(id, messageTypePing, nil) ok := c.send(id, messageTypePing, nil, nil)
if !ok { if !ok {
return false return false
} }
@ -342,7 +348,7 @@ func (c *rawConnection) readerLoop() (err error) {
if state != stateReady { if state != stateReady {
return fmt.Errorf("protocol error: ping message in state %d", state) return fmt.Errorf("protocol error: ping message in state %d", state)
} }
c.send(hdr.msgID, messageTypePong, pongMessage{}) c.send(hdr.msgID, messageTypePong, pongMessage{}, nil)
case pongMessage: case pongMessage:
if state != stateReady { if state != stateReady {
@ -519,12 +525,36 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
} }
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options) size := int(req.Size)
usePool := size <= BlockSize
c.send(msgID, messageTypeResponse, ResponseMessage{ var buf []byte
Data: data, var done chan struct{}
Code: errorToCode(err),
}) if usePool {
buf = c.pool.Get().([]byte)[:size]
done = make(chan struct{})
} else {
buf = make([]byte, size)
}
err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
if err != nil {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: nil,
Code: errorToCode(err),
}, done)
} else {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: buf,
Code: errorToCode(err),
}, done)
}
if usePool {
<-done
c.pool.Put(buf)
}
} }
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
@ -547,7 +577,7 @@ func (c *rawConnection) handlePong(msgID int) {
c.awaitingMut.Unlock() c.awaitingMut.Unlock()
} }
func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
if msgID < 0 { if msgID < 0 {
select { select {
case id := <-c.nextID: case id := <-c.nextID:
@ -564,7 +594,7 @@ func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
} }
select { select {
case c.outbox <- hdrMsg{hdr, msg}: case c.outbox <- hdrMsg{hdr, msg, done}:
return true return true
case <-c.closed: case <-c.closed:
return false return false
@ -583,6 +613,9 @@ func (c *rawConnection) writerLoop() {
if hm.msg != nil { if hm.msg != nil {
// Uncompressed message in uncBuf // Uncompressed message in uncBuf
uncBuf, err = hm.msg.AppendXDR(uncBuf[:0]) uncBuf, err = hm.msg.AppendXDR(uncBuf[:0])
if hm.done != nil {
close(hm.done)
}
if err != nil { if err != nil {
c.close(err) c.close(err)
return return

View File

@ -705,19 +705,19 @@ func (m *Model) Close(device protocol.DeviceID, err error) {
// Request returns the specified data segment by reading it from local disk. // Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface. // Implements the protocol.Model interface.
func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []protocol.Option) ([]byte, error) { func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, hash []byte, flags uint32, options []protocol.Option, buf []byte) error {
if offset < 0 || size < 0 { if offset < 0 {
return nil, protocol.ErrNoSuchFile return protocol.ErrNoSuchFile
} }
if !m.folderSharedWith(folder, deviceID) { if !m.folderSharedWith(folder, deviceID) {
l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder) l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder)
return nil, protocol.ErrNoSuchFile return protocol.ErrNoSuchFile
} }
if flags != 0 { if flags != 0 {
// We don't currently support or expect any flags. // We don't currently support or expect any flags.
return nil, fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags) return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", flags)
} }
// Verify that the requested file exists in the local model. We only need // Verify that the requested file exists in the local model. We only need
@ -739,7 +739,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
if !ok { if !ok {
l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder) l.Warnf("Request from %s for file %s in nonexistent folder %q", deviceID, name, folder)
return nil, protocol.ErrNoSuchFile return protocol.ErrNoSuchFile
} }
// This call is really expensive for large files, as we load the full // This call is really expensive for large files, as we load the full
@ -747,21 +747,21 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
// space for, read, and deserialize. // space for, read, and deserialize.
lf, ok := folderFiles.Get(protocol.LocalDeviceID, name) lf, ok := folderFiles.Get(protocol.LocalDeviceID, name)
if !ok { if !ok {
return nil, protocol.ErrNoSuchFile return protocol.ErrNoSuchFile
} }
if lf.IsInvalid() || lf.IsDeleted() { if lf.IsInvalid() || lf.IsDeleted() {
if debug { if debug {
l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, size, lf) l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, deviceID, folder, name, offset, len(buf), lf)
} }
return nil, protocol.ErrInvalid return protocol.ErrInvalid
} }
if offset > lf.Size() { if offset > lf.Size() {
if debug { if debug {
l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, size) l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, deviceID, name, offset, len(buf))
} }
return nil, protocol.ErrNoSuchFile return protocol.ErrNoSuchFile
} }
m.rvmut.Lock() m.rvmut.Lock()
@ -792,7 +792,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
} }
if debug && deviceID != protocol.LocalDeviceID { if debug && deviceID != protocol.LocalDeviceID {
l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, size) l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf))
} }
m.fmut.RLock() m.fmut.RLock()
fn := filepath.Join(m.folderCfgs[folder].Path(), name) fn := filepath.Join(m.folderCfgs[folder].Path(), name)
@ -803,7 +803,7 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { if info, err := os.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 {
target, _, err := symlinks.Read(fn) target, _, err := symlinks.Read(fn)
if err != nil { if err != nil {
return nil, err return err
} }
reader = strings.NewReader(target) reader = strings.NewReader(target)
} else { } else {
@ -811,19 +811,18 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset
// at any moment. // at any moment.
reader, err = os.Open(fn) reader, err = os.Open(fn)
if err != nil { if err != nil {
return nil, err return err
} }
defer reader.(*os.File).Close() defer reader.(*os.File).Close()
} }
buf := make([]byte, size)
_, err = reader.ReadAt(buf, offset) _, err = reader.ReadAt(buf, offset)
if err != nil { if err != nil {
return nil, err return err
} }
return buf, nil return nil
} }
func (m *Model) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) { func (m *Model) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) {

View File

@ -99,8 +99,11 @@ func TestRequest(t *testing.T) {
m.ServeBackground() m.ServeBackground()
m.ScanFolder("default") m.ScanFolder("default")
bs := make([]byte, protocol.BlockSize)
// Existing, shared file // Existing, shared file
bs, err := m.Request(device1, "default", "foo", 0, 6, nil, 0, nil) bs = bs[:6]
err := m.Request(device1, "default", "foo", 0, nil, 0, nil, bs)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -109,58 +112,35 @@ func TestRequest(t *testing.T) {
} }
// Existing, nonshared file // Existing, nonshared file
bs, err = m.Request(device2, "default", "foo", 0, 6, nil, 0, nil) err = m.Request(device2, "default", "foo", 0, nil, 0, nil, bs)
if err == nil { if err == nil {
t.Error("Unexpected nil error on insecure file read") t.Error("Unexpected nil error on insecure file read")
} }
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
// Nonexistent file // Nonexistent file
bs, err = m.Request(device1, "default", "nonexistent", 0, 6, nil, 0, nil) err = m.Request(device1, "default", "nonexistent", 0, nil, 0, nil, bs)
if err == nil { if err == nil {
t.Error("Unexpected nil error on insecure file read") t.Error("Unexpected nil error on insecure file read")
} }
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
// Shared folder, but disallowed file name // Shared folder, but disallowed file name
bs, err = m.Request(device1, "default", "../walk.go", 0, 6, nil, 0, nil) err = m.Request(device1, "default", "../walk.go", 0, nil, 0, nil, bs)
if err == nil { if err == nil {
t.Error("Unexpected nil error on insecure file read") t.Error("Unexpected nil error on insecure file read")
} }
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
// Larger block than available
bs, err = m.Request(device1, "default", "foo", 0, 42, nil, 0, nil)
if err == nil {
t.Error("Unexpected nil error on insecure file read")
}
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
// Negative offset // Negative offset
bs, err = m.Request(device1, "default", "foo", -4, 6, nil, 0, nil) err = m.Request(device1, "default", "foo", -4, nil, 0, nil, bs[:0])
if err == nil { if err == nil {
t.Error("Unexpected nil error on insecure file read") t.Error("Unexpected nil error on insecure file read")
} }
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
// Negative size // Larger block than available
bs, err = m.Request(device1, "default", "foo", 4, -4, nil, 0, nil) bs = bs[:42]
err = m.Request(device1, "default", "foo", 0, nil, 0, nil, bs)
if err == nil { if err == nil {
t.Error("Unexpected nil error on insecure file read") t.Error("Unexpected nil error on insecure file read")
} }
if bs != nil {
t.Errorf("Unexpected non nil data on insecure file read: %q", string(bs))
}
} }
func genFiles(n int) []protocol.FileInfo { func genFiles(n int) []protocol.FileInfo {