|
@@ -5,6 +5,7 @@ import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "strconv"
|
|
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
|
"github.com/aws/aws-sdk-go-v2/service/sqs"
|
|
@@ -48,6 +49,36 @@ func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []st
|
|
|
return lqo.QueueUrls, nil
|
|
|
}
|
|
|
|
|
|
+func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
|
|
|
+ srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
|
|
|
+ if !ok || srap == "" {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ qrap = &QueueInfoAttributesRedriveAllowPolicy{}
|
|
|
+ if err = json.Unmarshal([]byte(srap), qrap); err != nil {
|
|
|
+ return nil, fmt.Errorf(
|
|
|
+ "failed parsing redrive policy for queue %q %w", qName, err)
|
|
|
+ }
|
|
|
+ return qrap, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
|
|
|
+ srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
|
|
|
+ if !ok || srp == "" {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ qrp = &QueueInfoAttributesRedrivePolicy{}
|
|
|
+ if err = json.Unmarshal([]byte(srp), qrp); err != nil {
|
|
|
+ return qrp, fmt.Errorf(
|
|
|
+ "failed parsing redrive policy for queue %q %w", qName, err)
|
|
|
+ }
|
|
|
+ if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {
|
|
|
+ return qrp, fmt.Errorf(
|
|
|
+ "failed converting queue %q ARN to URL: %w", qName, err)
|
|
|
+ }
|
|
|
+ return qrp, nil
|
|
|
+}
|
|
|
+
|
|
|
func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
|
|
|
qui := &sqs.GetQueueUrlInput{QueueName: &qName}
|
|
|
qu, err := r.GetQueueUrl(ctx, qui)
|
|
@@ -62,26 +93,112 @@ func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, er
|
|
|
if err != nil {
|
|
|
return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
|
|
|
}
|
|
|
- rp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
|
|
|
- if !ok {
|
|
|
- return "", nil
|
|
|
+ if qao == nil {
|
|
|
+ return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
|
|
|
}
|
|
|
- var qrp QueueInfoAttributesRedrivePolicy
|
|
|
- if err = json.Unmarshal([]byte(rp), &qrp); err != nil {
|
|
|
- return "", fmt.Errorf("failed parsing redrive policy for queue %q %w",
|
|
|
- qName, err)
|
|
|
- }
|
|
|
- qURL, err := URLFromARNString(qrp.DeadLetterTargetARN)
|
|
|
+ qrp, err := r.parseRedrivePolicy(qName, *qao)
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf(
|
|
|
- "failed converting queue %q ARN to URL: %w", qName, err)
|
|
|
+ return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
|
|
|
}
|
|
|
+ if qrp == nil {
|
|
|
+ return "", nil // Queue has no DLQ: this is not an error
|
|
|
+ }
|
|
|
+ qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseRedrivePolicy
|
|
|
return qURL, nil
|
|
|
}
|
|
|
|
|
|
func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
|
|
|
- // TODO implement me
|
|
|
- panic("implement me")
|
|
|
+ qui := &sqs.GetQueueUrlInput{QueueName: &qName}
|
|
|
+ qu, err := r.GetQueueUrl(ctx, qui)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
|
|
|
+ }
|
|
|
+ if qu.QueueUrl == nil {
|
|
|
+ return nil, fmt.Errorf("URL for queue %q is empty", qName)
|
|
|
+ }
|
|
|
+ u := *qu.QueueUrl
|
|
|
+ a, err := ARNFromURL(u)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
|
|
|
+ }
|
|
|
+ qai := &sqs.GetQueueAttributesInput{
|
|
|
+ QueueUrl: qu.QueueUrl,
|
|
|
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
|
|
|
+ }
|
|
|
+ qao, err := r.Client.GetQueueAttributes(ctx, qai)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
|
|
|
+ }
|
|
|
+ if qao == nil {
|
|
|
+ return nil, fmt.Errorf("no attributes returned for queue %q", qName)
|
|
|
+ }
|
|
|
+ var (
|
|
|
+ errCount = 0
|
|
|
+ qi *QueueInfo
|
|
|
+ )
|
|
|
+ {
|
|
|
+ var (
|
|
|
+ anom, anomd, anomnv int32
|
|
|
+ created, changed int32 // timestamps
|
|
|
+ delay, max, retention, vto, wait int32
|
|
|
+ )
|
|
|
+ for _, field := range []struct {
|
|
|
+ name types.QueueAttributeName
|
|
|
+ value *int32
|
|
|
+ sDef string
|
|
|
+ def int32
|
|
|
+ }{
|
|
|
+ {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
|
|
|
+ {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
|
|
|
+ {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
|
|
|
+ {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
|
|
|
+ {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
|
|
|
+ {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
|
|
|
+ {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
|
|
|
+ {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
|
|
|
+ {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
|
|
|
+ {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
|
|
|
+ } {
|
|
|
+ s, ok := qao.Attributes[string(field.name)]
|
|
|
+ if !ok {
|
|
|
+ errCount++
|
|
|
+ s = field.sDef
|
|
|
+ }
|
|
|
+ n, err := strconv.Atoi(s)
|
|
|
+ if err != nil {
|
|
|
+ errCount++
|
|
|
+ }
|
|
|
+ *field.value = int32(n)
|
|
|
+ }
|
|
|
+ qrp, err := r.parseRedrivePolicy(qName, *qao)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
|
|
|
+ }
|
|
|
+ qrap, err := r.parseRedriveAllowPolicy(qName, *qao)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
|
|
|
+ }
|
|
|
+ qi = &QueueInfo{
|
|
|
+ Name: qName,
|
|
|
+ Url: u,
|
|
|
+ Attributes: &QueueInfoAttributes{
|
|
|
+ ApproximateNumberOfMessages: anom,
|
|
|
+ ApproximateNumberOfMessagesDelayed: anomd,
|
|
|
+ ApproximateNumberOfMessagesNotVisible: anomnv,
|
|
|
+ CreatedTimestamp: created,
|
|
|
+ DelaySeconds: delay,
|
|
|
+ LastModifiedTimestamp: changed,
|
|
|
+ MaximumMessageSize: max,
|
|
|
+ MessageRetentionPeriod: retention,
|
|
|
+ QueueARN: a.String(),
|
|
|
+ ReceiveMessageWaitTimeSeconds: wait,
|
|
|
+ VisibilityTimeout: vto,
|
|
|
+ RedrivePolicy: qrp,
|
|
|
+ RedriveAllowPolicy: qrap,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return qi, err
|
|
|
}
|
|
|
|
|
|
func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
|