diff --git a/buffers/buffers.go b/buffers/buffers.go index 0ed1e7826..3112264b9 100644 --- a/buffers/buffers.go +++ b/buffers/buffers.go @@ -1,13 +1,26 @@ package buffers -var buffers = make(chan []byte, 32) +const ( + largeMin = 1024 +) + +var ( + smallBuffers = make(chan []byte, 32) + largeBuffers = make(chan []byte, 32) +) func Get(size int) []byte { + var ch = largeBuffers + if size < largeMin { + ch = smallBuffers + } + var buf []byte select { - case buf = <-buffers: + case buf = <-ch: default: } + if len(buf) < size { return make([]byte, size) } @@ -15,12 +28,18 @@ func Get(size int) []byte { } func Put(buf []byte) { - if cap(buf) == 0 { + buf = buf[:cap(buf)] + if len(buf) == 0 { return } - buf = buf[:cap(buf)] + + var ch = largeBuffers + if len(buf) < largeMin { + ch = smallBuffers + } + select { - case buffers <- buf: + case ch <- buf: default: } } diff --git a/model.go b/model.go index aa3b8744b..df6996924 100644 --- a/model.go +++ b/model.go @@ -111,7 +111,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { // Files marked as deleted do not even enter the model continue } - m.remote[nodeID][f.Name] = fileFromProtocol(f) + m.remote[nodeID][f.Name] = fileFromFileInfo(f) } m.recomputeGlobal() @@ -136,7 +136,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { // Files marked as deleted do not even enter the model continue } - repo[f.Name] = fileFromProtocol(f) + repo[f.Name] = fileFromFileInfo(f) } m.recomputeGlobal() @@ -149,7 +149,7 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) { m.local = make(map[string]File) for _, f := range fs { - m.local[f.Name] = fileFromProtocol(f) + m.local[f.Name] = fileFromFileInfo(f) } m.recomputeGlobal() @@ -396,17 +396,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { func (m *Model) protocolIndex() []protocol.FileInfo { var index []protocol.FileInfo for _, f := range m.local { - mf := protocol.FileInfo{ - Name: f.Name, - Flags: f.Flags, - Modified: int64(f.Modified), - } - for _, b := range f.Blocks { - mf.Blocks = append(mf.Blocks, protocol.BlockInfo{ - Length: b.Length, - Hash: b.Hash, - }) - } + mf := fileInfoFromFile(f) if opts.Debug.TraceIdx { var flagComment string if mf.Flags&FlagDeleted != 0 { @@ -436,7 +426,7 @@ func (m *Model) AddNode(node *protocol.Connection) { go node.Index(idx) } -func fileFromProtocol(f protocol.FileInfo) File { +func fileFromFileInfo(f protocol.FileInfo) File { var blocks []Block var offset uint64 for _, b := range f.Blocks { @@ -445,6 +435,7 @@ func fileFromProtocol(f protocol.FileInfo) File { Length: b.Length, Hash: b.Hash, }) + buffers.Put(b.Hash) offset += uint64(b.Length) } return File{ @@ -454,3 +445,19 @@ func fileFromProtocol(f protocol.FileInfo) File { Blocks: blocks, } } + +func fileInfoFromFile(f File) protocol.FileInfo { + var blocks []protocol.BlockInfo + for _, b := range f.Blocks { + blocks = append(blocks, protocol.BlockInfo{ + Length: b.Length, + Hash: b.Hash, + }) + } + return protocol.FileInfo{ + Name: f.Name, + Flags: f.Flags, + Modified: int64(f.Modified), + Blocks: blocks, + } +} diff --git a/protocol/marshal.go b/protocol/marshal.go index bc97287f0..728b81845 100644 --- a/protocol/marshal.go +++ b/protocol/marshal.go @@ -21,6 +21,7 @@ type marshalWriter struct { w io.Writer tot int err error + b [8]byte } // We will never encode nor expect to decode blobs larger than 10 MB. Check @@ -57,12 +58,11 @@ func (w *marshalWriter) writeUint32(v uint32) { if w.err != nil { return } - var b [4]byte - b[0] = byte(v >> 24) - b[1] = byte(v >> 16) - b[2] = byte(v >> 8) - b[3] = byte(v) - _, w.err = w.w.Write(b[:]) + w.b[0] = byte(v >> 24) + w.b[1] = byte(v >> 16) + w.b[2] = byte(v >> 8) + w.b[3] = byte(v) + _, w.err = w.w.Write(w.b[:4]) w.tot += 4 } @@ -70,16 +70,15 @@ func (w *marshalWriter) writeUint64(v uint64) { if w.err != nil { return } - var b [8]byte - b[0] = byte(v >> 56) - b[1] = byte(v >> 48) - b[2] = byte(v >> 40) - b[3] = byte(v >> 32) - b[4] = byte(v >> 24) - b[5] = byte(v >> 16) - b[6] = byte(v >> 8) - b[7] = byte(v) - _, w.err = w.w.Write(b[:]) + w.b[0] = byte(v >> 56) + w.b[1] = byte(v >> 48) + w.b[2] = byte(v >> 40) + w.b[3] = byte(v >> 32) + w.b[4] = byte(v >> 24) + w.b[5] = byte(v >> 16) + w.b[6] = byte(v >> 8) + w.b[7] = byte(v) + _, w.err = w.w.Write(w.b[:8]) w.tot += 8 } @@ -87,6 +86,7 @@ type marshalReader struct { r io.Reader tot int err error + b [8]byte } func (r *marshalReader) readString() string { @@ -117,19 +117,17 @@ func (r *marshalReader) readUint32() uint32 { if r.err != nil { return 0 } - var b [4]byte - _, r.err = io.ReadFull(r.r, b[:]) + _, r.err = io.ReadFull(r.r, r.b[:4]) r.tot += 4 - return uint32(b[3]) | uint32(b[2])<<8 | uint32(b[1])<<16 | uint32(b[0])<<24 + return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24 } func (r *marshalReader) readUint64() uint64 { if r.err != nil { return 0 } - var b [8]byte - _, r.err = io.ReadFull(r.r, b[:]) + _, r.err = io.ReadFull(r.r, r.b[:8]) r.tot += 8 - return uint64(b[7]) | uint64(b[6])<<8 | uint64(b[5])<<16 | uint64(b[4])<<24 | - uint64(b[3])<<32 | uint64(b[2])<<40 | uint64(b[1])<<48 | uint64(b[0])<<56 + return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 | + uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56 } diff --git a/protocol/messages.go b/protocol/messages.go index 8a481d3b2..3328bddc5 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -48,7 +48,7 @@ func (w *marshalWriter) writeIndex(idx []FileInfo) { } func WriteIndex(w io.Writer, idx []FileInfo) (int, error) { - mw := marshalWriter{w, 0, nil} + mw := marshalWriter{w: w} mw.writeIndex(idx) return mw.tot, mw.err } @@ -90,7 +90,7 @@ func (r *marshalReader) readIndex() []FileInfo { } func ReadIndex(r io.Reader) ([]FileInfo, error) { - mr := marshalReader{r, 0, nil} + mr := marshalReader{r: r} idx := mr.readIndex() return idx, mr.err } diff --git a/protocol/protocol.go b/protocol/protocol.go index 564fd28f7..b272b6da9 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -57,7 +57,7 @@ type Connection struct { lastReceive time.Time peerLatency time.Duration lastStatistics Statistics - lastIndexSent map[string]FileInfo + indexSent map[string]int64 } var ErrClosed = errors.New("Connection closed") @@ -80,9 +80,9 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M c := Connection{ receiver: receiver, reader: flrd, - mreader: &marshalReader{flrd, 0, nil}, + mreader: &marshalReader{r: flrd}, writer: flwr, - mwriter: &marshalWriter{flwr, 0, nil}, + mwriter: &marshalWriter{w: flwr}, awaiting: make(map[int]chan asyncResult), lastReceive: time.Now(), ID: nodeID, @@ -100,22 +100,22 @@ func (c *Connection) Index(idx []FileInfo) { c.Lock() var msgType int - if c.lastIndexSent == nil { + if c.indexSent == nil { // This is the first time we send an index. msgType = messageTypeIndex - c.lastIndexSent = make(map[string]FileInfo) + c.indexSent = make(map[string]int64) for _, f := range idx { - c.lastIndexSent[f.Name] = f + c.indexSent[f.Name] = f.Modified } } else { // We have sent one full index. Only send updates now. msgType = messageTypeIndexUpdate var diff []FileInfo for _, f := range idx { - if ef, ok := c.lastIndexSent[f.Name]; !ok || ef.Modified != f.Modified { + if modified, ok := c.indexSent[f.Name]; !ok || f.Modified != modified { diff = append(diff, f) - c.lastIndexSent[f.Name] = f + c.indexSent[f.Name] = f.Modified } } idx = diff