package main

import (
	"context"
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/davecgh/go-spew/spew"
	"github.com/google/uuid"
	"gopkg.in/yaml.v2"
)

// lister requests up to 10 queue URLs, writes the resulting list to w as YAML
// and returns the first URL.
//
// It terminates the program through log.Fatalf when the listing call fails or
// when no queue is returned (the original indexed QueueUrls[0] unguarded and
// panicked on an empty list). A YAML encoding failure is only logged, since
// the URL can still be returned.
func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
	lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{
		MaxResults:      aws.Int32(10),
		NextToken:       nil,
		QueueNamePrefix: aws.String(""),
	})
	if err != nil {
		log.Fatalf("failed listing queues: %v", err)
	}
	if err := yaml.NewEncoder(w).Encode(lqo.QueueUrls); err != nil {
		log.Printf("failed encoding queue URLs: %v", err)
	}
	if len(lqo.QueueUrls) == 0 {
		log.Fatalf("failed listing queues: no queue found")
	}
	return lqo.QueueUrls[0]
}

// receiver performs a single ReceiveMessage call on qURL, requesting all
// attributes with VisibilityTimeout and WaitTimeSeconds both set to 0, and
// dumps the received messages to w with spew.
//
// It terminates the program through log.Fatalf when the call fails.
func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
	rmi := sqs.ReceiveMessageInput{
		QueueUrl:              &qURL,
		AttributeNames:        []types.QueueAttributeName{"All"},
		MessageAttributeNames: []string{"All"},
		VisibilityTimeout:     0,
		WaitTimeSeconds:       0,
	}
	msg, err := client.ReceiveMessage(ctx, &rmi)
	if err != nil {
		// The format has two verbs: the queue URL must be passed along with err.
		log.Fatalf("failed receiving from queue %s: %v", qURL, err)
	}
	spew.Fdump(w, msg.Messages)
}

// Handler processes the body of one validated message. consumer invokes it
// with its own ctx and enc, and with the message body as input.
type Handler func(ctx context.Context, enc *json.Encoder, input []byte) error

// message is a named type over types.Message so that a String method can be
// attached to it.
type message types.Message

// String returns the message ID, or "0" when the message has none.
func (m message) String() string {
	if m.MessageId == nil {
		return "0"
	}
	return *m.MessageId
}

// consumer receives messages from qURL one at a time in an endless loop,
// validates each one, passes its body to hdl, and then deletes it from the
// queue whether or not validation and handling succeeded. Progress and
// per-message failures are reported on w.
//
// It only returns, with a non-nil error, when ReceiveMessage fails or yields
// more than one message.
func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
	rmi := sqs.ReceiveMessageInput{
		QueueUrl:              &qURL,
		AttributeNames:        []types.QueueAttributeName{"All"},
		MessageAttributeNames: []string{"All"},
		MaxNumberOfMessages:   1, // Default, also used when set to 0
		VisibilityTimeout:     1,
		WaitTimeSeconds:       5,
	}
	for {
		recv, err := client.ReceiveMessage(ctx, &rmi)
		if err != nil {
			return fmt.Errorf("failed receiving from queue: %w, aborting", err)
		}
		if len(recv.Messages) == 0 {
			fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
			continue
		}
		if len(recv.Messages) != 1 {
			return fmt.Errorf("unexpected number of messages: %d, expected 0 or 1, aborting", len(recv.Messages))
		}

		msg := message(recv.Messages[0])
		if evt, err := validateMessage(msg); err != nil {
			// %w is only meaningful to fmt.Errorf; Fprintf needs %v.
			fmt.Fprintf(w, "invalid message %s: %v, dropping it anyway\n", msg, err)
		} else if err := hdl(ctx, enc, evt.Body); err != nil {
			fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
		} else {
			fmt.Fprintf(w, "Message %s processed successfully\n", msg)
		}

		// The message is deleted on every path above, including failures.
		dmi := sqs.DeleteMessageInput{
			QueueUrl:      &qURL,
			ReceiptHandle: msg.ReceiptHandle,
		}
		if _, err = client.DeleteMessage(ctx, &dmi); err != nil {
			fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)
			continue
		}
		fmt.Fprintf(w, "Deleted processed message %s\n", msg)
	}
}

