Sfoglia il codice sorgente

Split consumer and producer to prepare for redriver.

Frederic G. MARAND 1 anno fa
parent
commit
1a42a33d99

+ 2 - 2
.run/Demo.run.xml → .run/Consumer.run.xml

@@ -1,5 +1,5 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="Demo" type="GoApplicationRunConfiguration" factoryName="Go Application">
+  <configuration default="false" name="Consumer" type="GoApplicationRunConfiguration" factoryName="Go Application">
     <module name="sqs_demo" />
     <working_directory value="$PROJECT_DIR$" />
     <parameters value="-profile=sqs-tutorial -url=https://sqs.eu-west-3.amazonaws.com" />
@@ -7,7 +7,7 @@
       <env name="AWS_PROFILE" value="sqs-tutorial" />
     </envs>
     <kind value="PACKAGE" />
-    <package value="code.osinet.fr/fgm/sqs_demo" />
+    <package value="code.osinet.fr/fgm/sqs_demo/cmd/consumer/" />
     <directory value="$PROJECT_DIR$" />
     <filePath value="$PROJECT_DIR$/demo.go" />
     <method v="2" />

+ 0 - 30
back/aws.go

@@ -1,30 +0,0 @@
-package main
-
-import (
-	"fmt"
-
-	"github.com/aws/aws-sdk-go-v2/aws"
-)
-
-type endpointResolver struct {
-	region string
-}
-
-func (e endpointResolver) ResolveEndpoint(service, region string, options ...interface{}) (aws.Endpoint, error) {
-	if service != `SQS` {
-		return aws.Endpoint{}, fmt.Errorf("trying to resolve non-SQS service: %s", service)
-	}
-	ep := aws.Endpoint{
-		URL:               "http://localhost:4566/",
-		HostnameImmutable: false,
-		PartitionID:       "000000000000",
-		SigningName:       "",
-		SigningRegion:     e.region,
-		SigningMethod:     "",
-		Source:            0,
-	}
-	if region != "" {
-		ep.URL = fmt.Sprintf("https://sqs.%s.amazonaws.com/", region)
-	}
-	return ep, nil
-}

+ 0 - 0
back/handler.go → back/cmd/consumer/handler.go


+ 56 - 0
back/cmd/consumer/main.go

@@ -0,0 +1,56 @@
+package main
+
+import (
+	"context"
+	"io"
+	"log"
+	"os"
+
+	"github.com/fgm/izidic"
+
+	"code.osinet.fr/fgm/sqs_demo/di"
+)
+
+func main() {
+	os.Exit(int(main2(os.Stdout, os.Args[0], os.Args[1:])))
+}
+
+func main2(w io.Writer, name string, args []string) (exitCode byte) {
+	ctx := context.Background()
+	dic := Resolve(w, name, args)
+	lister := dic.MustService(di.Lister).(func(ctx context.Context) string)
+	qURL := lister(ctx)
+
+	if false {
+		receiver := dic.MustService(di.Receiver).(func(ctx context.Context, qURL string))
+		receiver(ctx, qURL)
+	}
+	consumer := dic.MustService(di.Consumer).(func(ctx context.Context, qURL string) error)
+	err := consumer(ctx, qURL)
+	if err != nil {
+		log.Printf("error in consumer: %v, aborting", err)
+		exitCode = 1
+	}
+
+	log.Printf("exiting cleanly")
+	return 0
+}
+
+func Resolve(w io.Writer, name string, args []string) *izidic.Container {
+	dic := izidic.New()
+	dic.Store("name", name)
+	dic.Store("args", args)
+	dic.Store("writer", w)
+	dic.Store("handler", di.Handler(HandleDummy))
+
+	dic.Register(di.Client, di.SQSClientService)
+	dic.Register(di.Consumer, di.ConsumerService)
+	dic.Register(di.Flags, di.FlagsService)
+	dic.Register(di.Lister, di.ListerService)
+	dic.Register(di.Logger, di.LoggerService)
+	dic.Register(di.Receiver, di.ReceiverService)
+
+	dic.MustService(di.Flags) // Store generated params before freeze.
+	dic.Freeze()
+	return dic
+}

+ 46 - 0
back/cmd/producer/main.go

