package redriver import ( "context" "encoding/json" "fmt" "io" "strconv" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/fgm/izidic" "code.osinet.fr/fgm/sqs_demo/services" ) type ItemsKeys struct { MessageID string ReceiptHandle string } // Redriver is a redrive-oriented facade in front of the sqs.Client API. type Redriver interface { ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error Purge(ctx context.Context, qName string) error RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error } type redriver struct { io.Writer *sqs.Client } func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) { lqi := &sqs.ListQueuesInput{ MaxResults: aws.Int32(1000), NextToken: nil, QueueNamePrefix: aws.String(prefix), } lqo, err := r.Client.ListQueues(ctx, lqi) if err != nil { return nil, fmt.Errorf("listing queues: %w", err) } return lqo.QueueUrls, nil } func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) { srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)] if !ok || srap == "" { return nil, nil } qrap = &QueueInfoAttributesRedriveAllowPolicy{} if err = json.Unmarshal([]byte(srap), qrap); err != nil { return nil, fmt.Errorf( "failed parsing redrive policy for queue %q %w", qName, err) } return qrap, nil } func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) { srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)] if !ok || srp == "" { return nil, nil } qrp = &QueueInfoAttributesRedrivePolicy{} if err = json.Unmarshal([]byte(srp), qrp); err != nil { return qrp, fmt.Errorf( "failed parsing redrive policy for queue %q %w", qName, err) } if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil { return qrp, fmt.Errorf( "failed converting queue %q ARN to URL: %w", qName, err) } return qrp, nil } func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) { qui := &sqs.GetQueueUrlInput{QueueName: &qName} qu, err := r.GetQueueUrl(ctx, qui) if err != nil { return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err) } qai := &sqs.GetQueueAttributesInput{ QueueUrl: qu.QueueUrl, AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy}, } qao, err := r.Client.GetQueueAttributes(ctx, qai) if err != nil { return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err) } if qao == nil { return "", fmt.Errorf("redrive policy for queue %q is empty", qName) } qrp, err := r.parseRedrivePolicy(qName, *qao) if err != nil { return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err) } if qrp == nil { return "", nil // Queue has no DLQ: this is not an error } qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseRedrivePolicy return qURL, nil } func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) { qui := &sqs.GetQueueUrlInput{QueueName: &qName} qu, err := r.GetQueueUrl(ctx, qui) if err != nil { return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err) } if qu.QueueUrl == nil { return nil, fmt.Errorf("URL for queue %q is empty", qName) } u := *qu.QueueUrl a, err := ARNFromURL(u) if err != nil { return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err) } qai := &sqs.GetQueueAttributesInput{ QueueUrl: qu.QueueUrl, AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, } qao, err := r.Client.GetQueueAttributes(ctx, qai) if err != nil { return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err) } if qao == nil { return nil, fmt.Errorf("no attributes returned for queue %q", qName) } var ( errCount = 0 qi *QueueInfo ) { var ( anom, anomd, anomnv int32 created, changed int32 // timestamps delay, max, retention, vto, wait int32 ) for _, field := range []struct { name types.QueueAttributeName value *int32 sDef string def int32 }{ {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1}, {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1}, {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1}, {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0}, {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1}, {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0}, {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling } { s, ok := qao.Attributes[string(field.name)] if !ok { errCount++ s = field.sDef } n, err := strconv.Atoi(s) if err != nil { errCount++ } *field.value = int32(n) } qrp, err := r.parseRedrivePolicy(qName, *qao) if err != nil { return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err) } qrap, err := r.parseRedriveAllowPolicy(qName, *qao) if err != nil { return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err) } qi = &QueueInfo{ Name: qName, Url: u, Attributes: &QueueInfoAttributes{ ApproximateNumberOfMessages: anom, ApproximateNumberOfMessagesDelayed: anomd, ApproximateNumberOfMessagesNotVisible: anomnv, CreatedTimestamp: created, DelaySeconds: delay, LastModifiedTimestamp: changed, MaximumMessageSize: max, MessageRetentionPeriod: retention, QueueARN: a.String(), ReceiveMessageWaitTimeSeconds: wait, VisibilityTimeout: vto, RedrivePolicy: qrp, RedriveAllowPolicy: qrap, }, } } return qi, err } func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) { // TODO implement me panic("implement me") } func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error { // TODO implement me panic("implement me") } func (r redriver) Purge(ctx context.Context, qName string) error { // TODO implement me panic("implement me") } func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error { // TODO implement me panic("implement me") } func RedriverService(dic *izidic.Container) (any, error) { cli := dic.MustService(services.SvcClient).(*sqs.Client) w := dic.MustParam(services.PWriter).(io.Writer) return &redriver{ Writer: w, Client: cli, }, nil }