Browse Source

Templates queue links. Implements message cache store.

Frederic G. MARAND 1 year ago
parent
commit
f2c90678ae

+ 0 - 3
.run/Consumer.run.xml

@@ -3,9 +3,6 @@
     <module name="sqs_demo" />
     <working_directory value="$PROJECT_DIR$" />
     <parameters value="-profile=sqs-tutorial -url=https://sqs.eu-west-3.amazonaws.com" />
-    <envs>
-      <env name="AWS_PROFILE" value="sqs-tutorial" />
-    </envs>
     <kind value="PACKAGE" />
     <package value="code.osinet.fr/fgm/sqs_demo/back/cmd/consumer/" />
     <directory value="$PROJECT_DIR$" />

+ 0 - 3
.run/Producer.run.xml

@@ -3,9 +3,6 @@
     <module name="sqs_demo" />
     <working_directory value="$PROJECT_DIR$" />
     <parameters value="-profile=sqs-tutorial" />
-    <envs>
-      <env name="AWS_PROFILE" value="sqs-tutorial" />
-    </envs>
     <kind value="PACKAGE" />
     <package value="code.osinet.fr/fgm/sqs_demo/back/cmd/producer/" />
     <directory value="$PROJECT_DIR$" />

+ 2 - 0
back/cmd/producer/main.go

