lib/protocol: Require at least 3.125% savings from compression (#8133)

* lib/protocol: Require at least 3.125% savings from compression

The new lz4 library doesn't need its output buffer to be the maximum
size, unlike the old one (which would allocate if it weren't). It can
take a buffer that is of a smaller size and will report if compressed
data can fit inside the buffer (with a small chance of reporting a false
negative). Use that property to our advantage by requiring compressed
data to be at most n-n/32 = .96875*n bytes long for n input bytes.

* lib/protocol: Remove unused receivers

To make DeepSource happy.

* lib/protocol: Micro-optimize lz4Compress

Only write the length if compression was successful. This is a memory
write, so the compiler can't reorder it.

Only check the return value of lz4.CompressBlock.  Length-zero inputs
are always expanded by LZ4 compression (the library documents this),
so the check on len(src) isn't needed.
This commit is contained in:
greatroar 2022-01-24 19:36:58 +01:00 committed by GitHub
parent 1af87577e1
commit a0fd619df3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 15 deletions

View File

@ -540,7 +540,7 @@ func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (
// ... and is then unmarshalled // ... and is then unmarshalled
msg, err := c.newMessage(hdr.Type) msg, err := newMessage(hdr.Type)
if err != nil { if err != nil {
BufferPool.Put(buf) BufferPool.Put(buf)
return nil, err return nil, err
@ -747,7 +747,7 @@ func (c *rawConnection) writeMessage(msg message) error {
size := msg.ProtoSize() size := msg.ProtoSize()
hdr := Header{ hdr := Header{
Type: c.typeOf(msg), Type: typeOf(msg),
} }
hdrSize := hdr.ProtoSize() hdrSize := hdr.ProtoSize()
if hdrSize > 1<<16-1 { if hdrSize > 1<<16-1 {
@ -765,7 +765,7 @@ func (c *rawConnection) writeMessage(msg message) error {
} }
if c.shouldCompressMessage(msg) { if c.shouldCompressMessage(msg) {
ok, err := c.writeCompressedMessage(msg, buf[overhead:], overhead) ok, err := c.writeCompressedMessage(msg, buf[overhead:])
if ok { if ok {
return err return err
} }
@ -789,13 +789,13 @@ func (c *rawConnection) writeMessage(msg message) error {
return nil return nil
} }
// Write msg out compressed, given its uncompressed marshaled payload and overhead. // Write msg out compressed, given its uncompressed marshaled payload.
// //
// The first return value indicates whether compression succeeded. // The first return value indicates whether compression succeeded.
// If not, the caller should retry without compression. // If not, the caller should retry without compression.
func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, overhead int) (ok bool, err error) { func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte) (ok bool, err error) {
hdr := Header{ hdr := Header{
Type: c.typeOf(msg), Type: typeOf(msg),
Compression: MessageCompressionLZ4, Compression: MessageCompressionLZ4,
} }
hdrSize := hdr.ProtoSize() hdrSize := hdr.ProtoSize()
@ -804,13 +804,16 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, ov
} }
cOverhead := 2 + hdrSize + 4 cOverhead := 2 + hdrSize + 4
maxCompressed := cOverhead + lz4.CompressBlockBound(len(marshaled)) // The compressed size may be at most n-n/32 = .96875*n bytes,
// I.e., if we can't save at least 3.125% bandwidth, we forgo compression.
// This number is arbitrary but cheap to compute.
maxCompressed := cOverhead + len(marshaled) - len(marshaled)/32
buf := BufferPool.Get(maxCompressed) buf := BufferPool.Get(maxCompressed)
defer BufferPool.Put(buf) defer BufferPool.Put(buf)
compressedSize, err := lz4Compress(marshaled, buf[cOverhead:]) compressedSize, err := lz4Compress(marshaled, buf[cOverhead:])
totSize := compressedSize + cOverhead totSize := compressedSize + cOverhead
if err != nil || totSize >= len(marshaled)+overhead { if err != nil {
return false, nil return false, nil
} }
@ -831,7 +834,7 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, ov
return true, nil return true, nil
} }
func (c *rawConnection) typeOf(msg message) MessageType { func typeOf(msg message) MessageType {
switch msg.(type) { switch msg.(type) {
case *ClusterConfig: case *ClusterConfig:
return MessageTypeClusterConfig return MessageTypeClusterConfig
@ -854,7 +857,7 @@ func (c *rawConnection) typeOf(msg message) MessageType {
} }
} }
func (c *rawConnection) newMessage(t MessageType) (message, error) { func newMessage(t MessageType) (message, error) {
switch t { switch t {
case MessageTypeClusterConfig: case MessageTypeClusterConfig:
return new(ClusterConfig), nil return new(ClusterConfig), nil
@ -1014,16 +1017,16 @@ func (c *rawConnection) Statistics() Statistics {
} }
func lz4Compress(src, buf []byte) (int, error) { func lz4Compress(src, buf []byte) (int, error) {
// The compressed block is prefixed by the size of the uncompressed data.
binary.BigEndian.PutUint32(buf, uint32(len(src)))
n, err := lz4.CompressBlock(src, buf[4:], nil) n, err := lz4.CompressBlock(src, buf[4:], nil)
if err != nil { if err != nil {
return -1, err return -1, err
} else if len(src) > 0 && n == 0 { } else if n == 0 {
return -1, errNotCompressible return -1, errNotCompressible
} }
// The compressed block is prefixed by the size of the uncompressed data.
binary.BigEndian.PutUint32(buf, uint32(len(src)))
return n + 4, nil return n + 4, nil
} }

View File

@ -466,7 +466,7 @@ func TestWriteCompressed(t *testing.T) {
t.Error("received the wrong message") t.Error("received the wrong message")
} }
hdr := Header{Type: c.typeOf(msg)} hdr := Header{Type: typeOf(msg)}
size := int64(2 + hdr.ProtoSize() + 4 + msg.ProtoSize()) size := int64(2 + hdr.ProtoSize() + 4 + msg.ProtoSize())
if c.cr.tot > size { if c.cr.tot > size {
t.Errorf("compression enlarged message from %d to %d", t.Errorf("compression enlarged message from %d to %d",