redriver.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package redriver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "github.com/aws/aws-sdk-go-v2/aws"
  8. "github.com/aws/aws-sdk-go-v2/service/sqs"
  9. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  10. "github.com/fgm/izidic"
  11. "code.osinet.fr/fgm/sqs_demo/services"
  12. )
  13. type ItemsKeys struct {
  14. MessageID string
  15. ReceiptHandle string
  16. }
  17. // Redriver is a redrive-oriented facade in front of the sqs.Client API.
  18. type Redriver interface {
  19. ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
  20. GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
  21. GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
  22. GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
  23. DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
  24. Purge(ctx context.Context, qName string) error
  25. RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
  26. }
  27. type redriver struct {
  28. io.Writer
  29. *sqs.Client
  30. }
  31. func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
  32. lqi := &sqs.ListQueuesInput{
  33. MaxResults: aws.Int32(1000),
  34. NextToken: nil,
  35. QueueNamePrefix: aws.String(prefix),
  36. }
  37. lqo, err := r.Client.ListQueues(ctx, lqi)
  38. if err != nil {
  39. return nil, fmt.Errorf("listing queues: %w", err)
  40. }
  41. return lqo.QueueUrls, nil
  42. }
  43. func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
  44. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  45. qu, err := r.GetQueueUrl(ctx, qui)
  46. if err != nil {
  47. return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  48. }
  49. qai := &sqs.GetQueueAttributesInput{
  50. QueueUrl: qu.QueueUrl,
  51. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
  52. }
  53. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  54. if err != nil {
  55. return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
  56. }
  57. rp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
  58. if !ok {
  59. return "", nil
  60. }
  61. var qrp QueueInfoAttributesRedrivePolicy
  62. if err = json.Unmarshal([]byte(rp), &qrp); err != nil {
  63. return "", fmt.Errorf("failed parsing redrive policy for queue %q %w",
  64. qName, err)
  65. }
  66. qURL, err := URLFromARNString(qrp.DeadLetterTargetARN)
  67. if err != nil {
  68. return "", fmt.Errorf(
  69. "failed converting queue %q ARN to URL: %w", qName, err)
  70. }
  71. return qURL, nil
  72. }
  73. func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
  74. // TODO implement me
  75. panic("implement me")
  76. }
  77. func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
  78. // TODO implement me
  79. panic("implement me")
  80. }
  81. func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
  82. // TODO implement me
  83. panic("implement me")
  84. }
  85. func (r redriver) Purge(ctx context.Context, qName string) error {
  86. // TODO implement me
  87. panic("implement me")
  88. }
  89. func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
  90. // TODO implement me
  91. panic("implement me")
  92. }
  93. func RedriverService(dic *izidic.Container) (any, error) {
  94. cli := dic.MustService(services.SvcClient).(*sqs.Client)
  95. w := dic.MustParam(services.PWriter).(io.Writer)
  96. return &redriver{
  97. Writer: w,
  98. Client: cli,
  99. }, nil
  100. }