فهرست منبع

Redrive now producing to source queue.

Frederic G. MARAND 2 سال پیش
والد
کامیت
49702feddf

+ 13 - 0
.run/Message Store.run.xml

@@ -0,0 +1,13 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="Message Store" type="GoTestRunConfiguration" factoryName="Go Test">
+    <module name="sqs_demo" />
+    <working_directory value="$PROJECT_DIR$/back/services/redriver" />
+    <root_directory value="$PROJECT_DIR$" />
+    <kind value="PACKAGE" />
+    <package value="code.osinet.fr/fgm/sqs_demo/back/services/redriver" />
+    <directory value="$PROJECT_DIR$" />
+    <filePath value="$PROJECT_DIR$" />
+    <framework value="gotest" />
+    <method v="2" />
+  </configuration>
+</component>

+ 0 - 2
back/cmd/producer/main.go

@@ -9,7 +9,6 @@ import (
 	"github.com/fgm/izidic"
 
 	"code.osinet.fr/fgm/sqs_demo/back/services"
-	"code.osinet.fr/fgm/sqs_demo/back/services/redriver"
 )
 
 func main() {
@@ -39,7 +38,6 @@ func Resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Register(services.SvcFlags, services.FlagsService)
 	dic.Register(services.SvcLister, services.ListerService)
 	dic.Register(services.SvcLogger, services.LoggerService)
-	dic.Register(services.SvcMessageStore, redriver.MessageStoreService)
 	dic.Register(services.SvcProducer, services.ProducerService)
 
 	dic.MustService(services.SvcFlags) // Store generated params before freeze.

+ 21 - 0
back/services/redriver/converters.go

@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/aws/arn"
 	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 )
@@ -84,6 +85,26 @@ func URLFromARN(qARN arn.ARN) (name string, err error) {
 	return u.String(), nil
 }
 