@@ -9,6 +9,7 @@ import (
 	"github.com/fgm/izidic"
 
 	"code.osinet.fr/fgm/sqs_demo/back/services"
+	"code.osinet.fr/fgm/sqs_demo/back/services/redriver"
 )
 
 func main() {
@@ -38,6 +39,7 @@ func Resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Register(services.SvcFlags, services.FlagsService)
 	dic.Register(services.SvcLister, services.ListerService)
 	dic.Register(services.SvcLogger, services.LoggerService)
+	dic.Register(services.SvcMessageStore, redriver.MessageStoreService)
 	dic.Register(services.SvcProducer, services.ProducerService)
 
 	dic.MustService(services.SvcFlags) // Store generated params before freeze.

+ 1 - 0
back/cmd/redriver/main.go

@@ -37,6 +37,7 @@ func Resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Register(services.SvcFlags, services.FlagsService)
 	dic.Register(services.SvcLister, services.ListerService)
 	dic.Register(services.SvcLogger, services.LoggerService)
+	dic.Register(services.SvcMessageStore, redriver.MessageStoreService)
 	dic.Register(services.SvcProducer, services.ProducerService)
 	dic.Register(services.SvcRedriver, redriver.RedriverService)
 	dic.Register(services.SvcHttp, web.HttpService)

+ 120 - 0
back/services/redriver/message_store.go

@@ -0,0 +1,120 @@
+package redriver
+
+import (
+	"errors"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/fgm/izidic"
+)
+
+// Ensure messageStore implements interface MessageStore.
+var _ MessageStore = &messageStore{}
+
+// MessageStore is a Message local cache with TTL.
+type MessageStore interface {
+	Set(m Message) error
+	Get(receiptHandle string) (Message, bool)
+	Clear(receiptHandle string)
+	Flush()
+	Close()
+}
+
+type messageStoreEntry struct {
+	value  Message
+	stored time.Time
+}
+
+type messageStore struct {
+	ttl time.Duration
+	sync.Mutex
+	sync.Map
+	done chan struct{}
+}
+
+func (ms *messageStore) Close() {
+	ms.done <- struct{}{}
+}
+
+func (ms *messageStore) Set(m Message) error {
+	if m.ReceiptHandle == "" {
+		return errors.New("cannot store a message with a nil ReceiptHandle")
+	}
+	mse := messageStoreEntry{
+		value:  m,
+		stored: time.Now(),
+	}
+	ms.Store(m.ReceiptHandle, mse)
+	return nil
+}
+
+func (ms *messageStore) Get(receiptHandle string) (Message, bool) {
+	v, ok := ms.Load(receiptHandle)
+	if !ok {
+		return Message{}, false
+	}
+	m, ok := v.(messageStoreEntry)
+	if !ok {
+		return Message{}, false
+	}
+	return m.value, true
+}
+
+func (ms *messageStore) Clear(receiptHandle string) {
+	ms.Map.Delete(receiptHandle)
+}
+
+func (ms *messageStore) Flush() {
+	ms.Lock()
+	ms.Map = sync.Map{}
+	ms.Unlock()
+}
+
+func (ms *messageStore) expire() {
+	start := time.Now()
+	ms.Range(func(k, v any) bool {
+		log.Printf("expiring")
+		rh, ok := k.(string)
+		if !ok {
+			log.Printf("map contains a non-string key: %T", k)
+		}
+		mse, ok := v.(messageStoreEntry)
+		if !ok {
+			log.Printf("map contains a non-Messsage entry: %T", v)
+			return false
+		}
+		if start.Sub(mse.stored) > ms.ttl {
+			log.Printf("clearing message %s", rh)
+			ms.Clear(rh)
+		}
+		return true
+	})
+}
+
+func (ms *messageStore) scheduleExpiries() {
+	t := time.NewTicker(ms.ttl / 2)
+	live := true
+	for live {
+		select {
+		case <-ms.done:
+			live = false
+		case <-t.C:
+			ms.expire()
+		}
+	}
+}
+
+func newMessageStore(ttl time.Duration) *messageStore {
+	ms := messageStore{
+		ttl:  ttl,
+		Map:  sync.Map{},
+		done: make(chan struct{}),
+	}
+	go ms.scheduleExpiries()
+	return &ms
+}
+
+func MessageStoreService(dic *izidic.Container) (any, error) {
+	return newMessageStore(10 * time.Second), nil
+}

+ 111 - 0
back/services/redriver/message_store_test.go

@@ -0,0 +1,111 @@
+package redriver
+
+import (
+	"log"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestMessageStore_TTL(t *testing.T) {
+	const (
+		rh  = "foo"
+		ttl = 1 * time.Millisecond
+	)
+	m1 := Message{ReceiptHandle: rh}
+	actual := newMessageStore(ttl)
+
+	_, ok := actual.Get(rh)
+	if ok {
+		t.Errorf("message found, but not yet stored")
+	}
+
+	actual.Set(m1)
+	m2, ok := actual.Get(rh)
+	if !ok {
+		t.Errorf("message not found, but expected to find it")
+	}
+	if m2.ReceiptHandle != rh {
+		t.Errorf("message RH is %q but key is %q", m2.ReceiptHandle, rh)
+	}
+	time.Sleep(3 * ttl)
+	_, ok = actual.Get(rh)
+	if ok {
+		t.Errorf("message found, but expected it to be expired")
+	}
+}
+
+func TestMessageStore_Flush(t *testing.T) {
+	len := func(ms *messageStore) int {
+		count := 0
+		ms.Range(func(_, _ any) bool {
+			count++
+			return true
+		})
+		return count
+	}
+	const limit = 10
+	ms := newMessageStore(1 * time.Minute)
+	defer ms.Close()
+	for i := 0; i < limit; i++ {
+		ms.Set(Message{ReceiptHandle: strconv.Itoa(i)})
+	}
+	if actualLen := len(ms); actualLen != limit {
+		t.Fatalf("expected %d items, got %d", len(ms), actualLen)
+	}
+	ms.Flush()
+	if actualLen := len(ms); actualLen != 0 {
+		t.Fatalf("expected no items, got %d", actualLen)
+	}
+
+}
+
+func TestMessageStore_Set(t *testing.T) {
+	for _, test := range [...]struct {
+		name      string
+		input     string
+		expectErr bool
+	}{
+		{"valid RH", "foo", false},
+		{"empty RH", "", true},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			ms := newMessageStore(time.Minute)
+			defer ms.Close()
+			actual := ms.Set(Message{ReceiptHandle: test.input})
+			if actual != nil != test.expectErr {
+				t.Errorf("expected error %t, but got error %t", test.expectErr, actual)
+			}
+		})
+	}
+}
+
+func TestMessageStore_Get_sad(t *testing.T) {
+	ms := newMessageStore(time.Minute)
+	defer ms.Close()
+	ms.Map.Store("foo", "sad")
+	_, ok := ms.Get("foo")
+	if ok {
+		t.Fatalf("expected failure, but got success")
+	}
+}
+
+func TestMessageStore_expire(t *testing.T) {
+	const ttl = 1 * time.Millisecond
+	ms := newMessageStore(ttl)
+	w := &strings.Builder{}
+	log.SetOutput(w)
+	ms.Map.Store(13, Message{ReceiptHandle: "13"})
+	ms.Map.Store("13", "vendredi")
+	time.Sleep(3 * ttl)
+	ms.Close()
+	s := w.String()
+	if !strings.Contains(s, "map contains a non-string key") {
+		t.Errorf("expected expiry to have logged a key warning, but found none")
+	}
+	if !strings.Contains(s, "map contains a non-Messsage entry") {
+		t.Errorf("expected expiry to have logged a data warning, but found none")
+	}
+
+}

