consumer.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package main
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "strconv"
  11. "time"
  12. "github.com/aws/aws-sdk-go-v2/aws"
  13. "github.com/aws/aws-sdk-go-v2/service/sqs"
  14. "github.com/aws/aws-sdk-go-v2/service/sqs/types"
  15. "github.com/davecgh/go-spew/spew"
  16. "github.com/google/uuid"
  17. "gopkg.in/yaml.v2"
  18. )
  19. func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
  20. lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{
  21. MaxResults: aws.Int32(10),
  22. NextToken: nil,
  23. QueueNamePrefix: aws.String(""),
  24. })
  25. if err != nil {
  26. log.Fatalf("failed listing queues: %v", err)
  27. }
  28. y := yaml.NewEncoder(w)
  29. y.Encode(lqo.QueueUrls)
  30. return lqo.QueueUrls[0]
  31. }
  32. func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
  33. rmi := sqs.ReceiveMessageInput{
  34. QueueUrl: &qURL,
  35. AttributeNames: []types.QueueAttributeName{"All"},
  36. MessageAttributeNames: []string{"All"},
  37. VisibilityTimeout: 0,
  38. WaitTimeSeconds: 0,
  39. }
  40. msg, err := client.ReceiveMessage(ctx, &rmi)
  41. if err != nil {
  42. log.Fatalf("failed receiving from queue %s: %v", err)
  43. }
  44. spew.Fdump(w, msg.Messages)
  45. }
  46. type Handler func(ctx context.Context, enc *json.Encoder, input []byte) error
  47. type message types.Message
  48. func (m message) String() string {
  49. if m.MessageId == nil {
  50. return "0"
  51. }
  52. return *m.MessageId
  53. }
  54. func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
  55. rmi := sqs.ReceiveMessageInput{
  56. QueueUrl: &qURL,
  57. AttributeNames: []types.QueueAttributeName{"All"},
  58. MessageAttributeNames: []string{"All"},
  59. MaxNumberOfMessages: 1, // Default, also used when set to 0
  60. VisibilityTimeout: 1,
  61. WaitTimeSeconds: 5,
  62. }
  63. for {
  64. recv, err := client.ReceiveMessage(ctx, &rmi)
  65. if err != nil {
  66. return fmt.Errorf("failed receiving from queue: %w, aborting", err)
  67. }
  68. if len(recv.Messages) == 0 {
  69. fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
  70. continue
  71. }
  72. if len(recv.Messages) != 1 {
  73. return fmt.Errorf("unexpected number of messages: %d, expected 0 or 1, aborting", len(recv.Messages))
  74. }
  75. msg := message(recv.Messages[0])
  76. evt, err := validateMessage(msg)
  77. if err != nil {
  78. fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
  79. } else {
  80. if err := hdl(ctx, enc, evt.Body); err != nil {
  81. fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
  82. } else {
  83. fmt.Fprintf(w, "Message %s processed successfully\n", msg)
  84. }
  85. }
  86. dmi := sqs.DeleteMessageInput{
  87. QueueUrl: &qURL,
  88. ReceiptHandle: msg.ReceiptHandle,
  89. }
  90. _, err = client.DeleteMessage(ctx, &dmi)
  91. if err != nil {
  92. fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)
  93. continue
  94. }
  95. fmt.Fprintf(w, "Deleted processed message %s\n", msg)
  96. }
  97. }
  98. type EventAttributes struct {
  99. SenderID string `json:"SenderId"`
  100. SentTime time.Time `json:"SentTimestamp"`
  101. ApproximateReceiveCount int `json:"ApproximateReceiveCount"`
  102. ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
  103. }
  104. type Event struct {
  105. MessageID uuid.UUID `json:"MessageId"`
  106. BodySum string `json:"MD5OfBody"`
  107. AttrSum string `json:"MD5MofMessageAttributes"`
  108. EventAttributes
  109. Body []byte
  110. }
  111. func validateMessage(msg message) (*Event, error) {
  112. var (
  113. err error
  114. evt = Event{EventAttributes: EventAttributes{SenderID: msg.Attributes["SenderId"]}}
  115. )
  116. // Top-level fields
  117. if msg.MessageId == nil {
  118. return nil, fmt.Errorf("error: MessageId is nil")
  119. }
  120. evt.MessageID, err = uuid.Parse(*msg.MessageId)
  121. if err != nil {
  122. return nil, fmt.Errorf("error parsing MessageId as a UUID: %w", err)
  123. }
  124. if msg.MD5OfBody == nil {
  125. return nil, fmt.Errorf("error: MD5OfBody is nil")
  126. }
  127. evt.BodySum = *msg.MD5OfBody
  128. if msg.MD5OfMessageAttributes != nil {
  129. evt.AttrSum = *msg.MD5OfMessageAttributes
  130. }
  131. // EventAttributes fields
  132. msec, err := strconv.Atoi(msg.Attributes["SentTimestamp"])
  133. if err != nil {
  134. return nil, fmt.Errorf("error parsing SentTimestamp as milliseconds: %w", err)
  135. }
  136. evt.EventAttributes.SentTime = time.Unix(int64(msec)/1000, int64(msec)%1000)
  137. evt.EventAttributes.ApproximateReceiveCount, err = strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
  138. if err != nil {
  139. return nil, fmt.Errorf("error parsing ApproximateReceiveCount: %w", err)
  140. }
  141. msec, err = strconv.Atoi(msg.Attributes["ApproximateFirstReceiveTimestamp"])
  142. if err != nil {
  143. return nil, fmt.Errorf("error parsing ApproximateFirstReceiveTimestam as milliseconds: %w", err)
  144. }
  145. evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)
  146. // EventBody field
  147. if msg.Body == nil {
  148. return nil, fmt.Errorf("message body is nil")
  149. }
  150. body := *msg.Body
  151. bs, err := hex.DecodeString(evt.BodySum)
  152. if err != nil {
  153. return nil, fmt.Errorf("error parsy body sum as a hex string: %w", err)
  154. }
  155. expected := *(*[16]byte)(bs)
  156. actual := md5.Sum([]byte(body))
  157. if actual != expected {
  158. return nil, fmt.Errorf("error parsing body sum as a MD5 sum")
  159. }
  160. evt.Body = []byte(body)
  161. return &evt, nil
  162. }