redriver.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package redriver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/aws/aws-sdk-go-v2/aws"
  12. "github.com/aws/aws-sdk-go-v2/service/sqs"
  13. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  14. "github.com/fgm/izidic"
  15. "code.osinet.fr/fgm/sqs_demo/back/services"
  16. )
  17. const (
  18. // MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented
  19. // types.Message attribute, used by the SQS console to support redrive.
  20. MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn"
  21. )
  22. type ItemsKeys struct {
  23. MessageID string
  24. ReceiptHandle string
  25. }
  26. type QueueRedrivePolicies struct {
  27. *QueueInfoAttributesRedrivePolicy
  28. *QueueInfoAttributesRedriveAllowPolicy
  29. }
  30. // Redriver is a redrive-oriented facade in front of the sqs.Client API.
  31. type Redriver interface {
  32. ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
  33. GetRedrivePolicies(ctx context.Context, qName string) (*QueueRedrivePolicies, error)
  34. GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
  35. GetQueueItems(ctx context.Context, qName string) ([]Message, error)
  36. DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
  37. Purge(ctx context.Context, qName string) error
  38. RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
  39. }
  40. type redriver struct {
  41. Wait int32
  42. io.Writer
  43. *sqs.Client
  44. }
  45. func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
  46. // TODO implement pagination for the day we need more than 1000 queues to be reported.
  47. lqi := &sqs.ListQueuesInput{
  48. MaxResults: aws.Int32(1000),
  49. NextToken: nil,
  50. QueueNamePrefix: aws.String(prefix),
  51. }
  52. lqo, err := r.Client.ListQueues(ctx, lqi)
  53. if err != nil {
  54. return nil, fmt.Errorf("listing queues: %w", err)
  55. }
  56. return lqo.QueueUrls, nil
  57. }
  58. func (*redriver) parseQueueInfoRedrivePolicies(qName string, qao sqs.GetQueueAttributesOutput) (*QueueRedrivePolicies, error) {
  59. var qrp QueueRedrivePolicies
  60. srp := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
  61. if srp != "" {
  62. rp := QueueInfoAttributesRedrivePolicy{}
  63. if err := json.Unmarshal([]byte(srp), &rp); err != nil {
  64. return nil, fmt.Errorf(
  65. "failed parsing redrive policy for queue %q %w", qName, err)
  66. }
  67. if _, err := URLFromARNString(rp.DeadLetterTargetARN); err != nil {
  68. return nil, fmt.Errorf(
  69. "failed converting queue %q ARN to URL: %w", qName, err)
  70. }
  71. qrp.QueueInfoAttributesRedrivePolicy = &rp
  72. }
  73. srap := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
  74. if srap != "" {
  75. rap := QueueInfoAttributesRedriveAllowPolicy{}
  76. if err := json.Unmarshal([]byte(srap), &rap); err != nil {
  77. return nil, fmt.Errorf(
  78. "failed parsing redrive allow policy for queue %q %w", qName, err)
  79. }
  80. for _, src := range rap.SourceQueueARNs {
  81. if _, err := URLFromARNString(src); err != nil {
  82. return nil, fmt.Errorf(
  83. "failed converting queue %q ARN to URL: %w", qName, err)
  84. }
  85. }
  86. qrp.QueueInfoAttributesRedriveAllowPolicy = &rap
  87. }
  88. return &qrp, nil
  89. }
  90. func (r *redriver) GetRedrivePolicies(ctx context.Context, qName string) (policies *QueueRedrivePolicies, err error) {
  91. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  92. qu, err := r.GetQueueUrl(ctx, qui)
  93. if err != nil {
  94. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  95. }
  96. qai := &sqs.GetQueueAttributesInput{
  97. QueueUrl: qu.QueueUrl,
  98. AttributeNames: []types.QueueAttributeName{
  99. types.QueueAttributeNameRedrivePolicy,
  100. types.QueueAttributeNameRedriveAllowPolicy,
  101. },
  102. }
  103. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  104. if err != nil {
  105. return nil, fmt.Errorf("failed getting DLQ policies for queue %q: %w", qName, err)
  106. }
  107. if qao == nil {
  108. return nil, fmt.Errorf("redrive policy info for queue %q is empty", qName)
  109. }
  110. qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
  111. if err != nil {
  112. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  113. }
  114. if qrp == nil {
  115. return nil, nil // Queue has no DLQ: this is not an error
  116. }
  117. return qrp, nil
  118. }
  119. func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
  120. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  121. qu, err := r.GetQueueUrl(ctx, qui)
  122. if err != nil {
  123. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  124. }
  125. if qu.QueueUrl == nil {
  126. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  127. }
  128. qai := &sqs.GetQueueAttributesInput{
  129. QueueUrl: qu.QueueUrl,
  130. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  131. }
  132. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  133. if err != nil {
  134. return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
  135. }
  136. if qao == nil {
  137. return nil, fmt.Errorf("no attributes returned for queue %q", qName)
  138. }
  139. qi, err := r.parseQueueInfoAttributes(qName, qu, qao)
  140. if err != nil {
  141. return nil, err
  142. }
  143. return qi, err
  144. }
  145. func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) {
  146. var (
  147. qi *QueueInfo
  148. errCount int
  149. anom, anomd, anomnv int64
  150. created, changed int64 // timestamps
  151. delay, max, retention, vto, wait int64
  152. )
  153. u := *qu.QueueUrl
  154. a, err := ARNFromURL(u)
  155. if err != nil {
  156. return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
  157. }
  158. for _, field := range []struct {
  159. name types.QueueAttributeName
  160. value *int64
  161. sDef string
  162. def int64
  163. }{
  164. {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
  165. {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
  166. {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
  167. {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
  168. {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
  169. {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
  170. {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
  171. {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
  172. {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
  173. {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
  174. } {
  175. s, ok := qao.Attributes[string(field.name)]
  176. if !ok {
  177. errCount++
  178. s = field.sDef
  179. }
  180. n, err := strconv.Atoi(s)
  181. if err != nil {
  182. errCount++
  183. }
  184. *field.value = int64(n)
  185. }
  186. qrp, err := r.parseQueueInfoRedrivePolicies(qName, *qao)
  187. if err != nil {
  188. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  189. }
  190. qi = &QueueInfo{
  191. Name: qName,
  192. URL: u,
  193. Attributes: &QueueInfoAttributes{
  194. ApproximateNumberOfMessages: anom,
  195. ApproximateNumberOfMessagesDelayed: anomd,
  196. ApproximateNumberOfMessagesNotVisible: anomnv,
  197. CreatedTimestamp: created,
  198. DelaySeconds: delay,
  199. LastModifiedTimestamp: changed,
  200. MaximumMessageSize: max,
  201. MessageRetentionPeriod: retention,
  202. QueueARN: a.String(),
  203. ReceiveMessageWaitTimeSeconds: wait,
  204. VisibilityTimeout: vto,
  205. RedrivePolicy: qrp.QueueInfoAttributesRedrivePolicy,
  206. RedriveAllowPolicy: qrp.QueueInfoAttributesRedriveAllowPolicy,
  207. },
  208. }
  209. return qi, nil
  210. }
  211. func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) {
  212. var (
  213. afrt, arc, sent int64
  214. errCount int
  215. err error
  216. )
  217. for _, field := range []struct {
  218. name types.MessageSystemAttributeName
  219. value *int64
  220. sDef string
  221. def int64
  222. }{
  223. {types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0},
  224. {types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0},
  225. {types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0},
  226. } {
  227. s, ok := msg.Attributes[string(field.name)]
  228. if !ok {
  229. errCount++
  230. s = field.sDef
  231. }
  232. n, err := strconv.Atoi(s)
  233. if err != nil {
  234. errCount++
  235. }
  236. *field.value = int64(n)
  237. }
  238. ma := SystemMessageAttributes{
  239. ApproximateFirstReceiveTimestamp: afrt,
  240. ApproximateReceiveCount: arc,
  241. DeadLetterQueueSourceARN: msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)],
  242. SenderId: msg.Attributes[string(types.MessageSystemAttributeNameSenderId)],
  243. SentTimestamp: sent,
  244. }
  245. if errCount > 0 {
  246. err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount)
  247. return nil, err
  248. }
  249. return &ma, nil
  250. }
  251. func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) {
  252. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  253. qu, err := r.GetQueueUrl(ctx, qui)
  254. if err != nil {
  255. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  256. }
  257. if qu.QueueUrl == nil {
  258. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  259. }
  260. rmi := sqs.ReceiveMessageInput{
  261. QueueUrl: qu.QueueUrl,
  262. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  263. MaxNumberOfMessages: 10,
  264. MessageAttributeNames: []string{".*"},
  265. VisibilityTimeout: 0,
  266. WaitTimeSeconds: r.Wait,
  267. }
  268. t0 := time.Now()
  269. rmo, err := r.Client.ReceiveMessage(ctx, &rmi)
  270. d := time.Since(t0)
  271. if err != nil {
  272. return nil, fmt.Errorf("after %v, failed receiving messages for queue %q: %w",
  273. d, qName, err)
  274. }
  275. ms := make([]Message, 0, len(rmo.Messages))
  276. for _, m := range rmo.Messages {
  277. ma, err := r.parseReceivedAttributes(m)
  278. if err != nil {
  279. return nil, err
  280. }
  281. for _, field := range []struct {
  282. name string
  283. value any
  284. }{
  285. {"body", m.Body},
  286. {"md5 of body", m.MD5OfBody},
  287. {"md5 of message attributes", m.MD5OfMessageAttributes},
  288. {"message ID", m.MessageId},
  289. {"receipt handle", m.ReceiptHandle},
  290. } {
  291. if field.value == nil {
  292. return nil, fmt.Errorf("missing field %s on message", field.name)
  293. }
  294. }
  295. j, err := JSONableFromMessageAttributeValues(m.MessageAttributes)
  296. if err != nil {
  297. return nil, fmt.Errorf("failed decoding message boby: %w", err)
  298. }
  299. m2 := Message{
  300. Attributes: ma,
  301. MessageAttributes: j,
  302. }
  303. for _, pair := range []struct{ dest, src *string }{
  304. {&m2.Body, m.Body},
  305. {&m2.Md5OfBody, m.MD5OfBody},
  306. {&m2.Md5OfMessageAttributes, m.MD5OfMessageAttributes},
  307. {&m2.MessageId, m.MessageId},
  308. {&m2.ReceiptHandle, m.ReceiptHandle},
  309. } {
  310. if pair.src != nil {
  311. *pair.dest = *pair.src
  312. }
  313. }
  314. ms = append(ms, m2)
  315. }
  316. return ms, err
  317. }
  318. func (r *redriver) DeleteItems(ctx context.Context, qName string, keys []ItemsKeys) error {
  319. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  320. qu, err := r.GetQueueUrl(ctx, qui)
  321. if err != nil {
  322. return fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  323. }
  324. entries := make([]types.DeleteMessageBatchRequestEntry, len(keys))
  325. for i := 0; i < len(keys); i++ {
  326. entries[i] = types.DeleteMessageBatchRequestEntry{
  327. Id: aws.String(keys[i].MessageID),
  328. ReceiptHandle: aws.String(keys[i].ReceiptHandle),
  329. }
  330. }
  331. dmi := sqs.DeleteMessageBatchInput{
  332. Entries: entries,
  333. QueueUrl: qu.QueueUrl,
  334. }
  335. dmo, err := r.DeleteMessageBatch(ctx, &dmi)
  336. if err != nil {
  337. return fmt.Errorf("failed deleting %d items from queue %q: %v",
  338. len(keys), qName, err)
  339. }
  340. if len(dmo.Failed) > 0 {
  341. errs := make([]string, len(dmo.Failed))
  342. for i, bree := range dmo.Failed {
  343. source := "aws"
  344. if bree.SenderFault {
  345. source = "redriver"
  346. }
  347. errs[i] = fmt.Sprintf("ID: %s / Failure: %s = %q / Source: %s",
  348. *bree.Id, *bree.Message, *bree.Code, source)
  349. }
  350. return fmt.Errorf("failed deleting %d items out of %d from queue %q: %s",
  351. len(dmo.Failed), len(keys), qName, strings.Join(errs, "\n"))
  352. }
  353. log.Println(dmo)
  354. return nil
  355. }
  356. func (r *redriver) Purge(ctx context.Context, qName string) error {
  357. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  358. qu, err := r.GetQueueUrl(ctx, qui)
  359. if err != nil {
  360. return fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  361. }
  362. pqi := sqs.PurgeQueueInput{QueueUrl: qu.QueueUrl}
  363. _, err = r.PurgeQueue(ctx, &pqi)
  364. if err != nil {
  365. return fmt.Errorf("failed purging queue %q: %w",
  366. qName, err)
  367. }
  368. return nil
  369. }
  370. func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
  371. // TODO implement me
  372. panic("implement me")
  373. }
  374. func RedriverService(dic *izidic.Container) (any, error) {
  375. cli := dic.MustService(services.SvcClient).(*sqs.Client)
  376. w := dic.MustParam(services.PWriter).(io.Writer)
  377. wait := int32(dic.MustParam(services.PWait).(int))
  378. return &redriver{
  379. Client: cli,
  380. Wait: wait,
  381. Writer: w,
  382. }, nil
  383. }