Browse Source

Implement a working consumer with SQS-independent handlers.

Frédéric G. Marand 2 years ago
parent
commit
ad23ea157e
8 changed files with 226 additions and 14 deletions
  1. 3 0
      Makefile
  2. 9 2
      README.md
  3. 142 8
      consumer.go
  4. 32 4
      demo.go
  5. 1 0
      go.mod
  6. 2 0
      go.sum
  7. 26 0
      handler.go
  8. 11 0
      producer.go

+ 3 - 0
Makefile

@@ -21,6 +21,9 @@ configure:
 create-queue: configure
 	$(SQS) create-queue --queue-name $(QNAME) --output table
 
+lint:
+	staticcheck
+
 receive:
 	$(SQS) receive-message $(SQSOPTS) --output json
 

+ 9 - 2
README.md

@@ -37,8 +37,15 @@ localstack_main  | 2023-01-04T06:36:30.140  INFO --- [-functhread5] hypercorn.er
 
 - on another terminal tab
   - create a queue: `make create-queue`
-  - send a message: `make send`
-  - list queues from Go and receive a message: `go run .`
+  - send messages: `make send`, which can be used repeatedly
+  - list queues from Go and receive messages: `go run .`
+
+## Modifying the consumer
+
+The actual consumer callback logic is in the SQS-unaware function `HandleDummy` in `handler.go`.
+
+Feel free to modify it for other data types.
+
 
 ## Cleaning up
 

+ 142 - 8
consumer.go

@@ -2,13 +2,20 @@ package main
 
 import (
 	"context"
+	"crypto/md5"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
 	"io"
 	"log"
+	"strconv"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
 	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 	"github.com/davecgh/go-spew/spew"
+	"github.com/google/uuid"
 	"gopkg.in/yaml.v2"
 )
 
@@ -28,17 +35,144 @@ func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
 
 func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
 	rmi := sqs.ReceiveMessageInput{
-		QueueUrl:                &qURL,
-		AttributeNames:          []types.QueueAttributeName{"All"},
-		MaxNumberOfMessages:     0,
-		MessageAttributeNames:   nil,
-		ReceiveRequestAttemptId: nil,
-		VisibilityTimeout:       0,
-		WaitTimeSeconds:         0,
+		QueueUrl:              &qURL,
+		AttributeNames:        []types.QueueAttributeName{"All"},
+		MessageAttributeNames: []string{"All"},
+		VisibilityTimeout:     0,
+		WaitTimeSeconds:       0,
 	}
 	msg, err := client.ReceiveMessage(ctx, &rmi)
 	if err != nil {
 		log.Fatalf("failed receiving from queue %s: %v", err)
 	}
-	spew.Dump(msg.Messages)
+	spew.Fdump(w, msg.Messages)
+}
+
+type Handler func(ctx context.Context, enc *json.Encoder, input []byte) error
+
+type message types.Message
+
+func (m message) String() string {
+	if m.MessageId == nil {
+		return "0"
+	}
+	return *m.MessageId
+}
+
+func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
+	rmi := sqs.ReceiveMessageInput{
+		QueueUrl:              &qURL,
+		AttributeNames:        []types.QueueAttributeName{"All"},
+		MessageAttributeNames: []string{"All"},
+		MaxNumberOfMessages:   1, // Default, also used when set to 0
+		VisibilityTimeout:     1,
+		WaitTimeSeconds:       5,
+	}
+
+	for {
+		recv, err := client.ReceiveMessage(ctx, &rmi)
+		if err != nil {
+			return fmt.Errorf("failed receiving from queue: %v, aborting", err)
+		}
+		if len(recv.Messages) == 0 {
+			fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
+			continue
+		}
+		if len(recv.Messages) != 1 {
+			return fmt.Errorf("unexpected number of messages: %d, expected 0 or 1, aborting", len(recv.Messages))
+		}
+		msg := message(recv.Messages[0])
+		evt, err := validateMessage(msg)
+		if err != nil {
+			fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
+		} else {
+			if err := hdl(ctx, enc, evt.Body); err != nil {
+				fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
+			} else {
+				fmt.Fprintf(w, "Message %s processed successfully\n", msg)
+			}
+		}
+		dmi := sqs.DeleteMessageInput{
+			QueueUrl:      &qURL,
+			ReceiptHandle: msg.ReceiptHandle,
+		}
+		_, err = client.DeleteMessage(ctx, &dmi)
+		if err != nil {
+			fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)
+			continue
+		}
+		fmt.Fprintf(w, "Deleted processed message %s\n", msg)
+	}
+}
+
+type EventAttributes struct {
+	SenderID                         string    `json:"SenderId"`
+	SentTime                         time.Time `json:"SentTimestamp"`
+	ApproximateReceiveCount          int       `json:"ApproximateReceiveCount"`
+	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
+}
+type Event struct {
+	MessageID uuid.UUID `json:"MessageId"`
+	BodySum   string    `json:"MD5OfBody"`
+	AttrSum   string    `json:"MD5MofMessageAttributes"`
+
+	EventAttributes
+	Body []byte
+}
+
+func validateMessage(msg message) (*Event, error) {
+	var (
+		err error
+		evt = Event{EventAttributes: EventAttributes{SenderID: msg.Attributes["SenderId"]}}
+	)
+
+	// Top-level fields
+	if msg.MessageId == nil {
+		return nil, fmt.Errorf("error: MessageId is nil")
+	}
+	evt.MessageID, err = uuid.Parse(*msg.MessageId)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing MessageId as a UUID: %v", err)
+	}
+	if msg.MD5OfBody == nil {
+		return nil, fmt.Errorf("error: MD5OfBody is nil")
+	}
+	evt.BodySum = *msg.MD5OfBody
+	if msg.MD5OfMessageAttributes != nil {
+		evt.AttrSum = *msg.MD5OfMessageAttributes
+	}
+
+	// EventAttributes fields
+	msec, err := strconv.Atoi(msg.Attributes["SentTimestamp"])
+	if err != nil {
+		return nil, fmt.Errorf("error parsing SentTimestamp as milliseconds: %w", err)
+	}
+	evt.EventAttributes.SentTime = time.Unix(int64(msec)/1000, int64(msec)%1000)
+
+	evt.EventAttributes.ApproximateReceiveCount, err = strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
+	if err != nil {
+		return nil, fmt.Errorf("error parsing ApproximateReceiveCount: %w", err)
+	}
+	msec, err = strconv.Atoi(msg.Attributes["ApproximateFirstReceiveTimestamp"])
+	if err != nil {
+		return nil, fmt.Errorf("error parsing ApproximateFirstReceiveTimestam as milliseconds: %w", err)
+	}
+	evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)
+
+	// EventBody field
+	if msg.Body == nil {
+		return nil, fmt.Errorf("message body is nil")
+	}
+	body := *msg.Body
+	bs, err := hex.DecodeString(evt.BodySum)
+	if err != nil {
+		return nil, fmt.Errorf("error parsy body sum as a hex string: %w", err)
+	}
+	expected := *(*[16]byte)(bs)
+	actual := md5.Sum([]byte(body))
+	if actual != expected {
+		return nil, fmt.Errorf("error parsing body sum as a MD5 sum")
+	}
+	evt.Body = []byte(body)
+	return &evt, nil
 }