@@ -0,0 +1,46 @@
+package main
+
+import (
+	"context"
+	"io"
+	"log"
+	"os"
+
+	"github.com/fgm/izidic"
+
+	"code.osinet.fr/fgm/sqs_demo/di"
+)
+
+func main() {
+	os.Exit(int(main2(os.Stdout, os.Args[0], os.Args[1:])))
+}
+
+func main2(w io.Writer, name string, args []string) (exitCode byte) {
+	ctx := context.Background()
+	dic := Resolve(w, name, args)
+	lister := dic.MustService(di.Lister).(func(ctx context.Context) string)
+	qURL := lister(ctx)
+
+	producer := dic.MustService(di.Producer).(func(ctx context.Context, qName string))
+	producer(ctx, qURL)
+
+	log.Printf("exiting cleanly")
+	return 0
+}
+
+func Resolve(w io.Writer, name string, args []string) *izidic.Container {
+	dic := izidic.New()
+	dic.Store("name", name)
+	dic.Store("args", args)
+	dic.Store("writer", w)
+
+	dic.Register(di.Client, di.SQSClientService)
+	dic.Register(di.Flags, di.FlagsService)
+	dic.Register(di.Lister, di.ListerService)
+	dic.Register(di.Logger, di.LoggerService)
+	dic.Register(di.Producer, di.ProducerService)
+
+	dic.MustService(di.Flags) // Store generated params before freeze.
+	dic.Freeze()
+	return dic
+}

+ 0 - 138
back/demo.go

@@ -1,138 +0,0 @@
-package main
-
-import (
-	"context"
-	"encoding/json"
-	"flag"
-	"fmt"
-	"io"
-	"log"
-	"os"
-
-	"github.com/aws/aws-sdk-go-v2/config"
-	"github.com/aws/aws-sdk-go-v2/service/sqs"
-	"github.com/fgm/izidic"
-)
-
-func main() {
-	os.Exit(int(main2(os.Stdout, os.Args[0], os.Args[1:])))
-}
-
-func main2(w io.Writer, name string, args []string) byte {
-	ctx := context.Background()
-	dic := resolve(w, name, args)
-	lister := dic.MustService("lister").(func(ctx context.Context) string)
-	qURL := lister(ctx)
-
-	if false {
-		receiver := dic.MustService("receiver").(func(ctx context.Context, qURL string))
-		receiver(ctx, qURL)
-	}
-	producer := dic.MustService("producer").(func(ctx context.Context, qName string))
-	go producer(ctx, qURL)
-
-	consumer := dic.MustService("consumer").(func(ctx context.Context, qURL string) error)
-	err := consumer(ctx, qURL)
-	if err != nil {
-		log.Printf("error in consumer: %v, aborting", err)
-		return 1
-	}
-	log.Printf("exiting cleanly")
-	return 0
-}
-
-func resolve(w io.Writer, name string, args []string) *izidic.Container {
-	dic := izidic.New()
-	dic.Store("name", name)
-	dic.Store("args", args)
-	dic.Store("writer", w)
-	dic.Store("handler", Handler(HandleDummy))
-	dic.Register("flags", flagsService)
-	dic.Register("logger", loggerService)
-	dic.Register("sqs", sqsClientService)
-	dic.Register("lister", listerService)
-	dic.Register("receiver", receiverService)
-	dic.Register("producer", producerService)
-	dic.Register("consumer", consumerService)
-
-	dic.MustService("flags") // Store generated params before freeze.
-	dic.Freeze()
-	return dic
-}
-
-func flagsService(dic *izidic.Container) (any, error) {
-	fs := flag.NewFlagSet(dic.MustParam("name").(string), flag.ContinueOnError)
-	profile := fs.String("profile", "test-profile", "The AWS profile")
-	region := fs.String("region", "eu-west-3", "The AWS region to connect to")
-	qName := fs.String("queue-name", "dummy-queue", "The queue name")
-	sqsURL := fs.String("url", "http://localhost:4566", "The SQS endpoint URL")
-	if err := fs.Parse(dic.MustParam("args").([]string)); err != nil {
-		return nil, fmt.Errorf("cannot obtain CLI args")
-	}
-
-	dic.Store("profile", *profile)
-	dic.Store("region", *region)
-	dic.Store("url", *sqsURL)
-	dic.Store("queue-name", *qName)
-	return fs, nil
-}
-
-// loggerService is an izidic.Service also containing a one-time initialization action.
-func loggerService(dic *izidic.Container) (any, error) {
-	w := dic.MustParam("writer").(io.Writer)
-	log.SetOutput(w) // Support dependency code not taking an injected logger.
-	logger := log.New(w, "", log.LstdFlags)
-	return logger, nil
-}
-
-func sqsClientService(dic *izidic.Container) (any, error) {
-	ctx := context.Background()
-	profile := dic.MustParam("profile").(string)
-	region := dic.MustParam("region").(string)
-	epr := endpointResolver{region: region}
-	cfg, err := config.LoadDefaultConfig(ctx,
-		config.WithRegion(region),
-		config.WithSharedConfigProfile(profile),
-		config.WithEndpointResolverWithOptions(epr),
-	)
-	if err != nil {
-		return nil, fmt.Errorf("failed loading default AWS config: %w", err)
-	}
-
-	client := sqs.NewFromConfig(cfg)
-	return client, nil
-}
-
-func listerService(dic *izidic.Container) (any, error) {
-	cli := dic.MustService("sqs").(*sqs.Client)
-	w := dic.MustParam("writer").(io.Writer)
-	return func(ctx context.Context) string {
-		return lister(ctx, w, cli)
-	}, nil
-}
-
-func receiverService(dic *izidic.Container) (any, error) {
-	cli := dic.MustService("sqs").(*sqs.Client)
-	w := dic.MustParam("writer").(io.Writer)
-	return func(ctx context.Context, qURL string) {
-		receiver(ctx, w, cli, qURL)
-	}, nil
-}
-
-func producerService(dic *izidic.Container) (any, error) {
-	cli := dic.MustService("sqs").(*sqs.Client)
-	w := dic.MustParam("writer").(io.Writer)
-	return func(ctx context.Context, qName string) {
-		sender(ctx, w, cli, qName)
-	}, nil
-}
-
-func consumerService(dic *izidic.Container) (any, error) {
-	cli := dic.MustService("sqs").(*sqs.Client)
-	w := dic.MustParam("writer").(io.Writer)
-	hdl := dic.MustParam("handler").(Handler)
-	enc := json.NewEncoder(w)
-	return func(ctx context.Context, qURL string) error {
-		return consumer(ctx, w, enc, cli, qURL, hdl)
-	}, nil
-}