+ 24 - 12
back/services/redriver/redriver.go

@@ -8,6 +8,7 @@ import (
 	"log"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
@@ -285,7 +286,13 @@ func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message,
 		VisibilityTimeout:     0,
 		WaitTimeSeconds:       r.Wait,
 	}
+	t0 := time.Now()
 	rmo, err := r.Client.ReceiveMessage(ctx, &rmi)
+	d := time.Since(t0)
+	if err != nil {
+		return nil, fmt.Errorf("after %v, failed receiving messages for queue %q: %w",
+			d, qName, err)
+	}
 	ms := make([]Message, 0, len(rmo.Messages))
 	for _, m := range rmo.Messages {
 		ma, err := r.parseReceivedAttributes(m)
@@ -310,16 +317,22 @@ func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message,
 		if err != nil {
 			return nil, fmt.Errorf("failed decoding message boby: %w", err)
 		}
-		m := Message{
-			Attributes:             ma,
-			Body:                   *m.Body,
-			Md5OfBody:              *m.MD5OfBody,
-			Md5OfMessageAttributes: *m.MD5OfMessageAttributes,
-			MessageAttributes:      j,
-			MessageId:              *m.MessageId,
-			ReceiptHandle:          *m.ReceiptHandle,
+		m2 := Message{
+			Attributes:        ma,
+			MessageAttributes: j,
+		}
+		for _, pair := range []struct{ dest, src *string }{
+			{&m2.Body, m.Body},
+			{&m2.Md5OfBody, m.MD5OfBody},
+			{&m2.Md5OfMessageAttributes, m.MD5OfMessageAttributes},
+			{&m2.MessageId, m.MessageId},
+			{&m2.ReceiptHandle, m.ReceiptHandle},
+		} {
+			if pair.src != nil {
+				*pair.dest = *pair.src
+			}
 		}
-		ms = append(ms, m)
+		ms = append(ms, m2)
 	}
 	return ms, err
 }
@@ -390,9 +403,8 @@ func RedriverService(dic *izidic.Container) (any, error) {
 	w := dic.MustParam(services.PWriter).(io.Writer)
 	wait := int32(dic.MustParam(services.PWait).(int))
 	return &redriver{
-		Wait: wait,
-
-		Writer: w,
 		Client: cli,
+		Wait:   wait,
+		Writer: w,
 	}, nil
 }

+ 11 - 10
back/services/services.go

