// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"fmt"
	"math"
	"strings"

	pb "google.golang.org/genproto/googleapis/pubsub/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// maxPayload is the maximum number of bytes to devote to actual ids in
// acknowledgement or modifyAckDeadline requests. A serialized
// AcknowledgeRequest proto has a small constant overhead, plus the size of the
// subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A
// ModifyAckDeadlineRequest has an additional few bytes for the deadline. We
// don't know the subscription name here, so we just assume the size exclusive
// of ids is 100 bytes.
//
// With gRPC there is no way for the client to know the server's max message size (it is
// configurable on the server). We know from experience that it
// is 512K.
const ( maxPayload = 512 * 1024 reqFixedOverhead = 100 overheadPerID = 3 maxSendRecvBytes = 20 * 1024 * 1024 // 20M ) func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) { msgs := make([]*Message, 0, len(rms)) for i, m := range rms { msg, err := toMessage(m) if err != nil { return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) } msgs = append(msgs, msg) } return msgs, nil } func trunc32(i int64) int32 { if i > math.MaxInt32 { i = math.MaxInt32 } return int32(i) } // func newStreamingPuller(ctx context.Context, subc *vkit.SubscriberClient, subName string, ackDeadlineSecs int32) *streamingPuller { // p := &streamingPuller{ // ctx: ctx, // subName: subName, // ackDeadlineSecs: ackDeadlineSecs, // subc: subc, // } // p.c = sync.NewCond(&p.mu) // return p // } // type streamingPuller struct { // ctx context.Context // subName string // ackDeadlineSecs int32 // subc *vkit.SubscriberClient // mu sync.Mutex // c *sync.Cond // inFlight bool // closed bool // set after CloseSend called // spc pb.Subscriber_StreamingPullClient // err error // } // // open establishes (or re-establishes) a stream for pulling messages. // // It takes care that only one RPC is in flight at a time. // func (p *streamingPuller) open() error { // p.c.L.Lock() // defer p.c.L.Unlock() // p.openLocked() // return p.err // } // func (p *streamingPuller) openLocked() { // if p.inFlight { // // Another goroutine is opening; wait for it. // for p.inFlight { // p.c.Wait() // } // return // } // // No opens in flight; start one. // // Keep the lock held, to avoid a race where we // // close the old stream while opening a new one. 
// p.inFlight = true // spc, err := p.subc.StreamingPull(p.ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes))) // if err == nil { // err = spc.Send(&pb.StreamingPullRequest{ // Subscription: p.subName, // StreamAckDeadlineSeconds: p.ackDeadlineSecs, // }) // } // p.spc = spc // p.err = err // p.inFlight = false // p.c.Broadcast() // } // func (p *streamingPuller) call(f func(pb.Subscriber_StreamingPullClient) error) error { // p.c.L.Lock() // defer p.c.L.Unlock() // // Wait for an open in flight. // for p.inFlight { // p.c.Wait() // } // var err error // var bo gax.Backoff // for { // select { // case <-p.ctx.Done(): // p.err = p.ctx.Err() // default: // } // if p.err != nil { // return p.err // } // spc := p.spc // // Do not call f with the lock held. Only one goroutine calls Send // // (streamingMessageIterator.sender) and only one calls Recv // // (streamingMessageIterator.receiver). If we locked, then a // // blocked Recv would prevent a Send from happening. // p.c.L.Unlock() // err = f(spc) // p.c.L.Lock() // if !p.closed && err != nil && isRetryable(err) { // // Sleep with exponential backoff. Normally we wouldn't hold the lock while sleeping, // // but here it can't do any harm, since the stream is broken anyway. // gax.Sleep(p.ctx, bo.Pause()) // p.openLocked() // continue // } // // Not an error, or not a retryable error; stop retrying. // p.err = err // return err // } // } // Logic from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java. 
func isRetryable(err error) bool { s, ok := status.FromError(err) if !ok { // includes io.EOF, normal stream close, which causes us to reopen return true } switch s.Code() { case codes.DeadlineExceeded, codes.Internal, codes.Canceled, codes.ResourceExhausted: return true case codes.Unavailable: return !strings.Contains(s.Message(), "Server shutdownNow invoked") default: return false } } // func (p *streamingPuller) fetchMessages() ([]*Message, error) { // var res *pb.StreamingPullResponse // err := p.call(func(spc pb.Subscriber_StreamingPullClient) error { // var err error // res, err = spc.Recv() // return err // }) // if err != nil { // return nil, err // } // return convertMessages(res.ReceivedMessages) // } // func (p *streamingPuller) send(req *pb.StreamingPullRequest) error { // // Note: len(modAckIDs) == len(modSecs) // var rest *pb.StreamingPullRequest // for len(req.AckIds) > 0 || len(req.ModifyDeadlineAckIds) > 0 { // req, rest = splitRequest(req, maxPayload) // err := p.call(func(spc pb.Subscriber_StreamingPullClient) error { // x := spc.Send(req) // return x // }) // if err != nil { // return err // } // req = rest // } // return nil // } // func (p *streamingPuller) closeSend() { // p.mu.Lock() // p.closed = true // p.spc.CloseSend() // p.mu.Unlock() // } // Split req into a prefix that is smaller than maxSize, and a remainder. func splitRequest(req *pb.StreamingPullRequest, maxSize int) (prefix, remainder *pb.StreamingPullRequest) { const int32Bytes = 4 // Copy all fields before splitting the variable-sized ones. remainder = &pb.StreamingPullRequest{} *remainder = *req // Split message so it isn't too big. 
size := reqFixedOverhead i := 0 for size < maxSize && (i < len(req.AckIds) || i < len(req.ModifyDeadlineAckIds)) { if i < len(req.AckIds) { size += overheadPerID + len(req.AckIds[i]) } if i < len(req.ModifyDeadlineAckIds) { size += overheadPerID + len(req.ModifyDeadlineAckIds[i]) + int32Bytes } i++ } min := func(a, b int) int { if a < b { return a } return b } j := i if size > maxSize { j-- } k := min(j, len(req.AckIds)) remainder.AckIds = req.AckIds[k:] req.AckIds = req.AckIds[:k] k = min(j, len(req.ModifyDeadlineAckIds)) remainder.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[k:] remainder.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[k:] req.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[:k] req.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[:k] return req, remainder }