123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package services
- import (
- "context"
- "crypto/md5"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "strconv"
- "time"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/davecgh/go-spew/spew"
- "github.com/fgm/izidic"
- "github.com/google/uuid"
- )
- type Event struct {
- MessageID uuid.UUID `json:"MessageId"`
- BodySum string `json:"MD5OfBody"`
- AttrSum string `json:"MD5MofMessageAttributes"`
- EventAttributes
- MessageAttributes map[string]types.MessageAttributeValue
- Body []byte
- }
- func (e Event) IsRetryable() bool {
- maybeRetry, ok := e.MessageAttributes["retry"]
- if !ok {
- return false
- }
- if maybeRetry.DataType == nil || *maybeRetry.DataType != "String" || maybeRetry.StringValue == nil {
- return false
- }
- return *maybeRetry.StringValue == "1"
- }
// EventAttributes holds the SQS system attributes of a message once converted
// from the string map delivered with the raw message to typed values.
type EventAttributes struct {
	// SenderID is the "SenderId" attribute, copied verbatim.
	SenderID string `json:"SenderId"`
	// SentTime is derived from the "SentTimestamp" attribute.
	SentTime time.Time `json:"SentTimestamp"`
	// ApproximateReceiveCount is the integer value of the attribute of the
	// same name.
	ApproximateReceiveCount int `json:"ApproximateReceiveCount"`
	// ApproximateFirstReceiveTimestamp is derived from the attribute of the
	// same name.
	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
}
- type Handler func(ctx context.Context, enc *json.Encoder, msgID uuid.UUID, sent time.Time, input []byte, meta map[string]types.MessageAttributeValue) error
- type message types.Message
- func (m message) String() string {
- if m.MessageId == nil {
- return "0"
- }
- return *m.MessageId
- }
- func ConsumerService(dic *izidic.Container) (any, error) {
- cli := dic.MustService("sqs").(*sqs.Client)
- w := dic.MustParam(PWriter).(io.Writer)
- hdl := dic.MustParam(PHandler).(Handler)
- enc := json.NewEncoder(w)
- return func(ctx context.Context, qURL string) error {
- return consumeMessage(ctx, w, enc, cli, qURL, hdl)
- }, nil
- }
- func ReceiverService(dic *izidic.Container) (any, error) {
- cli := dic.MustService("sqs").(*sqs.Client)
- w := dic.MustParam(PWriter).(io.Writer)
- return func(ctx context.Context, qURL string) {
- receiveMessage(ctx, w, cli, qURL)
- }, nil
- }
- func consumeMessage(ctx context.Context, _ io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
- rmi := sqs.ReceiveMessageInput{
- QueueUrl: &qURL,
- AttributeNames: []types.QueueAttributeName{"All"},
- MessageAttributeNames: []string{"All"},
- MaxNumberOfMessages: 1, // Default, also used when set to 0
- VisibilityTimeout: 1,
- WaitTimeSeconds: 5,
- }
- for {
- recv, err := client.ReceiveMessage(ctx, &rmi)
- if err != nil {
- return fmt.Errorf("failed receiving from queue: %w, aborting", err)
- }
- if len(recv.Messages) == 0 {
- log.Printf("No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
- continue
- }
- if len(recv.Messages) != 1 {
- return fmt.Errorf("unexpected number of messages: %d, expected 0 or 1, aborting", len(recv.Messages))
- }
- msg := message(recv.Messages[0])
- evt, err := validateMessage(msg)
- if err != nil {
- log.Printf("invalid message %s: %v, dropping it anyway", msg, err)
- } else {
- if err := hdl(ctx, enc, evt.MessageID, evt.SentTime, evt.Body, evt.MessageAttributes); err != nil {
- log.Printf("message %s failed processing : %v, dropping it anyway\n", msg, err)
- } else {
- log.Printf("message %s processed successfully\n", msg)
- }
- }
- if evt.IsRetryable() {
- log.Printf("message %s not deleted, for retry", msg)
- } else {
- dmi := sqs.DeleteMessageInput{
- QueueUrl: &qURL,
- ReceiptHandle: msg.ReceiptHandle,
- }
- _, err = client.DeleteMessage(ctx, &dmi)
- if err != nil {
- log.Printf("Error deleting message %s after successful processing: %v\n", msg, err)
- continue
- }
- log.Printf("message %s deleted after processing\n", msg)
- }
- }
- }
- func receiveMessage(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
- rmi := sqs.ReceiveMessageInput{
- QueueUrl: &qURL,
- AttributeNames: []types.QueueAttributeName{"All"},
- MessageAttributeNames: []string{"All"},
- VisibilityTimeout: 1,
- WaitTimeSeconds: 5,
- }
- msg, err := client.ReceiveMessage(ctx, &rmi)
- if err != nil {
- log.Fatalf("failed receiving from queue %s: %v", qURL, err)
- }
- spew.Fdump(w, msg.Messages)
- }
- func validateMessage(msg message) (*Event, error) {
- var (
- err error
- evt = Event{EventAttributes: EventAttributes{SenderID: msg.Attributes["SenderId"]}}
- )
- // Top-level fields
- if msg.MessageId == nil {
- return nil, fmt.Errorf("error: MessageId is nil")
- }
- evt.MessageID, err = uuid.Parse(*msg.MessageId)
- if err != nil {
- return nil, fmt.Errorf("error parsing MessageId as a UUID: %w", err)
- }
- if msg.MD5OfBody == nil {
- return nil, fmt.Errorf("error: MD5OfBody is nil")
- }
- evt.BodySum = *msg.MD5OfBody
- if msg.MD5OfMessageAttributes != nil {
- evt.AttrSum = *msg.MD5OfMessageAttributes
- }
- // EventAttributes fields
- msec, err := strconv.Atoi(msg.Attributes["SentTimestamp"])
- if err != nil {
- return nil, fmt.Errorf("error parsing SentTimestamp as milliseconds: %w", err)
- }
- evt.EventAttributes.SentTime = time.Unix(int64(msec)/1000, int64(msec)%1000)
- evt.EventAttributes.ApproximateReceiveCount, err = strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
- if err != nil {
- return nil, fmt.Errorf("error parsing ApproximateReceiveCount: %w", err)
- }
- msec, err = strconv.Atoi(msg.Attributes["ApproximateFirstReceiveTimestamp"])
- if err != nil {
- return nil, fmt.Errorf("error parsing ApproximateFirstReceiveTimestam as milliseconds: %w", err)
- }
- evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)
- evt.MessageAttributes = msg.MessageAttributes
- // EventBody field
- if msg.Body == nil {
- return nil, fmt.Errorf("message body is nil")
- }
- body := *msg.Body
- bs, err := hex.DecodeString(evt.BodySum)
- if err != nil {
- return nil, fmt.Errorf("error parsy body sum as a hex string: %w", err)
- }
- expected := *(*[16]byte)(bs)
- actual := md5.Sum([]byte(body))
- if actual != expected {
- return nil, fmt.Errorf("error parsing body sum as a MD5 sum")
- }
- evt.Body = []byte(body)
- return &evt, nil
- }
|