package storage // Copyright 2017 Microsoft Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import ( "encoding/xml" "fmt" "io" "net/http" "net/url" "strconv" "time" ) const ( // casing is per Golang's http.Header canonicalizing the header names. approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count" ) // QueueAccessPolicy represents each access policy in the queue ACL. type QueueAccessPolicy struct { ID string StartTime time.Time ExpiryTime time.Time CanRead bool CanAdd bool CanUpdate bool CanProcess bool } // QueuePermissions represents the queue ACLs. type QueuePermissions struct { AccessPolicies []QueueAccessPolicy } // SetQueuePermissionOptions includes options for a set queue permissions operation type SetQueuePermissionOptions struct { Timeout uint RequestID string `header:"x-ms-client-request-id"` } // Queue represents an Azure queue. type Queue struct { qsc *QueueServiceClient Name string Metadata map[string]string AproxMessageCount uint64 } func (q *Queue) buildPath() string { return fmt.Sprintf("/%s", q.Name) } func (q *Queue) buildPathMessages() string { return fmt.Sprintf("%s/messages", q.buildPath()) } // QueueServiceOptions includes options for some queue service operations type QueueServiceOptions struct { Timeout uint RequestID string `header:"x-ms-client-request-id"` } // Create operation creates a queue under the given account. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4 func (q *Queue) Create(options *QueueServiceOptions) error { params := url.Values{} headers := q.qsc.client.getStandardHeaders() headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata) if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) return checkRespCode(resp, []int{http.StatusCreated}) } // Delete operation permanently deletes the specified queue. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3 func (q *Queue) Delete(options *QueueServiceOptions) error { params := url.Values{} headers := q.qsc.client.getStandardHeaders() if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) return checkRespCode(resp, []int{http.StatusNoContent}) } // Exists returns true if a queue with given name exists. func (q *Queue) Exists() (bool, error) { uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}}) resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth) if resp != nil { defer readAndCloseBody(resp.Body) if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound { return resp.StatusCode == http.StatusOK, nil } err = getErrorFromResponse(resp) } return false, err } // SetMetadata operation sets user-defined metadata on the specified queue. // Metadata is associated with the queue as name-value pairs. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata func (q *Queue) SetMetadata(options *QueueServiceOptions) error { params := url.Values{"comp": {"metadata"}} headers := q.qsc.client.getStandardHeaders() headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata) if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) return checkRespCode(resp, []int{http.StatusNoContent}) } // GetMetadata operation retrieves user-defined metadata and queue // properties on the specified queue. Metadata is associated with // the queue as name-values pairs. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata // // Because the way Golang's http client (and http.Header in particular) // canonicalize header names, the returned metadata names would always // be all lower case. func (q *Queue) GetMetadata(options *QueueServiceOptions) error { params := url.Values{"comp": {"metadata"}} headers := q.qsc.client.getStandardHeaders() if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}}) resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) if err := checkRespCode(resp, []int{http.StatusOK}); err != nil { return err } aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader)) if aproxMessagesStr != "" { aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64) if err != nil { return err } q.AproxMessageCount = aproxMessages } q.Metadata = getMetadataFromHeaders(resp.Header) return nil } // GetMessageReference returns a message object with the specified text. func (q *Queue) GetMessageReference(text string) *Message { return &Message{ Queue: q, Text: text, } } // GetMessagesOptions is the set of options can be specified for Get // Messsages operation. A zero struct does not use any preferences for the // request. type GetMessagesOptions struct { Timeout uint NumOfMessages int VisibilityTimeout int RequestID string `header:"x-ms-client-request-id"` } type messages struct { XMLName xml.Name `xml:"QueueMessagesList"` Messages []Message `xml:"QueueMessage"` } // GetMessages operation retrieves one or more messages from the front of the // queue. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) { query := url.Values{} headers := q.qsc.client.getStandardHeaders() if options != nil { if options.NumOfMessages != 0 { query.Set("numofmessages", strconv.Itoa(options.NumOfMessages)) } if options.VisibilityTimeout != 0 { query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout)) } query = addTimeout(query, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query) resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) if err != nil { return []Message{}, err } defer resp.Body.Close() var out messages err = xmlUnmarshal(resp.Body, &out) if err != nil { return []Message{}, err } for i := range out.Messages { out.Messages[i].Queue = q } return out.Messages, err } // PeekMessagesOptions is the set of options can be specified for Peek // Messsage operation. A zero struct does not use any preferences for the // request. type PeekMessagesOptions struct { Timeout uint NumOfMessages int RequestID string `header:"x-ms-client-request-id"` } // PeekMessages retrieves one or more messages from the front of the queue, but // does not alter the visibility of the message. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) { query := url.Values{"peekonly": {"true"}} // Required for peek operation headers := q.qsc.client.getStandardHeaders() if options != nil { if options.NumOfMessages != 0 { query.Set("numofmessages", strconv.Itoa(options.NumOfMessages)) } query = addTimeout(query, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query) resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) if err != nil { return []Message{}, err } defer resp.Body.Close() var out messages err = xmlUnmarshal(resp.Body, &out) if err != nil { return []Message{}, err } for i := range out.Messages { out.Messages[i].Queue = q } return out.Messages, err } // ClearMessages operation deletes all messages from the specified queue. // // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages func (q *Queue) ClearMessages(options *QueueServiceOptions) error { params := url.Values{} headers := q.qsc.client.getStandardHeaders() if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params) resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) return checkRespCode(resp, []int{http.StatusNoContent}) } // SetPermissions sets up queue permissions // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error { body, length, err := generateQueueACLpayload(permissions.AccessPolicies) if err != nil { return err } params := url.Values{ "comp": {"acl"}, } headers := q.qsc.client.getStandardHeaders() headers["Content-Length"] = strconv.Itoa(length) if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth) if err != nil { return err } defer readAndCloseBody(resp.Body) return checkRespCode(resp, []int{http.StatusNoContent}) } func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) { sil := SignedIdentifiers{ SignedIdentifiers: []SignedIdentifier{}, } for _, qapd := range policies { permission := qapd.generateQueuePermissions() signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission) sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier) } return xmlMarshal(sil) } func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) { // generate the permissions string (raup). // still want the end user API to have bool flags. permissions = "" if qapd.CanRead { permissions += "r" } if qapd.CanAdd { permissions += "a" } if qapd.CanUpdate { permissions += "u" } if qapd.CanProcess { permissions += "p" } return permissions } // GetQueuePermissionOptions includes options for a get queue permissions operation type GetQueuePermissionOptions struct { Timeout uint RequestID string `header:"x-ms-client-request-id"` } // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl // If timeout is 0 then it will not be passed to Azure func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) { params := url.Values{ "comp": {"acl"}, } headers := q.qsc.client.getStandardHeaders() if options != nil { params = addTimeout(params, options.Timeout) headers = mergeHeaders(headers, headersFromStruct(*options)) } uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) if err != nil { return nil, err } defer resp.Body.Close() var ap AccessPolicy err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList) if err != nil { return nil, err } return buildQueueAccessPolicy(ap, &resp.Header), nil } func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions { permissions := QueuePermissions{ AccessPolicies: []QueueAccessPolicy{}, } for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers { qapd := QueueAccessPolicy{ ID: policy.ID, StartTime: policy.AccessPolicy.StartTime, ExpiryTime: policy.AccessPolicy.ExpiryTime, } qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r") qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a") qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u") qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p") permissions.AccessPolicies = append(permissions.AccessPolicies, qapd) } return &permissions }