package redriver import ( "context" "encoding/json" "fmt" "io" "strconv" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/fgm/izidic" "code.osinet.fr/fgm/sqs_demo/services" ) const ( // MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented // types.Message attribute, used by the SQS console to support redrive. MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn" ) type ItemsKeys struct { MessageID string ReceiptHandle string } // Redriver is a redrive-oriented facade in front of the sqs.Client API. type Redriver interface { ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) GetQueueItems(ctx context.Context, qName string) ([]Message, error) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error Purge(ctx context.Context, qName string) error RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error } type redriver struct { Wait int32 io.Writer *sqs.Client } func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) { lqi := &sqs.ListQueuesInput{ MaxResults: aws.Int32(1000), NextToken: nil, QueueNamePrefix: aws.String(prefix), } lqo, err := r.Client.ListQueues(ctx, lqi) if err != nil { return nil, fmt.Errorf("listing queues: %w", err) } return lqo.QueueUrls, nil } func (*redriver) parseQueueInfoRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) { srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)] if !ok || srap == "" { return nil, nil } qrap = &QueueInfoAttributesRedriveAllowPolicy{} if err = json.Unmarshal([]byte(srap), qrap); err != nil { return nil, fmt.Errorf( "failed parsing redrive policy for queue %q %w", qName, err) } return qrap, nil } func (*redriver) parseQueueInfoRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) { srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)] if !ok || srp == "" { return nil, nil } qrp = &QueueInfoAttributesRedrivePolicy{} if err = json.Unmarshal([]byte(srp), qrp); err != nil { return qrp, fmt.Errorf( "failed parsing redrive policy for queue %q %w", qName, err) } if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil { return qrp, fmt.Errorf( "failed converting queue %q ARN to URL: %w", qName, err) } return qrp, nil } func (r *redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) { qui := &sqs.GetQueueUrlInput{QueueName: &qName} qu, err := r.GetQueueUrl(ctx, qui) if err != nil { return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err) } qai := &sqs.GetQueueAttributesInput{ QueueUrl: qu.QueueUrl, AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy}, } qao, err := r.Client.GetQueueAttributes(ctx, qai) if err != nil { return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err) } if qao == nil { return "", fmt.Errorf("redrive policy for queue %q is empty", qName) } qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao) if err != nil { return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err) } if qrp == nil { return "", nil // Queue has no DLQ: this is not an error } qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseQueueInfoRedrivePolicy return qURL, nil } func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) { qui := &sqs.GetQueueUrlInput{QueueName: &qName} qu, err := r.GetQueueUrl(ctx, qui) if err != nil { return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err) } if qu.QueueUrl == nil { return nil, fmt.Errorf("URL for queue %q is empty", qName) } qai := &sqs.GetQueueAttributesInput{ QueueUrl: qu.QueueUrl, AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, } qao, err := r.Client.GetQueueAttributes(ctx, qai) if err != nil { return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err) } if qao == nil { return nil, fmt.Errorf("no attributes returned for queue %q", qName) } qi, err := r.parseQueueInfoAttributes(qName, qu, qao) if err != nil { return nil, err } return qi, err } func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) { var ( qi *QueueInfo errCount int anom, anomd, anomnv int64 created, changed int64 // timestamps delay, max, retention, vto, wait int64 ) u := *qu.QueueUrl a, err := ARNFromURL(u) if err != nil { return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err) } for _, field := range []struct { name types.QueueAttributeName value *int64 sDef string def int64 }{ {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1}, {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1}, {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1}, {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0}, {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1}, {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0}, {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling } { s, ok := qao.Attributes[string(field.name)] if !ok { errCount++ s = field.sDef } n, err := strconv.Atoi(s) if err != nil { errCount++ } *field.value = int64(n) } qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao) if err != nil { return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err) } qrap, err := r.parseQueueInfoRedriveAllowPolicy(qName, *qao) if err != nil { return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err) } qi = &QueueInfo{ Name: qName, Url: u, Attributes: &QueueInfoAttributes{ ApproximateNumberOfMessages: anom, ApproximateNumberOfMessagesDelayed: anomd, ApproximateNumberOfMessagesNotVisible: anomnv, CreatedTimestamp: created, DelaySeconds: delay, LastModifiedTimestamp: changed, MaximumMessageSize: max, MessageRetentionPeriod: retention, QueueARN: a.String(), ReceiveMessageWaitTimeSeconds: wait, VisibilityTimeout: vto, RedrivePolicy: qrp, RedriveAllowPolicy: qrap, }, } return qi, nil } func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) { var ( afrt, arc, sent int64 errCount int err error ) for _, field := range []struct { name types.MessageSystemAttributeName value *int64 sDef string def int64 }{ {types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0}, {types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0}, {types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0}, } { s, ok := msg.Attributes[string(field.name)] if !ok { errCount++ s = field.sDef } n, err := strconv.Atoi(s) if err != nil { errCount++ } *field.value = int64(n) } ma := SystemMessageAttributes{ ApproximateFirstReceiveTimestamp: afrt, ApproximateReceiveCount: arc, DeadLetterQueueSourceARN: msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)], SenderId: msg.Attributes[string(types.MessageSystemAttributeNameSenderId)], SentTimestamp: sent, } if errCount > 0 { err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount) return nil, err } return &ma, nil } func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) { qui := &sqs.GetQueueUrlInput{QueueName: &qName} qu, err := r.GetQueueUrl(ctx, qui) if err != nil { return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err) } if qu.QueueUrl == nil { return nil, fmt.Errorf("URL for queue %q is empty", qName) } rmi := sqs.ReceiveMessageInput{ QueueUrl: qu.QueueUrl, AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, MaxNumberOfMessages: 10, MessageAttributeNames: []string{".*"}, VisibilityTimeout: 0, WaitTimeSeconds: r.Wait, } rmo, err := r.Client.ReceiveMessage(ctx, &rmi) ms := make([]Message, 0, len(rmo.Messages)) for _, m := range rmo.Messages { ma, err := r.parseReceivedAttributes(m) if err != nil { return nil, err } for _, field := range []struct { name string value any }{ {"body", m.Body}, {"md5 of body", m.MD5OfBody}, {"md5 of message attributes", m.MD5OfMessageAttributes}, {"message ID", m.MessageId}, {"receipt handle", m.ReceiptHandle}, } { if field.value == nil { return nil, fmt.Errorf("missing field %s on message", field.name) } } j, err := JSONableFromMessageAttributeValues(m.MessageAttributes) if err != nil { return nil, fmt.Errorf("failed decoding message boby: %w", err) } m := Message{ Attributes: ma, Body: *m.Body, Md5OfBody: *m.MD5OfBody, Md5OfMessageAttributes: *m.MD5OfMessageAttributes, MessageAttributes: j, MessageId: *m.MessageId, ReceiptHandle: *m.ReceiptHandle, } ms = append(ms, m) } return ms, err } func (r *redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error { // TODO implement me panic("implement me") } func (r *redriver) Purge(ctx context.Context, qName string) error { // TODO implement me panic("implement me") } func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error { // TODO implement me panic("implement me") } func RedriverService(dic *izidic.Container) (any, error) { cli := dic.MustService(services.SvcClient).(*sqs.Client) w := dic.MustParam(services.PWriter).(io.Writer) wait := int32(dic.MustParam(services.PWait).(int)) return &redriver{ Wait: wait, Writer: w, Client: cli, }, nil }