+ 52 - 0
back/di/client.go

@@ -0,0 +1,52 @@
+package di
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/fgm/izidic"
+)
+
+func SQSClientService(dic *izidic.Container) (any, error) {
+	ctx := context.Background()
+	profile := dic.MustParam("profile").(string)
+	region := dic.MustParam("region").(string)
+	epr := endpointResolver{region: region}
+	cfg, err := config.LoadDefaultConfig(ctx,
+		config.WithRegion(region),
+		config.WithSharedConfigProfile(profile),
+		config.WithEndpointResolverWithOptions(epr),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed loading default AWS config: %w", err)
+	}
+
+	client := sqs.NewFromConfig(cfg)
+	return client, nil
+}
+
+type endpointResolver struct {
+	region string
+}
+
+func (e endpointResolver) ResolveEndpoint(service, region string, options ...interface{}) (aws.Endpoint, error) {
+	if service != `SQS` {
+		return aws.Endpoint{}, fmt.Errorf("trying to Resolve non-SQS service: %s", service)
+	}
+	ep := aws.Endpoint{
+		URL:               "http://localhost:4566/",
+		HostnameImmutable: false,
+		PartitionID:       "000000000000",
+		SigningName:       "",
+		SigningRegion:     e.region,
+		SigningMethod:     "",
+		Source:            0,
+	}
+	if region != "" {
+		ep.URL = fmt.Sprintf("https://sqs.%s.amazonaws.com/", region)
+	}
+	return ep, nil
+}

+ 70 - 51
back/consumer.go → back/di/consumer.go

@@ -1,4 +1,4 @@
-package main
+package di
 
 import (
 	"context"
@@ -11,41 +11,39 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
 	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 	"github.com/davecgh/go-spew/spew"
+	"github.com/fgm/izidic"
 	"github.com/google/uuid"
-	"gopkg.in/yaml.v2"
 )
 
-func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
-	lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{
-		MaxResults:      aws.Int32(10),
-		NextToken:       nil,
-		QueueNamePrefix: aws.String(""),
-	})
-	if err != nil {
-		log.Fatalf("failed listing queues: %v", err)
-	}
-	y := yaml.NewEncoder(w)
-	y.Encode(lqo.QueueUrls)
-	return lqo.QueueUrls[0]
+type Event struct {
+	MessageID uuid.UUID `json:"MessageId"`
+	BodySum   string    `json:"MD5OfBody"`
+	AttrSum   string    `json:"MD5MofMessageAttributes"`
+
+	EventAttributes
+	MessageAttributes map[string]types.MessageAttributeValue
+	Body              []byte
 }
 
