|
@@ -28,10 +28,15 @@ type ItemsKeys struct {
|
|
|
ReceiptHandle string
|
|
|
}
|
|
|
|
|
|
+type QueueRedrivePolicies struct {
|
|
|
+ *QueueInfoAttributesRedrivePolicy
|
|
|
+ *QueueInfoAttributesRedriveAllowPolicy
|
|
|
+}
|
|
|
+
|
|
|
// Redriver is a redrive-oriented facade in front of the sqs.Client API.
|
|
|
type Redriver interface {
|
|
|
ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
|
|
|
- GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
|
|
|
+ GetRedrivePolicies(ctx context.Context, qName string) (*QueueRedrivePolicies, error)
|
|
|
GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
|
|
|
GetQueueItems(ctx context.Context, qName string) ([]Message, error)
|
|
|
DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
|
|
@@ -47,6 +52,7 @@ type redriver struct {
|
|
|
}
|
|
|
|
|
|
func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
|
|
|
+ // TODO implement pagination for the day we need more than 1000 queues to be reported.
|
|
|
lqi := &sqs.ListQueuesInput{
|
|
|
MaxResults: aws.Int32(1000),
|
|
|
NextToken: nil,
|
|
@@ -59,62 +65,70 @@ func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []s
|
|
|
return lqo.QueueUrls, nil
|
|
|
}
|
|
|
|
|
|
-func (*redriver) parseQueueInfoRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
|
|
|
- srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
|
|
|
- if !ok || srap == "" {
|
|
|
- return nil, nil
|
|
|
- }
|
|
|
- qrap = &QueueInfoAttributesRedriveAllowPolicy{}
|
|
|
- if err = json.Unmarshal([]byte(srap), qrap); err != nil {
|
|
|
- return nil, fmt.Errorf(
|
|
|
- "failed parsing redrive policy for queue %q %w", qName, err)
|
|
|
- }
|
|
|
- return qrap, nil
|
|
|
-}
|
|
|
+func (*redriver) parseQueueInfoRedrivePolicies(qName string, qao sqs.GetQueueAttributesOutput) (*QueueRedrivePolicies, error) {
|
|
|
+ var qrp QueueRedrivePolicies
|
|
|
|
|
|
-func (*redriver) parseQueueInfoRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
|
|
|
- srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
|
|
|
- if !ok || srp == "" {
|
|
|
- return nil, nil
|
|
|
- }
|
|
|
- qrp = &QueueInfoAttributesRedrivePolicy{}
|
|
|
- if err = json.Unmarshal([]byte(srp), qrp); err != nil {
|
|
|
- return qrp, fmt.Errorf(
|
|
|
- "failed parsing redrive policy for queue %q %w", qName, err)
|
|
|
+ srp := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
|
|
|
+ if srp != "" {
|
|
|
+ rp := QueueInfoAttributesRedrivePolicy{}
|
|
|
+ if err := json.Unmarshal([]byte(srp), &rp); err != nil {
|
|
|
+ return nil, fmt.Errorf(
|
|
|
+ "failed parsing redrive policy for queue %q %w", qName, err)
|
|
|
+ }
|
|
|
+ if _, err := URLFromARNString(rp.DeadLetterTargetARN); err != nil {
|
|
|
+ return nil, fmt.Errorf(
|
|
|
+ "failed converting queue %q ARN to URL: %w", qName, err)
|
|
|
+ }
|
|
|
+ qrp.QueueInfoAttributesRedrivePolicy = &rp
|
|
|
}
|
|
|
- if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {
|
|
|
- return qrp, fmt.Errorf(
|
|
|
- "failed converting queue %q ARN to URL: %w", qName, err)
|
|
|
+
|
|
|
+ srap := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
|
|
|
+ if srap != "" {
|
|
|
+ rap := QueueInfoAttributesRedriveAllowPolicy{}
|
|
|
+ if err := json.Unmarshal([]byte(srap), &rap); err != nil {
|
|
|
+ return nil, fmt.Errorf(
|
|
|
+ "failed parsing redrive allow policy for queue %q %w", qName, err)
|
|
|
+ }
|
|
|
+ for _, src := range rap.SourceQueueARNs {
|
|
|
+ if _, err := URLFromARNString(src); err != nil {
|
|
|
+ return nil, fmt.Errorf(
|
|
|
+ "failed converting queue %q ARN to URL: %w", qName, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qrp.QueueInfoAttributesRedriveAllowPolicy = &rap
|
|
|
}
|
|
|
- return qrp, nil
|
|
|
+
|
|
|
+ return &qrp, nil
|
|
|
}
|
|
|
|
|
|
-func (r *redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
|
|
|
+func (r *redriver) GetRedrivePolicies(ctx context.Context, qName string) (policies *QueueRedrivePolicies, err error) {
|
|
|
qui := &sqs.GetQueueUrlInput{QueueName: &qName}
|
|
|
qu, err := r.GetQueueUrl(ctx, qui)
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
|
|
|
+ return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
|
|
|
}
|
|
|
qai := &sqs.GetQueueAttributesInput{
|
|
|
- QueueUrl: qu.QueueUrl,
|
|
|
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
|
|
|
+ QueueUrl: qu.QueueUrl,
|
|
|
+ AttributeNames: []types.QueueAttributeName{
|
|
|
+ types.QueueAttributeNameRedrivePolicy,
|
|
|
+ types.QueueAttributeNameRedriveAllowPolicy,
|
|
|
+ },
|
|
|
}
|
|
|
qao, err := r.Client.GetQueueAttributes(ctx, qai)
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
|
|
|
+ return nil, fmt.Errorf("failed getting DLQ policies for queue %q: %w", qName, err)
|
|
|
}
|
|
|
if qao == nil {
|
|
|
- return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
|
|
|
+ return nil, fmt.Errorf("redrive policy info for queue %q is empty", qName)
|
|
|
}
|
|
|
- qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
|
|
|
+ qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
|
|
|
+ return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
|
|
|
}
|
|
|
if qrp == nil {
|
|
|
- return "", nil // Queue has no DLQ: this is not an error
|
|
|
+ return nil, nil // Queue has no DLQ: this is not an error
|
|
|
}
|
|
|
- qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseQueueInfoRedrivePolicy
|
|
|
- return qURL, nil
|
|
|
+ return qrp, nil
|
|
|
}
|
|
|
|
|
|
func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
|
|
@@ -187,14 +201,10 @@ func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOut
|
|
|
}
|
|
|
*field.value = int64(n)
|
|
|
}
|
|
|
- qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
|
|
|
+ qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
|
|
|
}
|
|
|
- qrap, err := r.parseQueueInfoRedriveAllowPolicy(qName, *qao)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
|
|
|
- }
|
|
|
qi = &QueueInfo{
|
|
|
Name: qName,
|
|
|
URL: u,
|
|
@@ -210,8 +220,8 @@ func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOut
|
|
|
QueueARN: a.String(),
|
|
|
ReceiveMessageWaitTimeSeconds: wait,
|
|
|
VisibilityTimeout: vto,
|
|
|
- RedrivePolicy: qrp,
|
|
|
- RedriveAllowPolicy: qrap,
|
|
|
+ RedrivePolicy: qrp.QueueInfoAttributesRedrivePolicy,
|
|
|
+ RedriveAllowPolicy: qrp.QueueInfoAttributesRedriveAllowPolicy,
|
|
|
},
|
|
|
}
|
|
|
return qi, nil
|