Browse Source

Raw GET /queue handler

Frédéric G. MARAND 1 year ago
parent
commit
2002fb1b5c

+ 1 - 1
.run/Redriver.run.xml

@@ -2,7 +2,7 @@
   <configuration default="false" name="Redriver" type="GoApplicationRunConfiguration" factoryName="Go Application">
     <module name="sqs_demo" />
     <working_directory value="$PROJECT_DIR$" />
-    <parameters value="-profile=sqs-tutorial -addr :81" />
+    <parameters value="-profile=sqs-tutorial -addr :81 -wait 10" />
     <kind value="PACKAGE" />
     <package value="code.osinet.fr/fgm/sqs_demo/cmd/redriver" />
     <directory value="$PROJECT_DIR$" />

+ 44 - 0
back/services/redriver/converters.go

@@ -1,11 +1,15 @@
 package redriver
 
 import (
+	"encoding/base64"
+	"errors"
 	"fmt"
 	"net/url"
+	"strconv"
 	"strings"
 
 	"github.com/aws/aws-sdk-go-v2/aws/arn"
+	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 )
 
 func ARNFromURL(qURL string) (arn.ARN, error) {
@@ -79,3 +83,43 @@ func URLFromARN(qARN arn.ARN) (name string, err error) {
 	}
 	return u.String(), nil
 }
+
+func JSONableFromMessageAttributeValues(m map[string]types.MessageAttributeValue) (map[string]any, error) {
+	j := make(map[string]any)
+	for name, attr := range m {
+		if attr.DataType == nil {
+			return nil, errors.New("empty DataType on message attribute value")
+		}
+		switch t := *attr.DataType; t {
+		case "String":
+			if attr.StringValue == nil {
+				return nil, fmt.Errorf("nil string value on string field %q", name)
+			}
+			j[name] = *attr.StringValue
+		case "Number":
+			if attr.StringValue == nil {
+				return nil, fmt.Errorf("nil string value on number field %q", name)
+			}
+			n, err := strconv.Atoi(*attr.StringValue)
+			if err != nil {
+				return nil, fmt.Errorf("invalid numeric string on number field %q", name)
+			}
+			j[name] = n
+		case "Binary":
+			if attr.BinaryValue == nil {
+				j[name] = nil
+				continue
+			}
+			in := attr.BinaryValue
+			out := make([]byte, base64.StdEncoding.DecodedLen(len(in)))
+			_, err := base64.StdEncoding.Decode(out, in)
+			if err != nil {
+				return nil, fmt.Errorf("failed decoding binary field %q: %w", name, err)
+			}
+			j[name] = out
+		default:
+			return nil, fmt.Errorf("unimplemented DataType %q", t)
+		}
+	}
+	return j, nil
+}

+ 186 - 77
back/services/redriver/redriver.go

@@ -15,6 +15,12 @@ import (
 	"code.osinet.fr/fgm/sqs_demo/services"
 )
 