+ 32 - 4
demo.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"io"
@@ -21,9 +22,15 @@ func main2(w io.Writer, name string, args []string) {
 	ctx := context.Background()
 	dic := resolve(w, name, args)
 	lister := dic.MustService("lister").(func(ctx context.Context) string)
-	receiver := dic.MustService("receiver").(func(ctx context.Context, qName string))
-	qName := lister(ctx)
-	receiver(ctx, qName)
+	receiver := dic.MustService("receiver").(func(ctx context.Context, qURL string))
+	consumer := dic.MustService("consumer").(func(ctx context.Context, qURL string) error)
+	qURL := lister(ctx)
+	receiver(ctx, qURL)
+	err := consumer(ctx, qURL)
+	if err != nil {
+		log.Fatalf("error in consumer: %v, aborting", err)
+	}
+	log.Printf("exiting cleanly")
 }
 
 func resolve(w io.Writer, name string, args []string) *izidic.Container {
@@ -31,11 +38,14 @@ func resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Store("name", name)
 	dic.Store("args", args)
 	dic.Store("writer", w)
+	dic.Store("handler", Handler(HandleDummy))
 	dic.Register("flags", flagsService)
 	dic.Register("logger", loggerService)
 	dic.Register("sqs", sqsClientService)
 	dic.Register("lister", listerService)
 	dic.Register("receiver", receiverService)
+	dic.Register("sender", senderService)
+	dic.Register("consumer", consumerService)
 
 	dic.MustService("flags") // Store generated params before freeze.
 	dic.Freeze()
@@ -94,9 +104,27 @@ func listerService(dic *izidic.Container) (any, error) {
 }
 
 func receiverService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	return func(ctx context.Context, qURL string) {
+		receiver(ctx, w, cli, qURL)
+	}, nil
+}
+
+func senderService(dic *izidic.Container) (any, error) {
 	cli := dic.MustService("sqs").(*sqs.Client)
 	w := dic.MustParam("writer").(io.Writer)
 	return func(ctx context.Context, qName string) {
-		receiver(ctx, w, cli, qName)
+		sender(ctx, w, cli, qName)
+	}, nil
+}
+
+func consumerService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	hdl := dic.MustParam("handler").(Handler)
+	enc := json.NewEncoder(w)
+	return func(ctx context.Context, qURL string) error {
+		return consumer(ctx, w, enc, cli, qURL, hdl)
 	}, nil
 }

+ 1 - 0
go.mod

@@ -8,6 +8,7 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/sqs v1.19.17
 	github.com/davecgh/go-spew v1.1.0
 	github.com/fgm/izidic v0.0.2
+	github.com/google/uuid v1.3.0
 	gopkg.in/yaml.v2 v2.4.0
 )
 

+ 2 - 0
go.sum

@@ -30,6 +30,8 @@ github.com/fgm/izidic v0.0.2 h1:xpIr9sEVE2xVMlUPu7zcQhumxPB0mjW2/j8Cm598sMw=
 github.com/fgm/izidic v0.0.2/go.mod h1:HSUQlWnf88mpvaovrffBQnjqug3WLjTisLZl5g2RaEc=
 github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
 github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

+ 26 - 0
handler.go

@@ -0,0 +1,26 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+// HandleDummy is blissfully unaware of SQS, and could therefore be used as a handler for another message queue.
+func HandleDummy(_ context.Context, enc *json.Encoder, msg []byte) error {
+	type EventBody struct {
+		EventID   uuid.UUID      `json:"event_id"`
+		EventTime time.Time      `json:"event_time"`
+		Data      map[string]int `json:"data,omitempty"`
+	}
+	evt := EventBody{}
+	if err := json.Unmarshal(msg, &evt); err != nil {
+		return fmt.Errorf("error parsing body as JSON: %w", err)
+	}
+
+	enc.Encode(evt)
+	return nil
+}

+ 11 - 0
producer.go

@@ -1 +1,12 @@
 package main
+
+import (
+	"context"
+	"io"
+
+	"github.com/aws/aws-sdk-go-v2/service/sqs"
+)
+
+func sender(ctx context.Context, w io.Writer, client *sqs.Client, qName string) {
+
+}