Jelajahi Sumber

Implement a working periodic producer with metadata.

Frederic G. MARAND 1 tahun lalu
induk
melakukan
20ffeacefa
4 mengubah file dengan 81 tambahan dan 14 penghapusan
  1. 15 0
      .run/Demo.run.xml
  2. 8 8
      back/consumer.go
  3. 11 5
      back/demo.go
  4. 47 1
      back/producer.go

+ 15 - 0
.run/Demo.run.xml

@@ -0,0 +1,15 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="Demo" type="GoApplicationRunConfiguration" factoryName="Go Application">
+    <module name="sqs_demo" />
+    <working_directory value="$PROJECT_DIR$" />
+    <parameters value="-profile=sqs-tutorial -url=https://sqs.eu-west-3.amazonaws.com" />
+    <envs>
+      <env name="AWS_PROFILE" value="sqs-tutorial" />
+    </envs>
+    <kind value="PACKAGE" />
+    <package value="code.osinet.fr/fgm/sqs_demo" />
+    <directory value="$PROJECT_DIR$" />
+    <filePath value="$PROJECT_DIR$/demo.go" />
+    <method v="2" />
+  </configuration>
+</component>

+ 8 - 8
back/consumer.go

@@ -38,8 +38,8 @@ func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string)
 		QueueUrl:              &qURL,
 		AttributeNames:        []types.QueueAttributeName{"All"},
 		MessageAttributeNames: []string{"All"},
-		VisibilityTimeout:     0,
-		WaitTimeSeconds:       0,
+		VisibilityTimeout:     1,
+		WaitTimeSeconds:       5,
 	}
 	msg, err := client.ReceiveMessage(ctx, &rmi)
 	if err != nil {
@@ -75,7 +75,7 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 			return fmt.Errorf("failed receiving from queue: %w, aborting", err)
 		}
 		if len(recv.Messages) == 0 {
-			fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
+			log.Printf("No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
 			continue
 		}
 		if len(recv.Messages) != 1 {
@@ -84,12 +84,12 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 		msg := message(recv.Messages[0])
 		evt, err := validateMessage(msg)
 		if err != nil {
-			fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
+			log.Printf("invalid message %s: %w, dropping it anyway", msg, err)
 		} else {
 			if err := hdl(ctx, enc, evt.MessageID, evt.SentTime, evt.Body, evt.MessageAttributes); err != nil {
-				fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
+				log.Printf("message %s failed processing : %v, dropping it anyway\n", msg, err)
 			} else {
-				fmt.Fprintf(w, "Message %s processed successfully\n", msg)
+				log.Printf("message %s processed successfully\n", msg)
 			}
 		}
 		dmi := sqs.DeleteMessageInput{
@@ -98,10 +98,10 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 		}
 		_, err = client.DeleteMessage(ctx, &dmi)
 		if err != nil {
-			fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)
+			log.Printf("Error deleting message %s after successful processing: %v\n", msg, err)
 			continue
 		}
-		fmt.Fprintf(w, "Deleted processed message %s\n", msg)
+		log.Printf("message %s deleted after processing\n", msg)
 	}
 }
 

+ 11 - 5
back/demo.go

@@ -22,10 +22,16 @@ func main2(w io.Writer, name string, args []string) byte {
 	ctx := context.Background()
 	dic := resolve(w, name, args)
 	lister := dic.MustService("lister").(func(ctx context.Context) string)
-	receiver := dic.MustService("receiver").(func(ctx context.Context, qURL string))
-	consumer := dic.MustService("consumer").(func(ctx context.Context, qURL string) error)
 	qURL := lister(ctx)
-	receiver(ctx, qURL)
+
+	if false {
+		receiver := dic.MustService("receiver").(func(ctx context.Context, qURL string))
+		receiver(ctx, qURL)
+	}
+	producer := dic.MustService("producer").(func(ctx context.Context, qName string))
+	go producer(ctx, qURL)
+
+	consumer := dic.MustService("consumer").(func(ctx context.Context, qURL string) error)
 	err := consumer(ctx, qURL)
 	if err != nil {
 		log.Printf("error in consumer: %v, aborting", err)
@@ -46,7 +52,7 @@ func resolve(w io.Writer, name string, args []string) *izidic.Container {
 	dic.Register("sqs", sqsClientService)
 	dic.Register("lister", listerService)
 	dic.Register("receiver", receiverService)
-	dic.Register("sender", senderService)
+	dic.Register("producer", producerService)
 	dic.Register("consumer", consumerService)
 
 	dic.MustService("flags") // Store generated params before freeze.
@@ -113,7 +119,7 @@ func receiverService(dic *izidic.Container) (any, error) {
 	}, nil
 }
 
-func senderService(dic *izidic.Container) (any, error) {
+func producerService(dic *izidic.Container) (any, error) {
 	cli := dic.MustService("sqs").(*sqs.Client)
 	w := dic.MustParam("writer").(io.Writer)
 	return func(ctx context.Context, qName string) {

+ 47 - 1
back/producer.go

@@ -2,11 +2,57 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"io"
+	"log"
+	"math/rand"
+	"time"
 
+	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 )
 
-func sender(ctx context.Context, w io.Writer, client *sqs.Client, qName string) {
+func sendOne(ctx context.Context, client *sqs.Client, qURL string) {
+	data := map[string]int{"a": rand.Int()}
+	data["b"] = data["a"] + 1
+	bs, _ := json.Marshal(data)
+	body := string(bs)
 
+	ma := map[string]types.MessageAttributeValue{
+		"x": {
+			DataType:    aws.String("String"),
+			StringValue: aws.String("a string value"),
+		},
+		"y": {
+			DataType:    aws.String("Number"),
+			StringValue: aws.String("42"),
+		},
+	}
+
+	smr := sqs.SendMessageInput{
+		MessageBody:             &body,
+		QueueUrl:                &qURL,
+		DelaySeconds:            0,
+		MessageAttributes:       ma,
+		MessageDeduplicationId:  nil,
+		MessageGroupId:          nil,
+		MessageSystemAttributes: nil,
+	}
+	smo, err := client.SendMessage(ctx, &smr)
+	if err != nil {
+		log.Printf("failed producing message: %v", err)
+	} else if smo.MessageId == nil {
+		log.Println("message produced with a nil ID")
+	} else {
+		log.Printf("message %s produced with %#v", *smo.MessageId, data)
+	}
+
+}
+
+func sender(ctx context.Context, _ io.Writer, client *sqs.Client, qURL string) {
+	pinger := time.NewTicker(2 * time.Second)
+	for range pinger.C {
+		sendOne(ctx, client, qURL)
+	}
 }