-func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
-	rmi := sqs.ReceiveMessageInput{
-		QueueUrl:              &qURL,
-		AttributeNames:        []types.QueueAttributeName{"All"},
-		MessageAttributeNames: []string{"All"},
-		VisibilityTimeout:     1,
-		WaitTimeSeconds:       5,
+func (e Event) IsRetryable() bool {
+	maybeRetry, ok := e.MessageAttributes["retry"]
+	if !ok {
+		return false
 	}
-	msg, err := client.ReceiveMessage(ctx, &rmi)
-	if err != nil {
-		log.Fatalf("failed receiving from queue %s: %v", err)
+	if maybeRetry.DataType == nil || *maybeRetry.DataType != "String" || maybeRetry.StringValue == nil {
+		return false
 	}
-	spew.Fdump(w, msg.Messages)
+	return *maybeRetry.StringValue == "1"
+}
+
+type EventAttributes struct {
+	SenderID                         string    `json:"SenderId"`
+	SentTime                         time.Time `json:"SentTimestamp"`
+	ApproximateReceiveCount          int       `json:"ApproximateReceiveCount"`
+	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
 }
 
 type Handler func(ctx context.Context, enc *json.Encoder, msgID uuid.UUID, sent time.Time, input []byte, meta map[string]types.MessageAttributeValue) error
@@ -59,7 +57,25 @@ func (m message) String() string {
 	return *m.MessageId
 }
 
-func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
+func ConsumerService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	hdl := dic.MustParam("handler").(Handler)
+	enc := json.NewEncoder(w)
+	return func(ctx context.Context, qURL string) error {
+		return consumeMessage(ctx, w, enc, cli, qURL, hdl)
+	}, nil
+}
+
+func ReceiverService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	return func(ctx context.Context, qURL string) {
+		receiveMessage(ctx, w, cli, qURL)
+	}, nil
+}
+
+func consumeMessage(ctx context.Context, _ io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
 	rmi := sqs.ReceiveMessageInput{
 		QueueUrl:              &qURL,
 		AttributeNames:        []types.QueueAttributeName{"All"},
@@ -84,7 +100,7 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 		msg := message(recv.Messages[0])
 		evt, err := validateMessage(msg)
 		if err != nil {
-			log.Printf("invalid message %s: %w, dropping it anyway", msg, err)
+			log.Printf("invalid message %s: %v, dropping it anyway", msg, err)
 		} else {
 			if err := hdl(ctx, enc, evt.MessageID, evt.SentTime, evt.Body, evt.MessageAttributes); err != nil {
 				log.Printf("message %s failed processing : %v, dropping it anyway\n", msg, err)
@@ -92,33 +108,36 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 				log.Printf("message %s processed successfully\n", msg)
 			}
 		}
-		dmi := sqs.DeleteMessageInput{
-			QueueUrl:      &qURL,
-			ReceiptHandle: msg.ReceiptHandle,
-		}
-		_, err = client.DeleteMessage(ctx, &dmi)
-		if err != nil {
-			log.Printf("Error deleting message %s after successful processing: %v\n", msg, err)
-			continue
+		if evt.IsRetryable() {
+			log.Printf("message %s not deleted, for retry", msg)
+		} else {
+			dmi := sqs.DeleteMessageInput{
+				QueueUrl:      &qURL,
+				ReceiptHandle: msg.ReceiptHandle,
+			}
+			_, err = client.DeleteMessage(ctx, &dmi)
+			if err != nil {
+				log.Printf("Error deleting message %s after successful processing: %v\n", msg, err)
+				continue
+			}
+			log.Printf("message %s deleted after processing\n", msg)
 		}
-		log.Printf("message %s deleted after processing\n", msg)
 	}
 }
 
-type EventAttributes struct {
-	SenderID                         string    `json:"SenderId"`
-	SentTime                         time.Time `json:"SentTimestamp"`
-	ApproximateReceiveCount          int       `json:"ApproximateReceiveCount"`
-	ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
-}
-type Event struct {
-	MessageID uuid.UUID `json:"MessageId"`
-	BodySum   string    `json:"MD5OfBody"`
-	AttrSum   string    `json:"MD5MofMessageAttributes"`
-
-	EventAttributes
-	MessageAttributes map[string]types.MessageAttributeValue
-	Body              []byte
+func receiveMessage(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
+	rmi := sqs.ReceiveMessageInput{
+		QueueUrl:              &qURL,
+		AttributeNames:        []types.QueueAttributeName{"All"},
+		MessageAttributeNames: []string{"All"},
+		VisibilityTimeout:     1,
+		WaitTimeSeconds:       5,
+	}
+	msg, err := client.ReceiveMessage(ctx, &rmi)
+	if err != nil {
+		log.Fatalf("failed receiving from queue %s: %v", err)
+	}
+	spew.Fdump(w, msg.Messages)
 }
 
 func validateMessage(msg message) (*Event, error) {

+ 45 - 0
back/di/di.go

@@ -0,0 +1,45 @@
+package di
+
+import (
+	"flag"
+	"fmt"
+	"io"
+	"log"
+
+	"github.com/fgm/izidic"
+)
+
+const (
+	Client   = "sqs"
+	Consumer = "consumeMessage"
+	Flags    = "flags"
+	Lister   = "lister"
+	Logger   = "logger"
+	Producer = "producer"
+	Receiver = "receiver"
+)
+
+func FlagsService(dic *izidic.Container) (any, error) {
+	fs := flag.NewFlagSet(dic.MustParam("name").(string), flag.ContinueOnError)
+	profile := fs.String("profile", "test-profile", "The AWS profile")
+	region := fs.String("region", "eu-west-3", "The AWS region to connect to")
+	qName := fs.String("queue-name", "dummy-queue", "The queue name")
+	sqsURL := fs.String("url", "http://localhost:4566", "The SQS endpoint URL")
+	if err := fs.Parse(dic.MustParam("args").([]string)); err != nil {
+		return nil, fmt.Errorf("cannot obtain CLI args")
+	}
+
+	dic.Store("profile", *profile)
+	dic.Store("region", *region)
+	dic.Store("url", *sqsURL)
+	dic.Store("queue-name", *qName)
+	return fs, nil
+}
+
+// LoggerService is an izidic.Service also containing a one-time initialization action.
+func LoggerService(dic *izidic.Container) (any, error) {
+	w := dic.MustParam("writer").(io.Writer)
+	log.SetOutput(w) // Support dependency code not taking an injected logger.
+	logger := log.New(w, "", log.LstdFlags)
+	return logger, nil
+}

+ 34 - 0
back/di/lister.go

@@ -0,0 +1,34 @@
+package di
+
+import (
+	"context"
+	"io"
+	"log"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/fgm/izidic"
+	"gopkg.in/yaml.v2"
+)
+
+func ListerService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	return func(ctx context.Context) string {
+		return lister(ctx, w, cli)
+	}, nil
+}
+
+func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
+	lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{
+		MaxResults:      aws.Int32(10),
+		NextToken:       nil,
+		QueueNamePrefix: aws.String(""),
+	})
+	if err != nil {
+		log.Fatalf("failed listing queues: %v", err)
+	}
+	y := yaml.NewEncoder(w)
+	y.Encode(lqo.QueueUrls)
+	return lqo.QueueUrls[0]
+}

