redriver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package redriver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "strconv"
  9. "strings"
  10. "github.com/aws/aws-sdk-go-v2/aws"
  11. "github.com/aws/aws-sdk-go-v2/service/sqs"
  12. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  13. "github.com/fgm/izidic"
  14. "code.osinet.fr/fgm/sqs_demo/back/services"
  15. )
  16. const (
  17. // MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented
  18. // types.Message attribute, used by the SQS console to support redrive.
  19. MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn"
  20. )
  21. type ItemsKeys struct {
  22. MessageID string
  23. ReceiptHandle string
  24. }
  25. type QueueRedrivePolicies struct {
  26. *QueueInfoAttributesRedrivePolicy
  27. *QueueInfoAttributesRedriveAllowPolicy
  28. }
  29. // Redriver is a redrive-oriented facade in front of the sqs.Client API.
  30. type Redriver interface {
  31. ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
  32. GetRedrivePolicies(ctx context.Context, qName string) (*QueueRedrivePolicies, error)
  33. GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
  34. GetQueueItems(ctx context.Context, qName string) ([]Message, error)
  35. DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
  36. Purge(ctx context.Context, qName string) error
  37. RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
  38. }
  39. type redriver struct {
  40. Wait int32
  41. io.Writer
  42. *sqs.Client
  43. }
  44. func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
  45. // TODO implement pagination for the day we need more than 1000 queues to be reported.
  46. lqi := &sqs.ListQueuesInput{
  47. MaxResults: aws.Int32(1000),
  48. NextToken: nil,
  49. QueueNamePrefix: aws.String(prefix),
  50. }
  51. lqo, err := r.Client.ListQueues(ctx, lqi)
  52. if err != nil {
  53. return nil, fmt.Errorf("listing queues: %w", err)
  54. }
  55. return lqo.QueueUrls, nil
  56. }
  57. func (*redriver) parseQueueInfoRedrivePolicies(qName string, qao sqs.GetQueueAttributesOutput) (*QueueRedrivePolicies, error) {
  58. var qrp QueueRedrivePolicies
  59. srp := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
  60. if srp != "" {
  61. rp := QueueInfoAttributesRedrivePolicy{}
  62. if err := json.Unmarshal([]byte(srp), &rp); err != nil {
  63. return nil, fmt.Errorf(
  64. "failed parsing redrive policy for queue %q %w", qName, err)
  65. }
  66. if _, err := URLFromARNString(rp.DeadLetterTargetARN); err != nil {
  67. return nil, fmt.Errorf(
  68. "failed converting queue %q ARN to URL: %w", qName, err)
  69. }
  70. qrp.QueueInfoAttributesRedrivePolicy = &rp
  71. }
  72. srap := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
  73. if srap != "" {
  74. rap := QueueInfoAttributesRedriveAllowPolicy{}
  75. if err := json.Unmarshal([]byte(srap), &rap); err != nil {
  76. return nil, fmt.Errorf(
  77. "failed parsing redrive allow policy for queue %q %w", qName, err)
  78. }
  79. for _, src := range rap.SourceQueueARNs {
  80. if _, err := URLFromARNString(src); err != nil {
  81. return nil, fmt.Errorf(
  82. "failed converting queue %q ARN to URL: %w", qName, err)
  83. }
  84. }
  85. qrp.QueueInfoAttributesRedriveAllowPolicy = &rap
  86. }
  87. return &qrp, nil
  88. }
  89. func (r *redriver) GetRedrivePolicies(ctx context.Context, qName string) (policies *QueueRedrivePolicies, err error) {
  90. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  91. qu, err := r.GetQueueUrl(ctx, qui)
  92. if err != nil {
  93. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  94. }
  95. qai := &sqs.GetQueueAttributesInput{
  96. QueueUrl: qu.QueueUrl,
  97. AttributeNames: []types.QueueAttributeName{
  98. types.QueueAttributeNameRedrivePolicy,
  99. types.QueueAttributeNameRedriveAllowPolicy,
  100. },
  101. }
  102. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  103. if err != nil {
  104. return nil, fmt.Errorf("failed getting DLQ policies for queue %q: %w", qName, err)
  105. }
  106. if qao == nil {
  107. return nil, fmt.Errorf("redrive policy info for queue %q is empty", qName)
  108. }
  109. qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
  110. if err != nil {
  111. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  112. }
  113. if qrp == nil {
  114. return nil, nil // Queue has no DLQ: this is not an error
  115. }
  116. return qrp, nil
  117. }
  118. func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
  119. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  120. qu, err := r.GetQueueUrl(ctx, qui)
  121. if err != nil {
  122. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  123. }
  124. if qu.QueueUrl == nil {
  125. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  126. }
  127. qai := &sqs.GetQueueAttributesInput{
  128. QueueUrl: qu.QueueUrl,
  129. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  130. }
  131. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  132. if err != nil {
  133. return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
  134. }
  135. if qao == nil {
  136. return nil, fmt.Errorf("no attributes returned for queue %q", qName)
  137. }
  138. qi, err := r.parseQueueInfoAttributes(qName, qu, qao)
  139. if err != nil {
  140. return nil, err
  141. }
  142. return qi, err
  143. }
  144. func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) {
  145. var (
  146. qi *QueueInfo
  147. errCount int
  148. anom, anomd, anomnv int64
  149. created, changed int64 // timestamps
  150. delay, max, retention, vto, wait int64
  151. )
  152. u := *qu.QueueUrl
  153. a, err := ARNFromURL(u)
  154. if err != nil {
  155. return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
  156. }
  157. for _, field := range []struct {
  158. name types.QueueAttributeName
  159. value *int64
  160. sDef string
  161. def int64
  162. }{
  163. {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
  164. {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
  165. {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
  166. {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
  167. {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
  168. {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
  169. {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
  170. {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
  171. {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
  172. {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
  173. } {
  174. s, ok := qao.Attributes[string(field.name)]
  175. if !ok {
  176. errCount++
  177. s = field.sDef
  178. }
  179. n, err := strconv.Atoi(s)
  180. if err != nil {
  181. errCount++
  182. }
  183. *field.value = int64(n)
  184. }
  185. qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
  186. if err != nil {
  187. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  188. }
  189. qi = &QueueInfo{
  190. Name: qName,
  191. URL: u,
  192. Attributes: &QueueInfoAttributes{
  193. ApproximateNumberOfMessages: anom,
  194. ApproximateNumberOfMessagesDelayed: anomd,
  195. ApproximateNumberOfMessagesNotVisible: anomnv,
  196. CreatedTimestamp: created,
  197. DelaySeconds: delay,
  198. LastModifiedTimestamp: changed,
  199. MaximumMessageSize: max,
  200. MessageRetentionPeriod: retention,
  201. QueueARN: a.String(),
  202. ReceiveMessageWaitTimeSeconds: wait,
  203. VisibilityTimeout: vto,
  204. RedrivePolicy: qrp.QueueInfoAttributesRedrivePolicy,
  205. RedriveAllowPolicy: qrp.QueueInfoAttributesRedriveAllowPolicy,
  206. },
  207. }
  208. return qi, nil
  209. }
  210. func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) {
  211. var (
  212. afrt, arc, sent int64
  213. errCount int
  214. err error
  215. )
  216. for _, field := range []struct {
  217. name types.MessageSystemAttributeName
  218. value *int64
  219. sDef string
  220. def int64
  221. }{
  222. {types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0},
  223. {types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0},
  224. {types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0},
  225. } {
  226. s, ok := msg.Attributes[string(field.name)]
  227. if !ok {
  228. errCount++
  229. s = field.sDef
  230. }
  231. n, err := strconv.Atoi(s)
  232. if err != nil {
  233. errCount++
  234. }
  235. *field.value = int64(n)
  236. }
  237. ma := SystemMessageAttributes{
  238. ApproximateFirstReceiveTimestamp: afrt,
  239. ApproximateReceiveCount: arc,
  240. DeadLetterQueueSourceARN: msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)],
  241. SenderId: msg.Attributes[string(types.MessageSystemAttributeNameSenderId)],
  242. SentTimestamp: sent,
  243. }
  244. if errCount > 0 {
  245. err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount)
  246. return nil, err
  247. }
  248. return &ma, nil
  249. }
  250. func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) {
  251. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  252. qu, err := r.GetQueueUrl(ctx, qui)
  253. if err != nil {
  254. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  255. }
  256. if qu.QueueUrl == nil {
  257. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  258. }
  259. rmi := sqs.ReceiveMessageInput{
  260. QueueUrl: qu.QueueUrl,
  261. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  262. MaxNumberOfMessages: 10,
  263. MessageAttributeNames: []string{".*"},
  264. VisibilityTimeout: 0,
  265. WaitTimeSeconds: r.Wait,
  266. }
  267. rmo, err := r.Client.ReceiveMessage(ctx, &rmi)
  268. ms := make([]Message, 0, len(rmo.Messages))
  269. for _, m := range rmo.Messages {
  270. ma, err := r.parseReceivedAttributes(m)
  271. if err != nil {
  272. return nil, err
  273. }
  274. for _, field := range []struct {
  275. name string
  276. value any
  277. }{
  278. {"body", m.Body},
  279. {"md5 of body", m.MD5OfBody},
  280. {"md5 of message attributes", m.MD5OfMessageAttributes},
  281. {"message ID", m.MessageId},
  282. {"receipt handle", m.ReceiptHandle},
  283. } {
  284. if field.value == nil {
  285. return nil, fmt.Errorf("missing field %s on message", field.name)
  286. }
  287. }
  288. j, err := JSONableFromMessageAttributeValues(m.MessageAttributes)
  289. if err != nil {
  290. return nil, fmt.Errorf("failed decoding message boby: %w", err)
  291. }
  292. m := Message{
  293. Attributes: ma,
  294. Body: *m.Body,
  295. Md5OfBody: *m.MD5OfBody,
  296. Md5OfMessageAttributes: *m.MD5OfMessageAttributes,
  297. MessageAttributes: j,
  298. MessageId: *m.MessageId,
  299. ReceiptHandle: *m.ReceiptHandle,
  300. }
  301. ms = append(ms, m)
  302. }
  303. return ms, err
  304. }
  305. func (r *redriver) DeleteItems(ctx context.Context, qName string, keys []ItemsKeys) error {
  306. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  307. qu, err := r.GetQueueUrl(ctx, qui)
  308. if err != nil {
  309. return fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  310. }
  311. entries := make([]types.DeleteMessageBatchRequestEntry, len(keys))
  312. for i := 0; i < len(keys); i++ {
  313. entries[i] = types.DeleteMessageBatchRequestEntry{
  314. Id: aws.String(keys[i].MessageID),
  315. ReceiptHandle: aws.String(keys[i].ReceiptHandle),
  316. }
  317. }
  318. dmi := sqs.DeleteMessageBatchInput{
  319. Entries: entries,
  320. QueueUrl: qu.QueueUrl,
  321. }
  322. dmo, err := r.DeleteMessageBatch(ctx, &dmi)
  323. if err != nil {
  324. return fmt.Errorf("failed deleting %d items from queue %q: %v",
  325. len(keys), qName, err)
  326. }
  327. if len(dmo.Failed) > 0 {
  328. errs := make([]string, len(dmo.Failed))
  329. for i, bree := range dmo.Failed {
  330. source := "aws"
  331. if bree.SenderFault {
  332. source = "redriver"
  333. }
  334. errs[i] = fmt.Sprintf("ID: %s / Failure: %s = %q / Source: %s",
  335. *bree.Id, *bree.Message, *bree.Code, source)
  336. }
  337. return fmt.Errorf("failed deleting %d items out of %d from queue %q: %s",
  338. len(dmo.Failed), len(keys), qName, strings.Join(errs, "\n"))
  339. }
  340. log.Println(dmo)
  341. return nil
  342. }
  343. func (r *redriver) Purge(ctx context.Context, qName string) error {
  344. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  345. qu, err := r.GetQueueUrl(ctx, qui)
  346. if err != nil {
  347. return fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  348. }
  349. pqi := sqs.PurgeQueueInput{QueueUrl: qu.QueueUrl}
  350. _, err = r.PurgeQueue(ctx, &pqi)
  351. if err != nil {
  352. return fmt.Errorf("failed purging queue %q: %w",
  353. qName, err)
  354. }
  355. return nil
  356. }
  357. func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
  358. // TODO implement me
  359. panic("implement me")
  360. }
  361. func RedriverService(dic *izidic.Container) (any, error) {
  362. cli := dic.MustService(services.SvcClient).(*sqs.Client)
  363. w := dic.MustParam(services.PWriter).(io.Writer)
  364. wait := int32(dic.MustParam(services.PWait).(int))
  365. return &redriver{
  366. Wait: wait,
  367. Writer: w,
  368. Client: cli,
  369. }, nil
  370. }