// EventAttributes holds the message attributes that validateMessage extracts
// from the Attributes map of a received message.
type EventAttributes struct {
	SenderID                         string    `json:"SenderId"`
	SentTime                         time.Time `json:"SentTimestamp"`
	ApproximateReceiveCount          int       `json:"ApproximateReceiveCount"`
	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
}

// Event is the validated form of a received message, as built by
// validateMessage.
type Event struct {
	MessageID uuid.UUID `json:"MessageId"`
	BodySum   string    `json:"MD5OfBody"`
	// NOTE(review): the tag below reads "MD5Mof..." where the source field is
	// MD5OfMessageAttributes; it looks like a typo but is left untouched
	// because changing it would alter the serialized form. Confirm and fix.
	AttrSum string `json:"MD5MofMessageAttributes"`
	EventAttributes
	Body []byte
}

// parseMillisAttr parses attrs[key] as a base-10 count of milliseconds since
// the Unix epoch and returns the matching time. Parsing is done on 64 bits so
// that the result does not depend on the platform's int size.
func parseMillisAttr(attrs map[string]string, key string) (time.Time, error) {
	msec, err := strconv.ParseInt(attrs[key], 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("error parsing %s as milliseconds: %w", key, err)
	}
	return time.UnixMilli(msec), nil
}

// validateMessage checks msg and converts it to an Event.
//
// It requires a non-nil MessageId that parses as a UUID, a non-nil MD5OfBody,
// the SentTimestamp, ApproximateReceiveCount and
// ApproximateFirstReceiveTimestamp attributes in numeric form, and a non-nil
// Body whose MD5 sum equals MD5OfBody. MD5OfMessageAttributes is copied when
// present but not verified. On any failure it returns a nil Event and an
// error describing the first problem found.
func validateMessage(msg message) (*Event, error) {
	evt := Event{EventAttributes: EventAttributes{SenderID: msg.Attributes["SenderId"]}}

	// Top-level fields
	if msg.MessageId == nil {
		return nil, fmt.Errorf("error: MessageId is nil")
	}
	id, err := uuid.Parse(*msg.MessageId)
	if err != nil {
		return nil, fmt.Errorf("error parsing MessageId as a UUID: %w", err)
	}
	evt.MessageID = id

	if msg.MD5OfBody == nil {
		return nil, fmt.Errorf("error: MD5OfBody is nil")
	}
	evt.BodySum = *msg.MD5OfBody
	if msg.MD5OfMessageAttributes != nil {
		evt.AttrSum = *msg.MD5OfMessageAttributes
	}

	// EventAttributes fields
	evt.EventAttributes.SentTime, err = parseMillisAttr(msg.Attributes, "SentTimestamp")
	if err != nil {
		return nil, err
	}
	evt.EventAttributes.ApproximateReceiveCount, err = strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
	if err != nil {
		return nil, fmt.Errorf("error parsing ApproximateReceiveCount: %w", err)
	}
	evt.EventAttributes.ApproximateFirstReceiveTimestamp, err = parseMillisAttr(msg.Attributes, "ApproximateFirstReceiveTimestamp")
	if err != nil {
		return nil, err
	}

	// EventBody field
	if msg.Body == nil {
		return nil, fmt.Errorf("message body is nil")
	}
	body := []byte(*msg.Body)

	want, err := hex.DecodeString(evt.BodySum)
	if err != nil {
		return nil, fmt.Errorf("error parsing body sum as a hex string: %w", err)
	}
	// Converting a slice shorter than the array panics, so check the length
	// before comparing against the fixed-size digest.
	if len(want) != md5.Size {
		return nil, fmt.Errorf("error parsing body sum as a MD5 sum: got %d bytes, expected %d", len(want), md5.Size)
	}
	if got := md5.Sum(body); got != *(*[md5.Size]byte)(want) {
		return nil, fmt.Errorf("message body does not match MD5OfBody %s", evt.BodySum)
	}
	evt.Body = body

	return &evt, nil
}