// Package redriver provides a redrive-oriented facade in front of the
// sqs.Client API.
package redriver

import (
	"context"
	"encoding/json"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/fgm/izidic"

	"code.osinet.fr/fgm/sqs_demo/services"
)

// listQueuesPageSize is the page size requested on each ListQueues call.
// Pages are chained using NextToken until the listing is exhausted.
const listQueuesPageSize = 1000

// ItemsKeys carries the pair of identifiers passed to the item-level
// operations of Redriver (DeleteItems, RedriveItems).
//
// NOTE(review): presumably the SQS message ID and receipt handle of a received
// message; the methods consuming it are not implemented yet, so confirm once
// they are.
type ItemsKeys struct {
	MessageID     string
	ReceiptHandle string
}

// Redriver is a redrive-oriented facade in front of the sqs.Client API.
type Redriver interface {
	// ListQueues returns the URLs of the queues whose name starts with prefix.
	ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
	// GetDLQ returns the URL of the dead-letter queue configured on queue
	// qName, or an empty string if the queue has no redrive policy.
	GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error)
	GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
	GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
	DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
	Purge(ctx context.Context, qName string) error
	RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
}

// redriver is the Redriver implementation built by RedriverService.
//
// It embeds both its output writer and the SQS client. Because redriver
// declares its own ListQueues method, that method shadows the one promoted
// from the embedded *sqs.Client: SQS calls therefore go through r.Client
// explicitly to make the target unambiguous.
type redriver struct {
	io.Writer
	*sqs.Client
}

// Compile-time check that redriver implements Redriver.
var _ Redriver = (*redriver)(nil)

// ListQueues returns the URLs of all queues whose name starts with prefix.
//
// Results are fetched page by page: as long as a response carries a NextToken,
// another request is issued with that token, so the returned slice is not
// capped at a single page. On any error, the partial result is discarded and
// the wrapped error is returned.
func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
	lqi := &sqs.ListQueuesInput{
		MaxResults:      aws.Int32(listQueuesPageSize),
		QueueNamePrefix: aws.String(prefix),
	}

	var urls []string
	for {
		lqo, lerr := r.Client.ListQueues(ctx, lqi)
		if lerr != nil {
			return nil, fmt.Errorf("listing queues: %w", lerr)
		}
		urls = append(urls, lqo.QueueUrls...)

		// A missing or empty token marks the last page.
		if lqo.NextToken == nil || *lqo.NextToken == "" {
			break
		}
		lqi.NextToken = lqo.NextToken
	}
	return urls, nil
}

// GetDLQ returns the URL of the dead-letter queue attached to queue qName.
//
// It resolves qName to its URL, reads the queue RedrivePolicy attribute,
// decodes it as JSON, and converts the dead-letter target ARN it contains to a
// queue URL using URLFromARNString.
//
// If the queue has no RedrivePolicy attribute, it returns an empty string and
// a nil error. Any failing step returns an empty string and a wrapped error.
func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
	qui := &sqs.GetQueueUrlInput{QueueName: &qName}
	qu, err := r.Client.GetQueueUrl(ctx, qui)
	if err != nil {
		return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
	}

	qai := &sqs.GetQueueAttributesInput{
		QueueUrl:       qu.QueueUrl,
		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
	}
	qao, err := r.Client.GetQueueAttributes(ctx, qai)
	if err != nil {
		return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
	}

	// No redrive policy means no DLQ: this is not treated as an error.
	rp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
	if !ok {
		return "", nil
	}

	var qrp QueueInfoAttributesRedrivePolicy
	if err = json.Unmarshal([]byte(rp), &qrp); err != nil {
		return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
	}

	qURL, err := URLFromARNString(qrp.DeadLetterTargetARN)
	if err != nil {
		return "", fmt.Errorf("failed converting queue %q ARN to URL: %w", qName, err)
	}
	return qURL, nil
}

// GetQueueInfo is not implemented yet: calling it panics.
func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
	// TODO implement me
	panic("implement me")
}

// GetQueueItems is not implemented yet: calling it panics.
func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
	// TODO implement me
	panic("implement me")
}

// DeleteItems is not implemented yet: calling it panics.
func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
	// TODO implement me
	panic("implement me")
}

// Purge is not implemented yet: calling it panics.
func (r redriver) Purge(ctx context.Context, qName string) error {
	// TODO implement me
	panic("implement me")
}

// RedriveItems is not implemented yet: calling it panics.
func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
	// TODO implement me
	panic("implement me")
}

// RedriverService is the izidic service constructor for the Redriver.
//
// It fetches the services.SvcClient service and the services.PWriter parameter
// from dic and returns a *redriver wrapping both. If either value does not
// have the expected type (*sqs.Client, io.Writer), an error is returned
// instead of panicking on the type assertion. The Must* lookups themselves are
// unchanged.
func RedriverService(dic *izidic.Container) (any, error) {
	cli, ok := dic.MustService(services.SvcClient).(*sqs.Client)
	if !ok {
		return nil, fmt.Errorf("service %v is not a *sqs.Client", services.SvcClient)
	}
	w, ok := dic.MustParam(services.PWriter).(io.Writer)
	if !ok {
		return nil, fmt.Errorf("parameter %v is not an io.Writer", services.PWriter)
	}
	return &redriver{
		Writer: w,
		Client: cli,
	}, nil
}