@@ -27,16 +27,17 @@ const (
 	PWriter  = "writer"
 
 	// Services
-	SvcClient   = "sqs"
-	SvcConsumer = "consumeMessage"
-	SvcFlags    = "flags"
-	SvcHttp     = "http"
-	SvcLister   = "lister"
-	SvcLogger   = "logger"
-	SvcProducer = "producer"
-	SvcReceiver = "receiver"
-	SvcRedriver = "redriver"
-	SvcRenderer = "renderer"
+	SvcClient       = "sqs"
+	SvcConsumer     = "consume_message"
+	SvcFlags        = "flags"
+	SvcHttp         = "http"
+	SvcLister       = "lister"
+	SvcLogger       = "logger"
+	SvcMessageStore = "message_store"
+	SvcProducer     = "producer"
+	SvcReceiver     = "receiver"
+	SvcRedriver     = "redriver"
+	SvcRenderer     = "renderer"
 )
 
 func FlagsService(dic *izidic.Container) (any, error) {

+ 1 - 1
back/web/confirm.go

@@ -41,7 +41,7 @@ var confirms = map[QueueOp]struct {
 	Level                 // danger|warning
 }{
 	OpDelete: {
-		confirm:     "Delete",
+		confirm:     "Clear",
 		description: "These messages cannot be recovered after that step",
 		question:    "Do you confirm this deletion request?",
 		Level:       LevelDanger,

+ 4 - 1
back/web/queue.go

@@ -11,7 +11,7 @@ import (
 	"code.osinet.fr/fgm/sqs_demo/back/services/redriver"
 )
 
-func makeQueueHandler(rd redriver.Redriver) gin.HandlerFunc {
+func makeQueueHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.HandlerFunc {
 	return func(c *gin.Context) {
 		ctx := c.Request.Context()
 		qName := c.Param("name")
@@ -34,6 +34,9 @@ func makeQueueHandler(rd redriver.Redriver) gin.HandlerFunc {
 			c.HTML(http.StatusInternalServerError, "500", nil)
 			return
 		}
+		for _, item := range items {
+			ms.Set(item)
+		}
 		itemsLatency := time.Since(t0) - infoLatency
 		c.HTML(http.StatusOK, "queue-get", gin.H{
 			"flashes": flashes,

+ 5 - 3
back/web/routes.go

@@ -21,7 +21,7 @@ import (
 	"code.osinet.fr/fgm/sqs_demo/front"
 )
 
-func SetupRoutes(rd redriver.Redriver, renderer *template.Template, storeSecret, csrfSecret []byte) *gin.Engine {
+func SetupRoutes(rd redriver.Redriver, ms redriver.MessageStore, renderer *template.Template, storeSecret, csrfSecret []byte) *gin.Engine {
 	const assetsPrefix = "/assets/"
 	r := gin.Default()
 	r.SetHTMLTemplate(renderer)
@@ -43,7 +43,7 @@ func SetupRoutes(rd redriver.Redriver, renderer *template.Template, storeSecret,
 	r.GET("/queue/:name/confirm", mw, makeConfirmHandler()) // Needs mw to set token.
 
 	// Back done, front WIP
-	r.GET("/queue/:name", makeQueueHandler(rd))
+	r.GET("/queue/:name", makeQueueHandler(rd, ms))
 
 	// JSON done
 	r.GET("/", makeHomeHandler(rd))
@@ -60,7 +60,8 @@ func HttpService(dic *izidic.Container) (any, error) {
 	storeSecret := dic.MustParam(services.PStoreSecret).([]byte)
 	rd := dic.MustService(services.SvcRedriver).(redriver.Redriver)
 	re := dic.MustService(services.SvcRenderer).(*template.Template)
-	return SetupRoutes(rd, re, storeSecret, csrfSecret), nil
+	ms := dic.MustService(services.SvcMessageStore).(redriver.MessageStore)
+	return SetupRoutes(rd, ms, re, storeSecret, csrfSecret), nil
 }
 
 func RendererService(_ *izidic.Container) (any, error) {
@@ -72,6 +73,7 @@ func RendererService(_ *izidic.Container) (any, error) {
 		"timestamp": func(ts int64) time.Time {
 			return time.Unix(ts, 0)
 		},
+		"nameFromARN": redriver.NameFromARNString,
 	}).Funcs(sprig.FuncMap())
 	for _, tpl := range []struct {
 		name  string

+ 2 - 2
front/templates/queue.gohtml

@@ -63,7 +63,7 @@
                 {{ if $attr.RedrivePolicy }}
                     <tr>
                         <th scope="row">DeadLetter queue</th>
-                        <td>ARN: {{ $attr.RedrivePolicy.DeadLetterTargetARN }}</td>
+                        <td><a href="/queue/{{ nameFromARN $attr.RedrivePolicy.DeadLetterTargetARN }}">{{ $attr.RedrivePolicy.DeadLetterTargetARN }}</a></td>
                         <td>Max. receive count: {{ $attr.RedrivePolicy.MaxReceiveCount }}</td>
                     </tr>
                 {{ end }}
@@ -74,7 +74,7 @@
                         <td>Queues:
                             <ul>
                                 {{ range $attr.RedriveAllowPolicy.SourceQueueARNs }}
-                                    <li>{{ . }}</li>
+                                    <li><a href="/queue/{{- nameFromARN . -}}">{{ . }}</a></li>
                                 {{ end }}
                             </ul>
                         </td>