+// MessageAttributeValuesFromJSONable attempts to convert a plain JSON to Message.MessageAttributes field value.
+//
+// FIXME we should not be storing those maps in JSONable form anyway, because that loses information.
+func MessageAttributeValuesFromJSONable(in map[string]any) (map[string]types.MessageAttributeValue, error) {
+	m := make(map[string]types.MessageAttributeValue, len(in))
+	for k, v := range in {
+		mav := types.MessageAttributeValue{StringValue: aws.String(fmt.Sprint(v))}
+		switch v.(type) {
+		case int:
+			mav.DataType = aws.String("Number")
+		case string:
+			mav.DataType = aws.String("String")
+		default:
+			return nil, fmt.Errorf("message attributes contain a non-convertible value: %T", v)
+		}
+		m[k] = mav
+	}
+	return m, nil
+}
+
 func JSONableFromMessageAttributeValues(m map[string]types.MessageAttributeValue) (map[string]any, error) {
 	j := make(map[string]any)
 	for name, attr := range m {

+ 4 - 1
back/services/redriver/message_store.go

@@ -7,6 +7,8 @@ import (
 	"time"
 
 	"github.com/fgm/izidic"
+
+	"code.osinet.fr/fgm/sqs_demo/back/services"
 )
 
 // Ensure messageStore implements interface MessageStore.
@@ -116,5 +118,6 @@ func newMessageStore(ttl time.Duration) *messageStore {
 }
 
 func MessageStoreService(dic *izidic.Container) (any, error) {
-	return newMessageStore(10 * time.Second), nil
+	ttl := dic.MustParam(services.PTTL).(time.Duration)
+	return newMessageStore(ttl), nil
 }

+ 54 - 4
back/services/redriver/redriver.go

@@ -42,7 +42,7 @@ type Redriver interface {
 	GetQueueItems(ctx context.Context, qName string) ([]Message, error)
 	DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
 	Purge(ctx context.Context, qName string) error
-	RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
+	RedriveItems(ctx context.Context, qName string, messages []Message) error
 }
 
 type redriver struct {
@@ -393,9 +393,59 @@ func (r *redriver) Purge(ctx context.Context, qName string) error {
 	return nil
 }
 
-func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
-	// TODO implement me
-	panic("implement me")
+// RedriveItems sends the selected message back to their respective source queue.
+//
+// Since a queue can act as a DLQ for more than one source queue, the messages
+// sends are grouped by source queue.
+func (r *redriver) RedriveItems(ctx context.Context, qName string, messages []Message) error {
+	qURLs := make(map[string][]Message, 1) // In most cases, only a single queue will be used.
+	for _, message := range messages {
+		sARN := message.Attributes.DeadLetterQueueSourceARN
+		sURL, err := URLFromARNString(sARN)
+		if err != nil {
+			return fmt.Errorf("failed resolving source ARN %q to URL: %v", sARN, err)
+		}
+		qURLs[sURL] = append(qURLs[sURL], message)
+	}
+
+	for qURL, messages := range qURLs {
+		if err := r.redriveQueueMessages(ctx, qURL, messages); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *redriver) redriveQueueMessages(ctx context.Context, qURL string, messages []Message) error {
+	smbre := make([]types.SendMessageBatchRequestEntry, len(messages))
+	for i, m := range messages {
+		m.MessageAttributes["previous-message-id"] = m.MessageId
+		mav, err := MessageAttributeValuesFromJSONable(m.MessageAttributes)
+		if err != nil {
+			return fmt.Errorf("failed converting message attributes for message %s on queue %q: %v",
+				m.MessageId, qURL, err)
+		}
+		smbre[i] = types.SendMessageBatchRequestEntry{
+			Id:                      aws.String(strconv.Itoa(i)),
+			MessageBody:             &m.Body,
+			DelaySeconds:            0,
+			MessageAttributes:       mav,
+			MessageDeduplicationId:  nil,
+			MessageGroupId:          nil,
+			MessageSystemAttributes: nil,
+		}
+	}
+	smbi := sqs.SendMessageBatchInput{
+		Entries:  smbre,
+		QueueUrl: &qURL,
+	}
+	smbo, err := r.SendMessageBatch(ctx, &smbi)
+	if err != nil {
+		return fmt.Errorf("failed sending messages to queue %q: %v",
+			qURL, err)
+	}
+	log.Printf("%#v", smbo)
+	return nil
 }
 
 func RedriverService(dic *izidic.Container) (any, error) {

+ 7 - 0
back/services/redriver/types.go

@@ -19,6 +19,13 @@ type Message struct {
 	ReceiptHandle          string                   `json:"receipt_handle,omitempty"`            // ReceiptHandle is an identifier associated with the act of receiving the message. A new receipt handle is returned every time you receive a message. When deleting a message, you provide the last received receipt handle to delete the message.
 }
 
+func (m Message) Keys() ItemsKeys {
+	return ItemsKeys{
+		MessageID:     m.MessageId,
+		ReceiptHandle: m.ReceiptHandle,
+	}
+}
+
 type NameRedriveBody struct {
 	Id            string `json:"id,omitempty"`
 	ReceiptHandle string `json:"receipt_handle,omitempty"`

+ 4 - 0
back/services/services.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"time"
 
 	"github.com/fgm/izidic"
 )
@@ -17,6 +18,7 @@ const (
 	PRegion      = "region"
 	PCSRFSecret  = "csrf-secret"
 	PStoreSecret = "store-secret"
+	PTTL         = "ttl"
 	PURL         = "url"
 	PWait        = "wait"
 
@@ -49,6 +51,7 @@ func FlagsService(dic *izidic.Container) (any, error) {
 	csrfSecret := fs.String(PCSRFSecret, "csrfSecret", "The CSRF secret")
 	storeSecret := fs.String(PStoreSecret, "storeSecret", "The session store secret")
 	sqsURL := fs.String(PURL, "http://localhost:4566", "The SQS endpoint URL")
+	ttl := fs.Duration(PTTL, 10*time.Minute, "The message store TTL")
 	wait := fs.Int(PWait, 3, "The maximum number of seconds to wait when receiving messages")
 	if err := fs.Parse(dic.MustParam(PArgs).([]string)); err != nil {
 		return nil, fmt.Errorf("cannot obtain CLI args")
@@ -60,6 +63,7 @@ func FlagsService(dic *izidic.Container) (any, error) {
 	dic.Store(PRegion, *region)
 	dic.Store(PCSRFSecret, []byte(*csrfSecret))
 	dic.Store(PStoreSecret, []byte(*storeSecret))
+	dic.Store(PTTL, *ttl)
 	dic.Store(PURL, *sqsURL)
 	dic.Store(PWait, *wait)
 	return fs, nil

+ 1 - 4
back/web/delete.go

@@ -46,10 +46,7 @@ func makeDeleteHandler(rd redriver.Redriver) gin.HandlerFunc {
 
 		keys := make([]redriver.ItemsKeys, len(messages))
 		for i, msg := range messages {
-			keys[i] = redriver.ItemsKeys{
-				MessageID:     msg.MessageId,
-				ReceiptHandle: msg.ReceiptHandle,
-			}
+			keys[i] = msg.Keys()
 		}
 		t0 := time.Now()
 		err := rd.DeleteItems(ctx, qName, keys)

+ 58 - 2
back/web/redrive.go

@@ -1,11 +1,67 @@
 package web
 
 import (
+	"fmt"
+	"log"
+	"net/http"
+
+	"github.com/gin-contrib/sessions"
 	"github.com/gin-gonic/gin"
 
 	"code.osinet.fr/fgm/sqs_demo/back/services/redriver"
 )
 
-func makeRedriveHandler(rd redriver.Redriver) gin.HandlerFunc {
-	return nil
+func makeRedriveHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		ctx := c.Request.Context()
+		qName := c.Param("name")
+		redirect := "/queue/" + qName
+
+		sess := sessions.Default(c)
+		// Do not consume sess.Flashes(): this is a redirect-only handler, and they would be lost.
+		defer func() {
+			_ = sess.Save()
+			c.Redirect(http.StatusSeeOther, redirect)
+		}()
+
+		req := c.Request
+		if err := req.ParseForm(); err != nil {
+			log.Printf("Failed to parse deletion confirm form for queue %s: %v",
+				qName, err)
+			sess.AddFlash(fmt.Sprintf("Failed to parsed deletion confirmm form for queue %s",
+				qName))
+			return
+		}
+		ids := parseIDs(req.Form, validateUint)
+
+		var messages []redriver.Message
+		if len(ids) == 0 {
+			flash := fmt.Sprintf("Got no message to delete from queue %q", qName)
+			log.Print(flash)
+			sess.AddFlash(flash)
+			return
+		}
+		messages = parseMessages(req.Form, ids)
+		for i, message := range messages {
+			m, ok := ms.Get(message.ReceiptHandle)
+			if !ok {
+				sess.AddFlash(fmt.Sprintf("Failed retrieving message with ID %s and receipt handle %s from queue %q. Aborting redrive.",
+					message.MessageId, message.ReceiptHandle, qName))
+				return
+			}
+			messages[i] = m
+		}
+		var flash string
+		err := rd.RedriveItems(ctx, qName, messages)
+		if err != nil {
+			flash = fmt.Sprintf("failed redriving selected messages on queue %q: %v",
+				qName, err)
+		} else {
+			flash = fmt.Sprintf("%d messages redriven to queue %q."+
+				`They now carry new MessageIDs and their past MessageID will be visible in the "previous-message-id" attribute`,
+				len(messages), qName)
+		}
+		log.Print(flash)
+		sess.AddFlash(flash)
+	}
 }

+ 3 - 3
back/web/routes.go

@@ -49,9 +49,9 @@ func SetupRoutes(rd redriver.Redriver, ms redriver.MessageStore, renderer *templ
 	r.GET("/", makeHomeHandler(rd))
 
 	// TODO
-	r.POST("/queue/:name/delete", mw, makeDeleteHandler(rd))   // Needs mw to check token.
-	r.POST("/queue/:name/purge", mw, makePurgeHandler(rd))     // Needs mw to check token.
-	r.POST("/queue/:name/redrive", mw, makeRedriveHandler(rd)) // Needs mw to check token.
+	r.POST("/queue/:name/delete", mw, makeDeleteHandler(rd))       // Needs mw to check token.
+	r.POST("/queue/:name/purge", mw, makePurgeHandler(rd))         // Needs mw to check token.
+	r.POST("/queue/:name/redrive", mw, makeRedriveHandler(rd, ms)) // Needs mw to check token.
 	return r
 }