| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 | package mainimport (	"context"	"crypto/md5"	"encoding/hex"	"encoding/json"	"fmt"	"io"	"log"	"strconv"	"time"	"github.com/aws/aws-sdk-go-v2/aws"	"github.com/aws/aws-sdk-go-v2/service/sqs"	"github.com/aws/aws-sdk-go-v2/service/sqs/types"	"github.com/davecgh/go-spew/spew"	"github.com/google/uuid"	"gopkg.in/yaml.v2")func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {	lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{		MaxResults:      aws.Int32(10),		NextToken:       nil,		QueueNamePrefix: aws.String(""),	})	if err != nil {		log.Fatalf("failed listing queues: %v", err)	}	y := yaml.NewEncoder(w)	y.Encode(lqo.QueueUrls)	return lqo.QueueUrls[0]}func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {	rmi := sqs.ReceiveMessageInput{		QueueUrl:              &qURL,		AttributeNames:        []types.QueueAttributeName{"All"},		MessageAttributeNames: []string{"All"},		VisibilityTimeout:     0,		WaitTimeSeconds:       0,	}	msg, err := client.ReceiveMessage(ctx, &rmi)	if err != nil {		log.Fatalf("failed receiving from queue %s: %v", err)	}	spew.Fdump(w, msg.Messages)}type Handler func(ctx context.Context, enc *json.Encoder, input []byte) errortype message types.Messagefunc (m message) String() string {	if m.MessageId == nil {		return "0"	}	return *m.MessageId}func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {	rmi := sqs.ReceiveMessageInput{		QueueUrl:              &qURL,		AttributeNames:        []types.QueueAttributeName{"All"},		MessageAttributeNames: []string{"All"},		MaxNumberOfMessages:   1, // Default, also used when set to 0		VisibilityTimeout:     1,		WaitTimeSeconds:       5,	}	for {		recv, err := client.ReceiveMessage(ctx, &rmi)		if err != nil {			return fmt.Errorf("failed receiving from queue: %w, aborting", err)		}		if len(recv.Messages) == 0 {			fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)			continue		}		if len(recv.Messages) != 1 {			return fmt.Errorf("unexpected number of messages: %d, expected 0 or 1, aborting", len(recv.Messages))		}		msg := message(recv.Messages[0])		evt, err := validateMessage(msg)		if err != nil {			fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)		} else {			if err := hdl(ctx, enc, evt.Body); err != nil {				fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)			} else {				fmt.Fprintf(w, "Message %s processed successfully\n", msg)			}		}		dmi := sqs.DeleteMessageInput{			QueueUrl:      &qURL,			ReceiptHandle: msg.ReceiptHandle,		}		_, err = client.DeleteMessage(ctx, &dmi)		if err != nil {			fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)			continue		}		fmt.Fprintf(w, "Deleted processed message %s\n", msg)	}}type EventAttributes struct {	SenderID                         string    `json:"SenderId"`	SentTime                         time.Time `json:"SentTimestamp"`	ApproximateReceiveCount          int       `json:"ApproximateReceiveCount"`	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`}type Event struct {	MessageID uuid.UUID `json:"MessageId"`	BodySum   string    `json:"MD5OfBody"`	AttrSum   string    `json:"MD5MofMessageAttributes"`	EventAttributes	Body []byte}func validateMessage(msg message) (*Event, error) {	var (		err error		evt = Event{EventAttributes: EventAttributes{SenderID: msg.Attributes["SenderId"]}}	)	// Top-level fields	if msg.MessageId == nil {		return nil, fmt.Errorf("error: MessageId is nil")	}	evt.MessageID, err = uuid.Parse(*msg.MessageId)	if err != nil {		return nil, fmt.Errorf("error parsing MessageId as a UUID: %w", err)	}	if msg.MD5OfBody == nil {		return nil, fmt.Errorf("error: MD5OfBody is nil")	}	evt.BodySum = *msg.MD5OfBody	if msg.MD5OfMessageAttributes != nil {		evt.AttrSum = *msg.MD5OfMessageAttributes	}	// EventAttributes fields	msec, err := strconv.Atoi(msg.Attributes["SentTimestamp"])	if err != nil {		return nil, fmt.Errorf("error parsing SentTimestamp as milliseconds: %w", err)	}	evt.EventAttributes.SentTime = time.Unix(int64(msec)/1000, int64(msec)%1000)	evt.EventAttributes.ApproximateReceiveCount, err = strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])	if err != nil {		return nil, fmt.Errorf("error parsing ApproximateReceiveCount: %w", err)	}	msec, err = strconv.Atoi(msg.Attributes["ApproximateFirstReceiveTimestamp"])	if err != nil {		return nil, fmt.Errorf("error parsing ApproximateFirstReceiveTimestam as milliseconds: %w", err)	}	evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)	// EventBody field	if msg.Body == nil {		return nil, fmt.Errorf("message body is nil")	}	body := *msg.Body	bs, err := hex.DecodeString(evt.BodySum)	if err != nil {		return nil, fmt.Errorf("error parsy body sum as a hex string: %w", err)	}	expected := *(*[16]byte)(bs)	actual := md5.Sum([]byte(body))	if actual != expected {		return nil, fmt.Errorf("error parsing body sum as a MD5 sum")	}	evt.Body = []byte(body)	return &evt, nil}
 |