Browse Source

Working ListQueues, GetDLQ

Frédéric G. MARAND 1 year ago
parent
commit
3127ace690

+ 15 - 0
.run/Redriver.run.xml

@@ -0,0 +1,15 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="Redriver" type="GoApplicationRunConfiguration" factoryName="Go Application">
+    <module name="sqs_demo" />
+    <working_directory value="$PROJECT_DIR$/back" />
+    <parameters value="-profile=sqs-tutorial -url=https://sqs.eu-west-3.amazonaws.com" />
+    <envs>
+      <env name="AWS_PROFILE" value="sqs-tutorial" />
+    </envs>
+    <kind value="PACKAGE" />
+    <package value="code.osinet.fr/fgm/sqs_demo/cmd/redriver" />
+    <directory value="$PROJECT_DIR$" />
+    <filePath value="$PROJECT_DIR$/demo.go" />
+    <method v="2" />
+  </configuration>
+</component>

+ 3 - 0
Makefile

@@ -0,0 +1,3 @@
+lint:
+	cd back; go vet ./...
+	cd back; staticcheck ./...

File diff suppressed because it is too large
+ 98 - 108
api/dlq.yml


+ 26 - 6
back/cmd/redriver/main.go

@@ -2,13 +2,16 @@ package main
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"log"
 	"os"
 
 	"github.com/fgm/izidic"
+	"gopkg.in/yaml.v2"
 
 	"code.osinet.fr/fgm/sqs_demo/services"