+const (
+	// MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented
+	// types.Message attribute, used by the SQS console to support redrive.
+	MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn"
+)
+
 type ItemsKeys struct {
 	MessageID     string
 	ReceiptHandle string
@@ -25,18 +31,20 @@ type Redriver interface {
 	ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
 	GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
 	GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
-	GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
+	GetQueueItems(ctx context.Context, qName string) ([]Message, error)
 	DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
 	Purge(ctx context.Context, qName string) error
 	RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
 }
 
 type redriver struct {
+	Wait int32
+
 	io.Writer
 	*sqs.Client
 }
 
-func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
+func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
 	lqi := &sqs.ListQueuesInput{
 		MaxResults:      aws.Int32(1000),
 		NextToken:       nil,
@@ -49,7 +57,7 @@ func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []st
 	return lqo.QueueUrls, nil
 }
 
-func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
+func (*redriver) parseQueueInfoRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {
 	srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]
 	if !ok || srap == "" {
 		return nil, nil
@@ -62,7 +70,7 @@ func (r redriver) parseRedriveAllowPolicy(qName string, qao sqs.GetQueueAttribut
 	return qrap, nil
 }
 
-func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
+func (*redriver) parseQueueInfoRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {
 	srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
 	if !ok || srp == "" {
 		return nil, nil
@@ -79,7 +87,7 @@ func (r redriver) parseRedrivePolicy(qName string, qao sqs.GetQueueAttributesOut
 	return qrp, nil
 }
 
-func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
+func (r *redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
 	qui := &sqs.GetQueueUrlInput{QueueName: &qName}
 	qu, err := r.GetQueueUrl(ctx, qui)
 	if err != nil {
@@ -96,18 +104,18 @@ func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, er
 	if qao == nil {
 		return "", fmt.Errorf("redrive policy for queue %q is empty", qName)
 	}
-	qrp, err := r.parseRedrivePolicy(qName, *qao)
+	qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
 	if err != nil {
 		return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
 	}
 	if qrp == nil {
 		return "", nil // Queue has no DLQ: this is not an error
 	}
-	qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseRedrivePolicy
+	qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseQueueInfoRedrivePolicy
 	return qURL, nil
 }
 
-func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
+func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
 	qui := &sqs.GetQueueUrlInput{QueueName: &qName}
 	qu, err := r.GetQueueUrl(ctx, qui)
 	if err != nil {
@@ -116,11 +124,7 @@ func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, e
 	if qu.QueueUrl == nil {
 		return nil, fmt.Errorf("URL for queue %q is empty", qName)
 	}
-	u := *qu.QueueUrl
-	a, err := ARNFromURL(u)
-	if err != nil {
-		return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
-	}
+
 	qai := &sqs.GetQueueAttributesInput{
 		QueueUrl:       qu.QueueUrl,
 		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
@@ -132,91 +136,193 @@ func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, e
 	if qao == nil {
 		return nil, fmt.Errorf("no attributes returned for queue %q", qName)
 	}
+	qi, err := r.parseQueueInfoAttributes(qName, qu, qao)
+	if err != nil {
+		return nil, err
+	}
+	return qi, err
+}
+
+func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) {
 	var (
-		errCount = 0
-		qi       *QueueInfo
+		qi                               *QueueInfo
+		errCount                         int
+		anom, anomd, anomnv              int64
+		created, changed                 int64 // timestamps
+		delay, max, retention, vto, wait int64
 	)
-	{
-		var (
-			anom, anomd, anomnv              int32
-			created, changed                 int32 // timestamps
-			delay, max, retention, vto, wait int32
-		)
+	u := *qu.QueueUrl
+	a, err := ARNFromURL(u)
+	if err != nil {
+		return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)
+	}
+
+	for _, field := range []struct {
+		name  types.QueueAttributeName
+		value *int64
+		sDef  string
+		def   int64
+	}{
+		{types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
+		{types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
+		{types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
+		{types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
+		{types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
+		{types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
+		{types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)},          // 256ko
+		{types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
+		{types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0},               // short polling
+		{types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0},                            // short polling
+	} {
+		s, ok := qao.Attributes[string(field.name)]
+		if !ok {
+			errCount++
+			s = field.sDef
+		}
+		n, err := strconv.Atoi(s)
+		if err != nil {
+			errCount++
+		}
+		*field.value = int64(n)
+	}
+	qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)
+	if err != nil {
+		return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
+	}
+	qrap, err := r.parseQueueInfoRedriveAllowPolicy(qName, *qao)
+	if err != nil {
+		return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
+	}
+	qi = &QueueInfo{
+		Name: qName,
+		Url:  u,
+		Attributes: &QueueInfoAttributes{
+			ApproximateNumberOfMessages:           anom,
+			ApproximateNumberOfMessagesDelayed:    anomd,
+			ApproximateNumberOfMessagesNotVisible: anomnv,
+			CreatedTimestamp:                      created,
+			DelaySeconds:                          delay,
+			LastModifiedTimestamp:                 changed,
+			MaximumMessageSize:                    max,
+			MessageRetentionPeriod:                retention,
+			QueueARN:                              a.String(),
+			ReceiveMessageWaitTimeSeconds:         wait,
+			VisibilityTimeout:                     vto,
+			RedrivePolicy:                         qrp,
+			RedriveAllowPolicy:                    qrap,
+		},
+	}
+	return qi, nil
+}
+
+func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) {
+	var (
+		afrt, arc, sent int64
+		errCount        int
+		err             error
+	)
+	for _, field := range []struct {
+		name  types.MessageSystemAttributeName
+		value *int64
+		sDef  string
+		def   int64
+	}{
+		{types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0},
+		{types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0},
+		{types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0},
+	} {
+		s, ok := msg.Attributes[string(field.name)]
+		if !ok {
+			errCount++
+			s = field.sDef
+		}
+		n, err := strconv.Atoi(s)
+		if err != nil {
+			errCount++
+		}
+		*field.value = int64(n)
+	}
+	ma := SystemMessageAttributes{
+		ApproximateFirstReceiveTimestamp: afrt,
+		ApproximateReceiveCount:          arc,
+		DeadLetterQueueSourceARN:         msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)],
+		SenderId:                         msg.Attributes[string(types.MessageSystemAttributeNameSenderId)],
+		SentTimestamp:                    sent,
+	}
+	if errCount > 0 {
+		err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount)
+		return nil, err
+	}
+	return &ma, nil
+}
+
+func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) {
+	qui := &sqs.GetQueueUrlInput{QueueName: &qName}
+	qu, err := r.GetQueueUrl(ctx, qui)
+	if err != nil {
+		return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
+	}
+	if qu.QueueUrl == nil {
+		return nil, fmt.Errorf("URL for queue %q is empty", qName)
+	}
+	rmi := sqs.ReceiveMessageInput{
+		QueueUrl:              qu.QueueUrl,
+		AttributeNames:        []types.QueueAttributeName{types.QueueAttributeNameAll},
+		MaxNumberOfMessages:   10,
+		MessageAttributeNames: []string{".*"},
+		VisibilityTimeout:     0,
+		WaitTimeSeconds:       r.Wait,
+	}
+	rmo, err := r.Client.ReceiveMessage(ctx, &rmi)
+	ms := make([]Message, 0, len(rmo.Messages))
+	for _, m := range rmo.Messages {
+		ma, err := r.parseReceivedAttributes(m)
+		if err != nil {
+			return nil, err
+		}
 		for _, field := range []struct {
-			name  types.QueueAttributeName
-			value *int32
-			sDef  string
-			def   int32
+			name  string
+			value any
 		}{
-			{types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},
-			{types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},
-			{types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},
-			{types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},
-			{types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},
-			{types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},
-			{types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)},          // 256ko
-			{types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks
-			{types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0},               // short polling
-			{types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0},                            // short polling
+			{"body", m.Body},
+			{"md5 of body", m.MD5OfBody},
+			{"md5 of message attributes", m.MD5OfMessageAttributes},
+			{"message ID", m.MessageId},
+			{"receipt handle", m.ReceiptHandle},
 		} {
-			s, ok := qao.Attributes[string(field.name)]
-			if !ok {
-				errCount++
-				s = field.sDef
+			if field.value == nil {
+				return nil, fmt.Errorf("missing field %s on message", field.name)
 			}
-			n, err := strconv.Atoi(s)
-			if err != nil {
-				errCount++
-			}
-			*field.value = int32(n)
 		}
-		qrp, err := r.parseRedrivePolicy(qName, *qao)
+		j, err := JSONableFromMessageAttributeValues(m.MessageAttributes)
 		if err != nil {
-			return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)
+			return nil, fmt.Errorf("failed decoding message boby: %w", err)
 		}
-		qrap, err := r.parseRedriveAllowPolicy(qName, *qao)
-		if err != nil {
-			return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)
-		}
-		qi = &QueueInfo{
-			Name: qName,
-			Url:  u,
-			Attributes: &QueueInfoAttributes{
-				ApproximateNumberOfMessages:           anom,
-				ApproximateNumberOfMessagesDelayed:    anomd,
-				ApproximateNumberOfMessagesNotVisible: anomnv,
-				CreatedTimestamp:                      created,
-				DelaySeconds:                          delay,
-				LastModifiedTimestamp:                 changed,
-				MaximumMessageSize:                    max,
-				MessageRetentionPeriod:                retention,
-				QueueARN:                              a.String(),
-				ReceiveMessageWaitTimeSeconds:         wait,
-				VisibilityTimeout:                     vto,
-				RedrivePolicy:                         qrp,
-				RedriveAllowPolicy:                    qrap,
-			},
+		m := Message{
+			Attributes:             ma,
+			Body:                   *m.Body,
+			Md5OfBody:              *m.MD5OfBody,
+			Md5OfMessageAttributes: *m.MD5OfMessageAttributes,
+			MessageAttributes:      j,
+			MessageId:              *m.MessageId,
+			ReceiptHandle:          *m.ReceiptHandle,
 		}
+		ms = append(ms, m)
 	}
-	return qi, err
+	return ms, err
 }
 
-func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
+func (r *redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
 	// TODO implement me
 	panic("implement me")
 }
 
-func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
+func (r *redriver) Purge(ctx context.Context, qName string) error {
 	// TODO implement me
 	panic("implement me")
 }
 
-func (r redriver) Purge(ctx context.Context, qName string) error {
-	// TODO implement me
-	panic("implement me")
-}
-
-func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
+func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
 	// TODO implement me
 	panic("implement me")
 }
@@ -224,7 +330,10 @@ func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []Item
 func RedriverService(dic *izidic.Container) (any, error) {
 	cli := dic.MustService(services.SvcClient).(*sqs.Client)
 	w := dic.MustParam(services.PWriter).(io.Writer)
+	wait := int32(dic.MustParam(services.PWait).(int))
 	return &redriver{
+		Wait: wait,
+
 		Writer: w,
 		Client: cli,
 	}, nil

File diff suppressed because it is too large
+ 20 - 18
back/services/redriver/types.go


+ 11 - 4
back/services/services.go

@@ -10,16 +10,21 @@ import (
 )
 
 const (
+	// Flags
 	PAddr    = "addr"
-	PArgs    = "args"
-	PHandler = "handler"
-	PName    = "name"
 	PProfile = "profile"
 	PQName   = "queue-name"
 	PRegion  = "region"
 	PURL     = "url"
+	PWait    = "wait"
+
+	// Non-flags
+	PArgs    = "args"
+	PHandler = "handler"
+	PName    = "name"
 	PWriter  = "writer"
 
+	// Services
 	SvcClient   = "sqs"
 	SvcConsumer = "consumeMessage"
 	SvcFlags    = "flags"
@@ -38,15 +43,17 @@ func FlagsService(dic *izidic.Container) (any, error) {
 	region := fs.String(PRegion, "eu-west-3", "The AWS region to connect to")
 	qName := fs.String(PQName, "dummy-queue", "The queue name")
 	sqsURL := fs.String(PURL, "http://localhost:4566", "The SQS endpoint URL")
+	wait := fs.Int(PWait, 3, "The maximum number of seconds to wait when receiving messages")
 	if err := fs.Parse(dic.MustParam(PArgs).([]string)); err != nil {
 		return nil, fmt.Errorf("cannot obtain CLI args")
 	}
 
 	dic.Store(PAddr, *addr)
 	dic.Store(PProfile, *profile)
+	dic.Store(PQName, *qName)
 	dic.Store(PRegion, *region)
 	dic.Store(PURL, *sqsURL)
-	dic.Store(PQName, *qName)
+	dic.Store(PWait, *wait)
 	return fs, nil
 }
 

+ 7 - 1
back/web/queue.go

@@ -19,6 +19,12 @@ func makeQueueHandler(rd redriver.Redriver) gin.HandlerFunc {
 			c.JSON(http.StatusInternalServerError, nil)
 			return
 		}
-		c.JSON(http.StatusOK, qi)
+		items, err := rd.GetQueueItems(ctx, qName)
+		if err != nil {
+			log.Printf("failed getting items for queue %q: %v", qName, err)
+			c.JSON(http.StatusInternalServerError, nil)
+			return
+		}
+		c.JSON(http.StatusOK, map[string]any{"info": qi, "items": items})
 	}
 }

+ 1 - 0
back/web/routes.go

@@ -15,6 +15,7 @@ var css []byte
 
 func SetupRoutes(rd redriver.Redriver) *gin.Engine {
 	r := gin.Default()
+	r.SetTrustedProxies(nil)
 	r.GET("/", makeHomeHandler(rd))
 	r.GET("/public/redriver.css", func(c *gin.Context) { c.Writer.Write(css) })
 	r.GET("/queue/:name", makeQueueHandler(rd))

Some files were not shown because too many files changed in this diff