redriver.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package redriver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "github.com/aws/aws-sdk-go-v2/aws"
  9. "github.com/aws/aws-sdk-go-v2/service/sqs"
  10. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  11. "github.com/fgm/izidic"
  12. "code.osinet.fr/fgm/sqs_demo/services"
  13. )
  14. type ItemsKeys struct {
  15. MessageID string
  16. ReceiptHandle string
  17. }
  18. // Redriver is a redrive-oriented facade in front of the sqs.Client API.
  19. type Redriver interface {
  20. ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
  21. GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
  22. GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
  23. GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
  24. DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
  25. Purge(ctx context.Context, qName string) error
  26. RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
  27. }
  28. type redriver struct {
  29. io.Writer
  30. *sqs.Client
  31. }
  32. func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
  33. lqi := &sqs.ListQueuesInput{
  34. MaxResults: aws.Int32(1000),
  35. NextToken: nil,
  36. QueueNamePrefix: aws.String(prefix),
  37. }
  38. lqo, err := r.Client.ListQueues(ctx, lqi)
  39. if err != nil {
  40. return nil, fmt.Errorf("listing queues: %w", err)
  41. }
  42. return lqo.QueueUrls, nil
  43. }
  44. func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
  45. srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
  46. if !ok || srap == "" {
  47. return nil, nil
  48. }
  49. qrap = &QueueInfoAttributesRedriveAllowPolicy{}
  50. if err = json.Unmarshal([]byte(srap), qrap); err != nil {
  51. return nil, fmt.Errorf(
  52. "failed parsing redrive policy for queue %q %w", qName, err)
  53. }
  54. return qrap, nil
  55. }
  56. func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
  57. srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
  58. if !ok || srp == "" {
  59. return nil, nil
  60. }
  61. qrp = &QueueInfoAttributesRedrivePolicy{}
  62. if err = json.Unmarshal([]byte(srp), qrp); err != nil {
  63. return qrp, fmt.Errorf(
  64. "failed parsing redrive policy for queue %q %w", qName, err)
  65. }
  66. if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {
  67. return qrp, fmt.Errorf(
  68. "failed converting queue %q ARN to URL: %w", qName, err)
  69. }
  70. return qrp, nil
  71. }
  72. func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
  73. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  74. qu, err := r.GetQueueUrl(ctx, qui)
  75. if err != nil {
  76. return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  77. }
  78. qai := &sqs.GetQueueAttributesInput{
  79. QueueUrl: qu.QueueUrl,
  80. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
  81. }
  82. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  83. if err != nil {
  84. return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
  85. }
  86. if qao == nil {
  87. return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
  88. }
  89. qrp, err := r.parseRedrivePolicy(qName, *qao)
  90. if err != nil {
  91. return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  92. }
  93. if qrp == nil {
  94. return "", nil // Queue has no DLQ: this is not an error
  95. }
  96. qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseRedrivePolicy
  97. return qURL, nil
  98. }
  99. func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
  100. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  101. qu, err := r.GetQueueUrl(ctx, qui)
  102. if err != nil {
  103. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  104. }
  105. if qu.QueueUrl == nil {
  106. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  107. }
  108. u := *qu.QueueUrl
  109. a, err := ARNFromURL(u)
  110. if err != nil {
  111. return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
  112. }
  113. qai := &sqs.GetQueueAttributesInput{
  114. QueueUrl: qu.QueueUrl,
  115. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  116. }
  117. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  118. if err != nil {
  119. return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
  120. }
  121. if qao == nil {
  122. return nil, fmt.Errorf("no attributes returned for queue %q", qName)
  123. }
  124. var (
  125. errCount = 0
  126. qi *QueueInfo
  127. )
  128. {
  129. var (
  130. anom, anomd, anomnv int32
  131. created, changed int32 // timestamps
  132. delay, max, retention, vto, wait int32
  133. )
  134. for _, field := range []struct {
  135. name types.QueueAttributeName
  136. value *int32
  137. sDef string
  138. def int32
  139. }{
  140. {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
  141. {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
  142. {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
  143. {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
  144. {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
  145. {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
  146. {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
  147. {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
  148. {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
  149. {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
  150. } {
  151. s, ok := qao.Attributes[string(field.name)]
  152. if !ok {
  153. errCount++
  154. s = field.sDef
  155. }
  156. n, err := strconv.Atoi(s)
  157. if err != nil {
  158. errCount++
  159. }
  160. *field.value = int32(n)
  161. }
  162. qrp, err := r.parseRedrivePolicy(qName, *qao)
  163. if err != nil {
  164. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  165. }
  166. qrap, err := r.parseRedriveAllowPolicy(qName, *qao)
  167. if err != nil {
  168. return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
  169. }
  170. qi = &QueueInfo{
  171. Name: qName,
  172. Url: u,
  173. Attributes: &QueueInfoAttributes{
  174. ApproximateNumberOfMessages: anom,
  175. ApproximateNumberOfMessagesDelayed: anomd,
  176. ApproximateNumberOfMessagesNotVisible: anomnv,
  177. CreatedTimestamp: created,
  178. DelaySeconds: delay,
  179. LastModifiedTimestamp: changed,
  180. MaximumMessageSize: max,
  181. MessageRetentionPeriod: retention,
  182. QueueARN: a.String(),
  183. ReceiveMessageWaitTimeSeconds: wait,
  184. VisibilityTimeout: vto,
  185. RedrivePolicy: qrp,
  186. RedriveAllowPolicy: qrap,
  187. },
  188. }
  189. }
  190. return qi, err
  191. }
  192. func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
  193. // TODO implement me
  194. panic("implement me")
  195. }
  196. func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
  197. // TODO implement me
  198. panic("implement me")
  199. }
  200. func (r redriver) Purge(ctx context.Context, qName string) error {
  201. // TODO implement me
  202. panic("implement me")
  203. }
  204. func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
  205. // TODO implement me
  206. panic("implement me")
  207. }
  208. func RedriverService(dic *izidic.Container) (any, error) {
  209. cli := dic.MustService(services.SvcClient).(*sqs.Client)
  210. w := dic.MustParam(services.PWriter).(io.Writer)
  211. return &redriver{
  212. Writer: w,
  213. Client: cli,
  214. }, nil
  215. }