123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package redriver
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "strconv"
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/fgm/izidic"
- "code.osinet.fr/fgm/sqs_demo/services"
- )
// ItemsKeys groups the two string identifiers carried for one queue item.
//
// NOTE(review): judging by the field names, these are presumably the SQS
// message ID and the receipt handle of a received message; confirm against
// the callers of DeleteItems and RedriveItems once they are implemented.
type ItemsKeys struct {
	// MessageID identifies the item.
	MessageID string
	// ReceiptHandle is the handle associated with the item.
	ReceiptHandle string
}
- // Redriver is a redrive-oriented facade in front of the sqs.Client API.
- type Redriver interface {
- ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
- GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
- GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
- GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
- DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
- Purge(ctx context.Context, qName string) error
- RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
- }
- type redriver struct {
- io.Writer
- *sqs.Client
- }
- func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
- lqi := &sqs.ListQueuesInput{
- MaxResults: aws.Int32(1000),
- NextToken: nil,
- QueueNamePrefix: aws.String(prefix),
- }
- lqo, err := r.Client.ListQueues(ctx, lqi)
- if err != nil {
- return nil, fmt.Errorf("listing queues: %w", err)
- }
- return lqo.QueueUrls, nil
- }
- func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
- srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
- if !ok || srap == "" {
- return nil, nil
- }
- qrap = &QueueInfoAttributesRedriveAllowPolicy{}
- if err = json.Unmarshal([]byte(srap), qrap); err != nil {
- return nil, fmt.Errorf(
- "failed parsing redrive policy for queue %q %w", qName, err)
- }
- return qrap, nil
- }
- func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
- srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
- if !ok || srp == "" {
- return nil, nil
- }
- qrp = &QueueInfoAttributesRedrivePolicy{}
- if err = json.Unmarshal([]byte(srp), qrp); err != nil {
- return qrp, fmt.Errorf(
- "failed parsing redrive policy for queue %q %w", qName, err)
- }
- if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {
- return qrp, fmt.Errorf(
- "failed converting queue %q ARN to URL: %w", qName, err)
- }
- return qrp, nil
- }
- func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
- qui := &sqs.GetQueueUrlInput{QueueName: &qName}
- qu, err := r.GetQueueUrl(ctx, qui)
- if err != nil {
- return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
- }
- qai := &sqs.GetQueueAttributesInput{
- QueueUrl: qu.QueueUrl,
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
- }
- qao, err := r.Client.GetQueueAttributes(ctx, qai)
- if err != nil {
- return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
- }
- if qao == nil {
- return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
- }
- qrp, err := r.parseRedrivePolicy(qName, *qao)
- if err != nil {
- return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
- }
- if qrp == nil {
- return "", nil // Queue has no DLQ: this is not an error
- }
- qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseRedrivePolicy
- return qURL, nil
- }
- func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
- qui := &sqs.GetQueueUrlInput{QueueName: &qName}
- qu, err := r.GetQueueUrl(ctx, qui)
- if err != nil {
- return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
- }
- if qu.QueueUrl == nil {
- return nil, fmt.Errorf("URL for queue %q is empty", qName)
- }
- u := *qu.QueueUrl
- a, err := ARNFromURL(u)
- if err != nil {
- return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
- }
- qai := &sqs.GetQueueAttributesInput{
- QueueUrl: qu.QueueUrl,
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
- }
- qao, err := r.Client.GetQueueAttributes(ctx, qai)
- if err != nil {
- return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
- }
- if qao == nil {
- return nil, fmt.Errorf("no attributes returned for queue %q", qName)
- }
- var (
- errCount = 0
- qi *QueueInfo
- )
- {
- var (
- anom, anomd, anomnv int32
- created, changed int32 // timestamps
- delay, max, retention, vto, wait int32
- )
- for _, field := range []struct {
- name types.QueueAttributeName
- value *int32
- sDef string
- def int32
- }{
- {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
- {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
- {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
- {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
- {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
- {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
- {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
- {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
- {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
- {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
- } {
- s, ok := qao.Attributes[string(field.name)]
- if !ok {
- errCount++
- s = field.sDef
- }
- n, err := strconv.Atoi(s)
- if err != nil {
- errCount++
- }
- *field.value = int32(n)
- }
- qrp, err := r.parseRedrivePolicy(qName, *qao)
- if err != nil {
- return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
- }
- qrap, err := r.parseRedriveAllowPolicy(qName, *qao)
- if err != nil {
- return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
- }
- qi = &QueueInfo{
- Name: qName,
- Url: u,
- Attributes: &QueueInfoAttributes{
- ApproximateNumberOfMessages: anom,
- ApproximateNumberOfMessagesDelayed: anomd,
- ApproximateNumberOfMessagesNotVisible: anomnv,
- CreatedTimestamp: created,
- DelaySeconds: delay,
- LastModifiedTimestamp: changed,
- MaximumMessageSize: max,
- MessageRetentionPeriod: retention,
- QueueARN: a.String(),
- ReceiveMessageWaitTimeSeconds: wait,
- VisibilityTimeout: vto,
- RedrivePolicy: qrp,
- RedriveAllowPolicy: qrap,
- },
- }
- }
- return qi, err
- }
- func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
- // TODO implement me
- panic("implement me")
- }
- func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
- // TODO implement me
- panic("implement me")
- }
- func (r redriver) Purge(ctx context.Context, qName string) error {
- // TODO implement me
- panic("implement me")
- }
- func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
- // TODO implement me
- panic("implement me")
- }
- func RedriverService(dic *izidic.Container) (any, error) {
- cli := dic.MustService(services.SvcClient).(*sqs.Client)
- w := dic.MustParam(services.PWriter).(io.Writer)
- return &redriver{
- Writer: w,
- Client: cli,
- }, nil
- }
|