redriver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package redriver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "strconv"
  9. "strings"
  10. "github.com/aws/aws-sdk-go-v2/aws"
  11. "github.com/aws/aws-sdk-go-v2/service/sqs"
  12. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  13. "github.com/fgm/izidic"
  14. "code.osinet.fr/fgm/sqs_demo/back/services"
  15. )
  16. const (
  17. // MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented
  18. // types.Message attribute, used by the SQS console to support redrive.
  19. MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn"
  20. )
  21. type ItemsKeys struct {
  22. MessageID string
  23. ReceiptHandle string
  24. }
  25. // Redriver is a redrive-oriented facade in front of the sqs.Client API.
  26. type Redriver interface {
  27. ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
  28. GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
  29. GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
  30. GetQueueItems(ctx context.Context, qName string) ([]Message, error)
  31. DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
  32. Purge(ctx context.Context, qName string) error
  33. RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
  34. }
  35. type redriver struct {
  36. Wait int32
  37. io.Writer
  38. *sqs.Client
  39. }
  40. func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
  41. lqi := &sqs.ListQueuesInput{
  42. MaxResults: aws.Int32(1000),
  43. NextToken: nil,
  44. QueueNamePrefix: aws.String(prefix),
  45. }
  46. lqo, err := r.Client.ListQueues(ctx, lqi)
  47. if err != nil {
  48. return nil, fmt.Errorf("listing queues: %w", err)
  49. }
  50. return lqo.QueueUrls, nil
  51. }
  52. func (*redriver) parseQueueInfoRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
  53. srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
  54. if !ok || srap == "" {
  55. return nil, nil
  56. }
  57. qrap = &QueueInfoAttributesRedriveAllowPolicy{}
  58. if err = json.Unmarshal([]byte(srap), qrap); err != nil {
  59. return nil, fmt.Errorf(
  60. "failed parsing redrive policy for queue %q %w", qName, err)
  61. }
  62. return qrap, nil
  63. }
  64. func (*redriver) parseQueueInfoRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
  65. srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
  66. if !ok || srp == "" {
  67. return nil, nil
  68. }
  69. qrp = &QueueInfoAttributesRedrivePolicy{}
  70. if err = json.Unmarshal([]byte(srp), qrp); err != nil {
  71. return qrp, fmt.Errorf(
  72. "failed parsing redrive policy for queue %q %w", qName, err)
  73. }
  74. if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {
  75. return qrp, fmt.Errorf(
  76. "failed converting queue %q ARN to URL: %w", qName, err)
  77. }
  78. return qrp, nil
  79. }
  80. func (r *redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
  81. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  82. qu, err := r.GetQueueUrl(ctx, qui)
  83. if err != nil {
  84. return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  85. }
  86. qai := &sqs.GetQueueAttributesInput{
  87. QueueUrl: qu.QueueUrl,
  88. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
  89. }
  90. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  91. if err != nil {
  92. return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
  93. }
  94. if qao == nil {
  95. return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
  96. }
  97. qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
  98. if err != nil {
  99. return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  100. }
  101. if qrp == nil {
  102. return "", nil // Queue has no DLQ: this is not an error
  103. }
  104. qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseQueueInfoRedrivePolicy
  105. return qURL, nil
  106. }
  107. func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
  108. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  109. qu, err := r.GetQueueUrl(ctx, qui)
  110. if err != nil {
  111. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  112. }
  113. if qu.QueueUrl == nil {
  114. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  115. }
  116. qai := &sqs.GetQueueAttributesInput{
  117. QueueUrl: qu.QueueUrl,
  118. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  119. }
  120. qao, err := r.Client.GetQueueAttributes(ctx, qai)
  121. if err != nil {
  122. return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)
  123. }
  124. if qao == nil {
  125. return nil, fmt.Errorf("no attributes returned for queue %q", qName)
  126. }
  127. qi, err := r.parseQueueInfoAttributes(qName, qu, qao)
  128. if err != nil {
  129. return nil, err
  130. }
  131. return qi, err
  132. }
  133. func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) {
  134. var (
  135. qi *QueueInfo
  136. errCount int
  137. anom, anomd, anomnv int64
  138. created, changed int64 // timestamps
  139. delay, max, retention, vto, wait int64
  140. )
  141. u := *qu.QueueUrl
  142. a, err := ARNFromURL(u)
  143. if err != nil {
  144. return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
  145. }
  146. for _, field := range []struct {
  147. name types.QueueAttributeName
  148. value *int64
  149. sDef string
  150. def int64
  151. }{
  152. {types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
  153. {types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
  154. {types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
  155. {types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
  156. {types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
  157. {types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
  158. {types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)}, // 256ko
  159. {types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
  160. {types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0}, // short polling
  161. {types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0}, // short polling
  162. } {
  163. s, ok := qao.Attributes[string(field.name)]
  164. if !ok {
  165. errCount++
  166. s = field.sDef
  167. }
  168. n, err := strconv.Atoi(s)
  169. if err != nil {
  170. errCount++
  171. }
  172. *field.value = int64(n)
  173. }
  174. qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
  175. if err != nil {
  176. return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
  177. }
  178. qrap, err := r.parseQueueInfoRedriveAllowPolicy(qName, *qao)
  179. if err != nil {
  180. return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
  181. }
  182. qi = &QueueInfo{
  183. Name: qName,
  184. URL: u,
  185. Attributes: &QueueInfoAttributes{
  186. ApproximateNumberOfMessages: anom,
  187. ApproximateNumberOfMessagesDelayed: anomd,
  188. ApproximateNumberOfMessagesNotVisible: anomnv,
  189. CreatedTimestamp: created,
  190. DelaySeconds: delay,
  191. LastModifiedTimestamp: changed,
  192. MaximumMessageSize: max,
  193. MessageRetentionPeriod: retention,
  194. QueueARN: a.String(),
  195. ReceiveMessageWaitTimeSeconds: wait,
  196. VisibilityTimeout: vto,
  197. RedrivePolicy: qrp,
  198. RedriveAllowPolicy: qrap,
  199. },
  200. }
  201. return qi, nil
  202. }
  203. func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) {
  204. var (
  205. afrt, arc, sent int64
  206. errCount int
  207. err error
  208. )
  209. for _, field := range []struct {
  210. name types.MessageSystemAttributeName
  211. value *int64
  212. sDef string
  213. def int64
  214. }{
  215. {types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0},
  216. {types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0},
  217. {types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0},
  218. } {
  219. s, ok := msg.Attributes[string(field.name)]
  220. if !ok {
  221. errCount++
  222. s = field.sDef
  223. }
  224. n, err := strconv.Atoi(s)
  225. if err != nil {
  226. errCount++
  227. }
  228. *field.value = int64(n)
  229. }
  230. ma := SystemMessageAttributes{
  231. ApproximateFirstReceiveTimestamp: afrt,
  232. ApproximateReceiveCount: arc,
  233. DeadLetterQueueSourceARN: msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)],
  234. SenderId: msg.Attributes[string(types.MessageSystemAttributeNameSenderId)],
  235. SentTimestamp: sent,
  236. }
  237. if errCount > 0 {
  238. err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount)
  239. return nil, err
  240. }
  241. return &ma, nil
  242. }
  243. func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) {
  244. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  245. qu, err := r.GetQueueUrl(ctx, qui)
  246. if err != nil {
  247. return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  248. }
  249. if qu.QueueUrl == nil {
  250. return nil, fmt.Errorf("URL for queue %q is empty", qName)
  251. }
  252. rmi := sqs.ReceiveMessageInput{
  253. QueueUrl: qu.QueueUrl,
  254. AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
  255. MaxNumberOfMessages: 10,
  256. MessageAttributeNames: []string{".*"},
  257. VisibilityTimeout: 0,
  258. WaitTimeSeconds: r.Wait,
  259. }
  260. rmo, err := r.Client.ReceiveMessage(ctx, &rmi)
  261. ms := make([]Message, 0, len(rmo.Messages))
  262. for _, m := range rmo.Messages {
  263. ma, err := r.parseReceivedAttributes(m)
  264. if err != nil {
  265. return nil, err
  266. }
  267. for _, field := range []struct {
  268. name string
  269. value any
  270. }{
  271. {"body", m.Body},
  272. {"md5 of body", m.MD5OfBody},
  273. {"md5 of message attributes", m.MD5OfMessageAttributes},
  274. {"message ID", m.MessageId},
  275. {"receipt handle", m.ReceiptHandle},
  276. } {
  277. if field.value == nil {
  278. return nil, fmt.Errorf("missing field %s on message", field.name)
  279. }
  280. }
  281. j, err := JSONableFromMessageAttributeValues(m.MessageAttributes)
  282. if err != nil {
  283. return nil, fmt.Errorf("failed decoding message boby: %w", err)
  284. }
  285. m := Message{
  286. Attributes: ma,
  287. Body: *m.Body,
  288. Md5OfBody: *m.MD5OfBody,
  289. Md5OfMessageAttributes: *m.MD5OfMessageAttributes,
  290. MessageAttributes: j,
  291. MessageId: *m.MessageId,
  292. ReceiptHandle: *m.ReceiptHandle,
  293. }
  294. ms = append(ms, m)
  295. }
  296. return ms, err
  297. }
  298. func (r *redriver) DeleteItems(ctx context.Context, qName string, keys []ItemsKeys) error {
  299. qui := &sqs.GetQueueUrlInput{QueueName: &qName}
  300. qu, err := r.GetQueueUrl(ctx, qui)
  301. if err != nil {
  302. return fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
  303. }
  304. entries := make([]types.DeleteMessageBatchRequestEntry, len(keys))
  305. for i := 0; i < len(keys); i++ {
  306. entries[i] = types.DeleteMessageBatchRequestEntry{
  307. Id: aws.String(keys[i].MessageID),
  308. ReceiptHandle: aws.String(keys[i].ReceiptHandle),
  309. }
  310. }
  311. dmi := sqs.DeleteMessageBatchInput{
  312. Entries: entries,
  313. QueueUrl: qu.QueueUrl,
  314. }
  315. dmo, err := r.DeleteMessageBatch(ctx, &dmi)
  316. if err != nil {
  317. return fmt.Errorf("failed deleting %d items from queue %q: %v",
  318. len(keys), qName, err)
  319. }
  320. if len(dmo.Failed) > 0 {
  321. errs := make([]string, len(dmo.Failed))
  322. for i, bree := range dmo.Failed {
  323. source := "aws"
  324. if bree.SenderFault {
  325. source = "redriver"
  326. }
  327. errs[i] = fmt.Sprintf("ID: %s / Failure: %s = %q / Source: %s",
  328. *bree.Id, *bree.Message, *bree.Code, source)
  329. }
  330. return fmt.Errorf("failed deleting %d items out of %d from queue %q: %s",
  331. len(dmo.Failed), len(keys), qName, strings.Join(errs, "\n"))
  332. }
  333. log.Println(dmo)
  334. return nil
  335. }
  336. func (r *redriver) Purge(ctx context.Context, qName string) error {
  337. // TODO implement me
  338. panic("implement me")
  339. }
  340. func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
  341. // TODO implement me
  342. panic("implement me")
  343. }
  344. func RedriverService(dic *izidic.Container) (any, error) {
  345. cli := dic.MustService(services.SvcClient).(*sqs.Client)
  346. w := dic.MustParam(services.PWriter).(io.Writer)
  347. wait := int32(dic.MustParam(services.PWait).(int))
  348. return &redriver{
  349. Wait: wait,
  350. Writer: w,
  351. Client: cli,
  352. }, nil
  353. }