Ver Fonte

Redriver complete.

Frederic G. MARAND há 2 anos atrás
pai
commit
0cf154d64f
5 ficheiros alterados com 165 adições e 46 exclusões
  1. 116 16
      back/services/redriver/redriver.go
  2. 8 2
      back/services/services.go
  3. 1 1
      back/web/confirm.go
  4. 38 27
      back/web/redrive.go
  5. 2 0
      back/web/routes.go

+ 116 - 16
back/services/redriver/redriver.go

@@ -3,6 +3,7 @@ package redriver
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"log"
@@ -19,11 +20,18 @@ import (
 )
 
 const (
+	// BatchMax defines the maximum number of messages usable in a batch operation, including redriving.
+	BatchMax = 10
+
 	// MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented
 	// types.Message attribute, used by the SQS console to support redrive.
 	MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn"
 )
 
+var (
+	// ErrBatchTooBig is the error returned when an operation is requested
+	// on more than BatchMax items.
+	ErrBatchTooBig = fmt.Errorf("operation requested on more than %d items", BatchMax)
+)
+
 type ItemsKeys struct {
 	MessageID     string
 	ReceiptHandle string
@@ -46,6 +54,7 @@ type Redriver interface {
 }
 
 type redriver struct {
+	VTO  int32
 	Wait int32
 
 	io.Writer
@@ -393,30 +402,69 @@ func (r *redriver) Purge(ctx context.Context, qName string) error {
 	return nil
 }
 
-// RedriveItems sends the selected message back to their respective source queue.
+// RedriveItems redrives the selected messages back to their respective source queues,
+// removing them from the DLQ once they have been sent.
 //
 // Since a queue can act as a DLQ for more than one source queue, the messages
 // sends are grouped by source queue.
-func (r *redriver) RedriveItems(ctx context.Context, qName string, messages []Message) error {
-	qURLs := make(map[string][]Message, 1) // In most cases, only a single queue will be used.
+func (r *redriver) RedriveItems(ctx context.Context, dlqName string, messages []Message) error {
+	sqURLs := make(map[string][]Message, 1) // In most cases, only a single queue will be used.
 	for _, message := range messages {
 		sARN := message.Attributes.DeadLetterQueueSourceARN
 		sURL, err := URLFromARNString(sARN)
 		if err != nil {
 			return fmt.Errorf("failed resolving source ARN %q to URL: %v", sARN, err)
 		}
-		qURLs[sURL] = append(qURLs[sURL], message)
+		sqURLs[sURL] = append(sqURLs[sURL], message)
 	}
 
-	for qURL, messages := range qURLs {
-		if err := r.redriveQueueMessages(ctx, qURL, messages); err != nil {
+	for qURL, messages := range sqURLs {
+		if err := r.redriveQueueMessages(ctx, dlqName, qURL, messages); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (r *redriver) redriveQueueMessages(ctx context.Context, qURL string, messages []Message) error {
+// redriveQueueMessages redrives a batch of messages from the DLQ named dlqName
+// back to the single source queue found at qURL.
+//
+// It proceeds in three steps:
+//  1. change the visibility timeout of the messages on the DLQ, to prevent
+//     other consumers from seeing them and generating duplicates;
+//  2. send the messages back to the source queue;
+//  3. delete the messages from the DLQ.
+//
+// It returns ErrBatchTooBig, before any SQS call, when more than BatchMax
+// messages are passed. A partial failure of the hiding step is only logged;
+// any other failure aborts the redrive and is returned.
+func (r *redriver) redriveQueueMessages(ctx context.Context, dlqName string, qURL string, messages []Message) error {
+	if len(messages) > BatchMax {
+		return ErrBatchTooBig
+	}
+	qui := &sqs.GetQueueUrlInput{QueueName: &dlqName}
+	quo, err := r.GetQueueUrl(ctx, qui)
+	if err != nil {
+		return fmt.Errorf("failed getting URL for queue %q: %w", dlqName, err)
+	}
+	// Handled separately from the error case: wrapping a nil error with %w
+	// garbles the message, and the URL is dereferenced below.
+	if quo == nil || quo.QueueUrl == nil {
+		return fmt.Errorf("failed getting URL for queue %q: no URL returned", dlqName)
+	}
+
+	// Hide messages to prevent other consumers from seeing them and generating duplicates.
+	// The DLQ name is passed so that hiding errors name the queue they happened on.
+	fatal, nonfatal := r.hideQueueMessages(ctx, dlqName, *quo.QueueUrl, messages)
+	if fatal != nil {
+		return fmt.Errorf("failed hiding messages during redrive towards queue %q: %w", qURL, fatal)
+	}
+	if nonfatal != nil {
+		log.Printf("Redrive nonfatal error hiding messages on queue %q: %v", dlqName, nonfatal)
+	}
+
+	// Send the messages back to their source queue.
+	if err := r.resendQueueMessages(ctx, qURL, messages); err != nil {
+		return fmt.Errorf("failed sending messages back to queue %q: %w", qURL, err)
+	}
+
+	// Delete them from the DLQ.
+	keys := make([]ItemsKeys, len(messages))
+	for i, m := range messages {
+		keys[i] = m.Keys()
+	}
+	if err := r.DeleteItems(ctx, dlqName, keys); err != nil {
+		return fmt.Errorf("failed deleting messages already redriven from DLQ %s to queue %q: beware of duplicates: %w",
+			dlqName, qURL, err)
+	}
+
+	return nil
+}
+
+func (r *redriver) resendQueueMessages(ctx context.Context, qURL string, messages []Message) error {
 	smbre := make([]types.SendMessageBatchRequestEntry, len(messages))
 	for i, m := range messages {
 		m.MessageAttributes["previous-message-id"] = m.MessageId
@@ -425,14 +473,11 @@ func (r *redriver) redriveQueueMessages(ctx context.Context, qURL string, messag
 			return fmt.Errorf("failed converting message attributes for message %s on queue %q: %v",
 				m.MessageId, qURL, err)
 		}
+
 		smbre[i] = types.SendMessageBatchRequestEntry{
-			Id:                      aws.String(strconv.Itoa(i)),
-			MessageBody:             &m.Body,
-			DelaySeconds:            0,
-			MessageAttributes:       mav,
-			MessageDeduplicationId:  nil,
-			MessageGroupId:          nil,
-			MessageSystemAttributes: nil,
+			Id:                aws.String(strconv.Itoa(i)),
+			MessageBody:       &m.Body,
+			MessageAttributes: mav,
 		}
 	}
 	smbi := sqs.SendMessageBatchInput{
@@ -444,16 +489,71 @@ func (r *redriver) redriveQueueMessages(ctx context.Context, qURL string, messag
 		return fmt.Errorf("failed sending messages to queue %q: %v",
 			qURL, err)
 	}
-	log.Printf("%#v", smbo)
-	return nil
+	if len(smbo.Failed) == 0 {
+		return nil
+	}
+
+	errs := make([]error, len(smbo.Failed))
+	for _, err := range smbo.Failed {
+		msg := fmt.Sprintf("ID: %s, Code: %s, Message: %s", *err.Id, *err.Code, *err.Message)
+		if err.SenderFault {
+			msg += " (sender fault)"
+		}
+		errs = append(errs, errors.New(msg))
+	}
+
+	return fmt.Errorf("partial redrive: failed re-sending %d/%d messages, %v",
+		len(smbo.Failed), len(smbi.Entries), errs)
+}
+
+// hideQueueMessages changes the visibility timeout of the messages to r.VTO
+// on the queue found at qURL, using a single ChangeMessageVisibilityBatch call.
+//
+// dlqName is only used to build error messages.
+//
+// Results:
+//   - fatal is set when the request itself failed, or when every message failed;
+//   - nonfatal is set when only some of the messages failed;
+//   - both are nil when no failure was reported.
+func (r *redriver) hideQueueMessages(ctx context.Context, dlqName string, qURL string, messages []Message) (fatal, nonfatal error) {
+	cmvbre := make([]types.ChangeMessageVisibilityBatchRequestEntry, len(messages))
+	for i, m := range messages {
+		cmvbre[i] = types.ChangeMessageVisibilityBatchRequestEntry{
+			// The entry ID is the index of the message in the batch.
+			Id:                aws.String(strconv.Itoa(i)),
+			ReceiptHandle:     aws.String(m.ReceiptHandle),
+			VisibilityTimeout: r.VTO,
+		}
+	}
+	cmvbi := sqs.ChangeMessageVisibilityBatchInput{
+		Entries:  cmvbre,
+		QueueUrl: aws.String(qURL),
+	}
+	cmvbo, err := r.ChangeMessageVisibilityBatch(ctx, &cmvbi)
+	if err != nil {
+		return fmt.Errorf("failed hiding request on DLQ %q: %w", dlqName, err), nil
+	}
+	switch len(cmvbo.Failed) {
+	case 0:
+		// Checked first, so that an empty batch without failures is not
+		// reported as a total failure by the next case.
+		return nil, nil // All well
+	case len(cmvbi.Entries):
+		// No message made it: abort.
+		return fmt.Errorf("failed hiding all %d messages on DLQ %q", len(cmvbi.Entries), dlqName), nil
+	default:
+		// Start from an empty slice: appending to a slice created with a
+		// non-zero length would leave that many nil errors ahead of the real ones.
+		errs := make([]error, 0, len(cmvbo.Failed))
+		for _, failed := range cmvbo.Failed {
+			// aws.ToString tolerates nil pointers in the failure entry.
+			msg := fmt.Sprintf("ID: %s, Code: %s, Message: %s",
+				aws.ToString(failed.Id), aws.ToString(failed.Code), aws.ToString(failed.Message))
+			if failed.SenderFault {
+				msg += " (sender fault)"
+			}
+			errs = append(errs, errors.New(msg))
+		}
+
+		// Some message made it, crossing fingers.
+		return nil, fmt.Errorf("failed hiding %d/%d messages, %v",
+			len(cmvbo.Failed), len(cmvbi.Entries), errs)
+	}
 }
 
 func RedriverService(dic *izidic.Container) (any, error) {
 	cli := dic.MustService(services.SvcClient).(*sqs.Client)
 	w := dic.MustParam(services.PWriter).(io.Writer)
+	vto := dic.MustParam(services.PVTO).(time.Duration)
+
 	wait := int32(dic.MustParam(services.PWait).(int))
 	return &redriver{
 		Client: cli,
+		VTO:    int32(vto.Seconds()),
 		Wait:   wait,
 		Writer: w,
 	}, nil

+ 8 - 2
back/services/services.go

@@ -20,6 +20,7 @@ const (
 	PStoreSecret = "store-secret"
 	PTTL         = "ttl"
 	PURL         = "url"
+	PVTO         = "vto"
 	PWait        = "wait"
 
 	// Non-flags
@@ -30,12 +31,12 @@ const (
 
 	// Services
 	SvcClient       = "sqs"
-	SvcConsumer     = "consume_message"
+	SvcConsumer     = "consume-message"
 	SvcFlags        = "flags"
 	SvcHttp         = "http"
 	SvcLister       = "lister"
 	SvcLogger       = "logger"
-	SvcMessageStore = "message_store"
+	SvcMessageStore = "message-store"
 	SvcProducer     = "producer"
 	SvcReceiver     = "receiver"
 	SvcRedriver     = "redriver"
@@ -52,11 +53,15 @@ func FlagsService(dic *izidic.Container) (any, error) {
 	storeSecret := fs.String(PStoreSecret, "storeSecret", "The session store secret")
 	sqsURL := fs.String(PURL, "http://localhost:4566", "The SQS endpoint URL")
 	ttl := fs.Duration(PTTL, 10*time.Minute, "The message store TTL")
+	vto := fs.Duration(PVTO, 10*time.Minute, "The redrive visibility timeout")
 	wait := fs.Int(PWait, 3, "The maximum number of seconds to wait when receiving messages")
 	if err := fs.Parse(dic.MustParam(PArgs).([]string)); err != nil {
 		return nil, fmt.Errorf("cannot obtain CLI args")
 	}
 
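+	// Durations are signed, and Duration.Round rounds to the nearest multiple, with halfway values
+	// rounded away from zero: taking the absolute value and adding 500ms before rounding gives a
+	// positive duration rounded up to whole seconds.
+	// NOTE(review): a value which is already a whole number of seconds gains one extra second.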
+	*vto = (vto.Abs() + 500*time.Millisecond).Round(time.Second)
+
 	dic.Store(PAddr, *addr)
 	dic.Store(PProfile, *profile)
 	dic.Store(PQName, *qName)
@@ -65,6 +70,7 @@ func FlagsService(dic *izidic.Container) (any, error) {
 	dic.Store(PStoreSecret, []byte(*storeSecret))
 	dic.Store(PTTL, *ttl)
 	dic.Store(PURL, *sqsURL)
+	dic.Store(PVTO, *vto)
 	dic.Store(PWait, *wait)
 	return fs, nil
 }

+ 1 - 1
back/web/confirm.go

@@ -52,7 +52,7 @@ var confirms = map[QueueOp]struct {
 	},
 	OpPurge: {
 		confirm:     "Purge",
-		description: "All the messages in the deadletter queue will be lost after that step",
+		description: "All the messages in the letter queue will be lost after that step",
 		question:    "Do you confirm this purge request?",
 		Level:       LevelDanger,
 	},

+ 38 - 27
back/web/redrive.go

@@ -13,7 +13,8 @@ import (
 
 func makeRedriveHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.HandlerFunc {
 	return func(c *gin.Context) {
-		ctx := c.Request.Context()
+		req := c.Request
+		ctx := req.Context()
 		qName := c.Param("name")
 		redirect := "/queue/" + qName
 
@@ -24,36 +25,16 @@ func makeRedriveHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.Hand
 			c.Redirect(http.StatusSeeOther, redirect)
 		}()
 
-		req := c.Request
-		if err := req.ParseForm(); err != nil {
-			log.Printf("Failed to parse deletion confirm form for queue %s: %v",
-				qName, err)
-			sess.AddFlash(fmt.Sprintf("Failed to parsed deletion confirmm form for queue %s",
-				qName))
+		var (
+			messages []redriver.Message
+			done     bool
+		)
+		if messages, done = messagesFromRequest(req, qName, sess, ms); done {
 			return
 		}
-		ids := parseIDs(req.Form, validateUint)
 
-		var messages []redriver.Message
-		if len(ids) == 0 {
-			flash := fmt.Sprintf("Got no message to delete from queue %q", qName)
-			log.Print(flash)
-			sess.AddFlash(flash)
-			return
-		}
-		messages = parseMessages(req.Form, ids)
-		for i, message := range messages {
-			m, ok := ms.Get(message.ReceiptHandle)
-			if !ok {
-				sess.AddFlash(fmt.Sprintf("Failed retrieving message with ID %s and receipt handle %s from queue %q. Aborting redrive.",
-					message.MessageId, message.ReceiptHandle, qName))
-				return
-			}
-			messages[i] = m
-		}
 		var flash string
-		err := rd.RedriveItems(ctx, qName, messages)
-		if err != nil {
+		if err := rd.RedriveItems(ctx, qName, messages); err != nil {
 			flash = fmt.Sprintf("failed redriving selected messages on queue %q: %v",
 				qName, err)
 		} else {
@@ -65,3 +46,33 @@ func makeRedriveHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.Hand
 		sess.AddFlash(flash)
 	}
 }
+
+// messagesFromRequest builds the list of messages selected in the form
+// submitted with req for queue qName, resolving each of them from the message
+// store ms by its receipt handle.
+//
+// On success, it returns the messages and false. On any failure (unparseable
+// form, no selected ID, message missing from the store), it adds a flash
+// message to sess and returns nil and true, meaning the caller should stop
+// handling the request.
+func messagesFromRequest(req *http.Request, qName string, sess sessions.Session, ms redriver.MessageStore) ([]redriver.Message, bool) {
+	if err := req.ParseForm(); err != nil {
+		log.Printf("Failed to parse redrive confirm form for queue %s: %v",
+			qName, err)
+		sess.AddFlash(fmt.Sprintf("Failed to parse redrive confirm form for queue %s",
+			qName))
+		return nil, true
+	}
+
+	ids := parseIDs(req.Form, validateUint)
+	if len(ids) == 0 {
+		flash := fmt.Sprintf("Got no message to redrive from queue %q", qName)
+		log.Print(flash)
+		sess.AddFlash(flash)
+		return nil, true
+	}
+
+	// The form only carries message keys: the complete messages come from the store.
+	messages := parseMessages(req.Form, ids)
+	for i, message := range messages {
+		m, ok := ms.Get(message.ReceiptHandle)
+		if !ok {
+			flash := fmt.Sprintf("Failed retrieving message with ID %s and receipt handle %s from queue %q. Aborting redrive.",
+				message.MessageId, message.ReceiptHandle, qName)
+			log.Print(flash)
+			sess.AddFlash(flash)
+			return nil, true
+		}
+		messages[i] = m
+	}
+	return messages, false
+}

+ 2 - 0
back/web/routes.go

@@ -58,9 +58,11 @@ func SetupRoutes(rd redriver.Redriver, ms redriver.MessageStore, renderer *templ
+// HttpService builds the HTTP service from the container: it reads the CSRF
+// and session store secrets from the container parameters, fetches the
+// redriver, renderer and message store services, and returns the result of
+// SetupRoutes applied to them, with a nil error.
+//
+// NOTE(review): the Must* accessors presumably panic on a missing parameter
+// or service, and the type assertions panic on a type mismatch - confirm.
 func HttpService(dic *izidic.Container) (any, error) {
 	csrfSecret := dic.MustParam(services.PCSRFSecret).([]byte)
 	storeSecret := dic.MustParam(services.PStoreSecret).([]byte)
+
 	rd := dic.MustService(services.SvcRedriver).(redriver.Redriver)
 	re := dic.MustService(services.SvcRenderer).(*template.Template)
 	ms := dic.MustService(services.SvcMessageStore).(redriver.MessageStore)
+
 	return SetupRoutes(rd, ms, re, storeSecret, csrfSecret), nil
 }