+ 21 - 8
back/producer.go → back/di/producer.go

@@ -1,4 +1,4 @@
-package main
+package di
 
 import (
 	"context"
@@ -11,8 +11,24 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
 	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+	"github.com/fgm/izidic"
 )
 
+func ProducerService(dic *izidic.Container) (any, error) {
+	cli := dic.MustService("sqs").(*sqs.Client)
+	w := dic.MustParam("writer").(io.Writer)
+	return func(ctx context.Context, qName string) {
+		senderHandler(ctx, w, cli, qName)
+	}, nil
+}
+
+func senderHandler(ctx context.Context, _ io.Writer, client *sqs.Client, qURL string) {
+	pinger := time.NewTicker(2 * time.Second)
+	for range pinger.C {
+		sendOne(ctx, client, qURL)
+	}
+}
+
 func sendOne(ctx context.Context, client *sqs.Client, qURL string) {
 	data := map[string]int{"a": rand.Int()}
 	data["b"] = data["a"] + 1
@@ -28,6 +44,10 @@ func sendOne(ctx context.Context, client *sqs.Client, qURL string) {
 			DataType:    aws.String("Number"),
 			StringValue: aws.String("42"),
 		},
+		"retry": {
+			DataType:    aws.String("String"),
+			StringValue: aws.String("1"),
+		},
 	}
 
 	smr := sqs.SendMessageInput{
@@ -49,10 +69,3 @@ func sendOne(ctx context.Context, client *sqs.Client, qURL string) {
 	}
 
 }
-
-func sender(ctx context.Context, _ io.Writer, client *sqs.Client, qURL string) {
-	pinger := time.NewTicker(2 * time.Second)
-	for range pinger.C {
-		sendOne(ctx, client, qURL)
-	}
-}