+	"code.osinet.fr/fgm/sqs_demo/services/redriver"
 )
 
 func main() {
@@ -18,12 +21,28 @@ func main() {
 func main2(w io.Writer, name string, args []string) (exitCode byte) {
 	ctx := context.Background()
 	dic := Resolve(w, name, args)
-	lister := dic.MustService(services.SvcLister).(func(ctx context.Context) string)
-	qURL := lister(ctx)
-
-	producer := dic.MustService(services.SvcProducer).(func(ctx context.Context, qName string))
-	producer(ctx, qURL)
-
+	r := dic.MustService(services.SvcRedriver).(redriver.Redriver)
+	qURLs, err := r.ListQueues(ctx, "test")
+	if err != nil {
+		log.Printf("failed listing queues: %v", err)
+		return 1
+	}
+	qMap := map[string]string{}
+	for _, qURL := range qURLs {
+		name, err := redriver.NameFromURL(qURL)
+		if err != nil {
+			log.Println(err)
+			return 2
+		}
+		dlqURL, err := r.GetDLQ(ctx, name)
+		if err != nil {
+			log.Println(err)
+			return 3
+		}
+		qMap[qURL] = dlqURL
+	}
+	bs, _ := yaml.Marshal(qMap)
+	fmt.Fprintf(w, "%s\n", bs)
 	log.Printf("exiting cleanly")
 	return 0
 }
@@ -39,6 +58,7 @@ func Resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Register(services.SvcLister, services.ListerService)
 	dic.Register(services.SvcLogger, services.LoggerService)
 	dic.Register(services.SvcProducer, services.ProducerService)
+	dic.Register(services.SvcRedriver, redriver.RedriverService)
 
 	dic.MustService(services.SvcFlags) // Store generated params before freeze.
 	dic.Freeze()

+ 1 - 0
back/go.mod

@@ -9,6 +9,7 @@ require (
 	github.com/davecgh/go-spew v1.1.0
 	github.com/fgm/izidic v0.0.2
 	github.com/google/uuid v1.3.0
+	github.com/gorilla/mux v1.8.0
 	gopkg.in/yaml.v2 v2.4.0
 )
 

+ 2 - 0
back/go.sum

@@ -32,6 +32,8 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
 github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

+ 1 - 1
back/services/consumer.go

@@ -135,7 +135,7 @@ func receiveMessage(ctx context.Context, w io.Writer, client *sqs.Client, qURL s
 	}
 	msg, err := client.ReceiveMessage(ctx, &rmi)
 	if err != nil {
-		log.Fatalf("failed receiving from queue %s: %v", err)
+		log.Fatalf("failed receiving from queue %s: %v", qURL, err)
 	}
 	spew.Fdump(w, msg.Messages)
 }

+ 55 - 0
back/services/redriver/converters.go

@@ -0,0 +1,55 @@
+package redriver
+
+import (
+	"fmt"
+	"net/url"
+	"strings"
+
+	"github.com/aws/aws-sdk-go-v2/aws/arn"
+)
+
+func NameFromURL(qURL string) (name string, err error) {
+	u, err := url.Parse(qURL)
+	if err != nil {
+		return "", fmt.Errorf("queue URL %q is not a valid URL: %w", qURL, err)
+	}
+	path := u.EscapedPath()
+	parts := strings.Split(strings.Trim(path, "/"), "/")
+	if len(parts) != 2 {
+		return "", fmt.Errorf("queue path %q does not have exactly 2 parts", path)
+	}
+	return parts[1], nil
+}
+
+func NameFromARN(qARN arn.ARN) string {
+	return qARN.Resource
+}
+
+func NameFromARNString(qARN string) (name string, err error) {
+	a, err := arn.Parse(qARN)
+	if err != nil {
+		return "", fmt.Errorf("queue \"ARN\" %q is not an valid ARN", qARN)
+	}
+	return NameFromARN(a), nil
+}
+
+func URLFromARNString(qARN string) (name string, err error) {
+	a, err := arn.Parse(qARN)
+	if err != nil {
+		return "", fmt.Errorf("queue \"ARN\" %q is not an valid ARN", qARN)
+	}
+	return URLFromARN(a)
+}
+
+func URLFromARN(qARN arn.ARN) (name string, err error) {
+	path, err := url.JoinPath("/", qARN.AccountID, qARN.Resource)
+	if err != nil {
+		return "", fmt.Errorf("incorrect queue ARN: %w", err)
+	}
+	u := url.URL{
+		Scheme: "https",
+		Host:   fmt.Sprintf("%s.%s.%s", qARN.Service, qARN.Region, "amazonws.com"),
+		Path:   path,
+	}
+	return u.String(), nil
+}

+ 114 - 0
back/services/redriver/redriver.go

@@ -0,0 +1,114 @@
+package redriver
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+	"github.com/fgm/izidic"
+
+	"code.osinet.fr/fgm/sqs_demo/services"
+)
+
+type ItemsKeys struct {
+	MessageID     string
+	ReceiptHandle string
+}
+
+// Redriver is a redrive-oriented facade in front of the sqs.Client API.
+type Redriver interface {
+	ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)
+	GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)
+	GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)
+	GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error)
+	DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error
+	Purge(ctx context.Context, qName string) error
+	RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error
+}
+
+type redriver struct {
+	io.Writer
+	*sqs.Client
+}
+
+func (r redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {
+	lqi := &sqs.ListQueuesInput{
+		MaxResults:      aws.Int32(1000),
+		NextToken:       nil,
+		QueueNamePrefix: aws.String(prefix),
+	}
+	lqo, err := r.Client.ListQueues(ctx, lqi)
+	if err != nil {
+		return nil, fmt.Errorf("listing queues: %w", err)
+	}
+	return lqo.QueueUrls, nil
+}
+
+func (r redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {
+	qui := &sqs.GetQueueUrlInput{QueueName: &qName}
+	qu, err := r.GetQueueUrl(ctx, qui)
+	if err != nil {
+		return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)
+	}
+	qai := &sqs.GetQueueAttributesInput{
+		QueueUrl:       qu.QueueUrl,
+		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},
+	}
+	qao, err := r.Client.GetQueueAttributes(ctx, qai)
+	if err != nil {
+		return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)
+	}
+	rp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]
+	if !ok {
+		return "", nil
+	}
+	var qrp QueueInfoAttributesRedrivePolicy
+	if err = json.Unmarshal([]byte(rp), &qrp); err != nil {
+		return "", fmt.Errorf("failed parsing redrive policy for queue %q %w",
+			qName, err)
+	}
+	qURL, err := URLFromARNString(qrp.DeadLetterTargetARN)
+	if err != nil {
+		return "", fmt.Errorf(
+			"failed converting queue %q ARN to URL: %w", qName, err)
+	}
+	return qURL, nil
+}
+
+func (r redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (r redriver) GetQueueItems(ctx context.Context, qName string, max int) ([]sqs.ReceiveMessageOutput, error) {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (r redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (r redriver) Purge(ctx context.Context, qName string) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (r redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func RedriverService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService(services.SvcClient).(*sqs.Client)
+	w := dic.MustParam(services.PWriter).(io.Writer)
+	return &redriver{
+		Writer: w,
+		Client: cli,
+	}, nil
+}

+ 15 - 28
back/services/redriver.go → back/services/redriver/redriver_future.go

@@ -1,24 +1,13 @@
-package services
+//go:build futuresqs
 
-import (
-	"context"
-	"io"
-
-	"github.com/aws/aws-sdk-go-v2/service/sqs"
-	"github.com/fgm/izidic"
-)
-
-type CreateMoveTaskOutput struct {
-	Status                           *string // Running
-	SourceARN                        *string
-	ApproximateNumberOfMessagesMoved int
-}
-
-// Redriver defines the behavior of DLQ redriver services.
+// The redriver_future.go file contains types and code related to the yet-unpublished
+// DLQ redrive APIs.
 //
-// See https://github.com/aws/aws-sdk-go-v2/issues/1991
-// about the commented-out methods.
-type Redriver interface {
+// See https://github.com/aws/aws-sdk-go-v2/issues/1991 for details.
+package redriver
+
+// Redriver defines the behavior of DLQ redriverV2 services.
+type RedriverV2 interface {
 	// CreateMoveTask()
 
 	// ListMoveTasks() []any
@@ -26,10 +15,10 @@ type Redriver interface {
 	ListDLQs() map[string]string
 }
 
-type redriver struct {
+type redriverV2 struct {
 }
 
-func (r redriver) CreateMoveTask() {
+func (r redriverV2) CreateMoveTask() {
 	// TODO implement me
 	// u := url.URL{
 	// 	Scheme:      "https",
@@ -51,15 +40,13 @@ func (r redriver) CreateMoveTask() {
 	*/
 }
 
-func (r redriver) ListMoveTasks() []any {
+func (r redriverV2) ListMoveTasks() []any {
 	// TODO implement me
 	panic("implement me")
 }
 
-func RedriverService(dic *izidic.Container) (any, error) {
-	cli := dic.MustService(SvcClient).(*sqs.Client)
-	w := dic.MustParam(PWriter).(io.Writer)
-	return func(ctx context.Context, qName string) {
-		senderHandler(ctx, w, cli, qName)
-	}, nil
+type CreateMoveTaskOutput struct {
+	Status                           *string // Running
+	SourceARN                        *string
+	ApproximateNumberOfMessagesMoved int
 }

File diff suppressed because it is too large
+ 46 - 0
back/services/redriver/types.go


Some files were not shown because too many files changed in this diff