|
@@ -1,4 +1,4 @@
|
|
|
-package main
|
|
|
+package services
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
@@ -11,41 +11,39 @@ import (
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/aws/aws-sdk-go-v2/aws"
|
|
|
"github.com/aws/aws-sdk-go-v2/service/sqs"
|
|
|
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
|
|
|
"github.com/davecgh/go-spew/spew"
|
|
|
+ "github.com/fgm/izidic"
|
|
|
"github.com/google/uuid"
|
|
|
- "gopkg.in/yaml.v2"
|
|
|
)
|
|
|
|
|
|
-func lister(ctx context.Context, w io.Writer, client *sqs.Client) string {
|
|
|
- lqo, err := client.ListQueues(ctx, &sqs.ListQueuesInput{
|
|
|
- MaxResults: aws.Int32(10),
|
|
|
- NextToken: nil,
|
|
|
- QueueNamePrefix: aws.String(""),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("failed listing queues: %v", err)
|
|
|
- }
|
|
|
- y := yaml.NewEncoder(w)
|
|
|
- y.Encode(lqo.QueueUrls)
|
|
|
- return lqo.QueueUrls[0]
|
|
|
+type Event struct {
|
|
|
+ MessageID uuid.UUID `json:"MessageId"`
|
|
|
+ BodySum string `json:"MD5OfBody"`
|
|
|
+ AttrSum string `json:"MD5MofMessageAttributes"`
|
|
|
+
|
|
|
+ EventAttributes
|
|
|
+ MessageAttributes map[string]types.MessageAttributeValue
|
|
|
+ Body []byte
|
|
|
}
|
|
|
|
|
|
-func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
|
|
|
- rmi := sqs.ReceiveMessageInput{
|
|
|
- QueueUrl: &qURL,
|
|
|
- AttributeNames: []types.QueueAttributeName{"All"},
|
|
|
- MessageAttributeNames: []string{"All"},
|
|
|
- VisibilityTimeout: 0,
|
|
|
- WaitTimeSeconds: 0,
|
|
|
+func (e Event) IsRetryable() bool {
|
|
|
+ maybeRetry, ok := e.MessageAttributes["retry"]
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
}
|
|
|
- msg, err := client.ReceiveMessage(ctx, &rmi)
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("failed receiving from queue %s: %v", err)
|
|
|
+ if maybeRetry.DataType == nil || *maybeRetry.DataType != "String" || maybeRetry.StringValue == nil {
|
|
|
+ return false
|
|
|
}
|
|
|
- spew.Fdump(w, msg.Messages)
|
|
|
+ return *maybeRetry.StringValue == "1"
|
|
|
+}
|
|
|
+
|
|
|
+type EventAttributes struct {
|
|
|
+ SenderID string `json:"SenderId"`
|
|
|
+ SentTime time.Time `json:"SentTimestamp"`
|
|
|
+ ApproximateReceiveCount int `json:"ApproximateReceiveCount"`
|
|
|
+ ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
|
|
|
}
|
|
|
|
|
|
type Handler func(ctx context.Context, enc *json.Encoder, msgID uuid.UUID, sent time.Time, input []byte, meta map[string]types.MessageAttributeValue) error
|
|
@@ -59,7 +57,25 @@ func (m message) String() string {
|
|
|
return *m.MessageId
|
|
|
}
|
|
|
|
|
|
-func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
|
|
|
+func ConsumerService(dic *izidic.Container) (any, error) {
|
|
|
+ cli := dic.MustService("sqs").(*sqs.Client)
|
|
|
+ w := dic.MustParam(PWriter).(io.Writer)
|
|
|
+ hdl := dic.MustParam(PHandler).(Handler)
|
|
|
+ enc := json.NewEncoder(w)
|
|
|
+ return func(ctx context.Context, qURL string) error {
|
|
|
+ return consumeMessage(ctx, w, enc, cli, qURL, hdl)
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+func ReceiverService(dic *izidic.Container) (any, error) {
|
|
|
+ cli := dic.MustService("sqs").(*sqs.Client)
|
|
|
+ w := dic.MustParam(PWriter).(io.Writer)
|
|
|
+ return func(ctx context.Context, qURL string) {
|
|
|
+ receiveMessage(ctx, w, cli, qURL)
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+func consumeMessage(ctx context.Context, _ io.Writer, enc *json.Encoder, client *sqs.Client, qURL string, hdl Handler) error {
|
|
|
rmi := sqs.ReceiveMessageInput{
|
|
|
QueueUrl: &qURL,
|
|
|
AttributeNames: []types.QueueAttributeName{"All"},
|
|
@@ -75,7 +91,7 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
|
|
|
return fmt.Errorf("failed receiving from queue: %w, aborting", err)
|
|
|
}
|
|
|
if len(recv.Messages) == 0 {
|
|
|
- fmt.Fprintf(w, "No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
|
|
|
+ log.Printf("No message with %d seconds timeout\n", rmi.WaitTimeSeconds)
|
|
|
continue
|
|
|
}
|
|
|
if len(recv.Messages) != 1 {
|
|
@@ -84,41 +100,44 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
|
|
|
msg := message(recv.Messages[0])
|
|
|
evt, err := validateMessage(msg)
|
|
|
if err != nil {
|
|
|
- fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
|
|
|
+ log.Printf("invalid message %s: %v, dropping it anyway", msg, err)
|
|
|
} else {
|
|
|
if err := hdl(ctx, enc, evt.MessageID, evt.SentTime, evt.Body, evt.MessageAttributes); err != nil {
|
|
|
- fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
|
|
|
+ log.Printf("message %s failed processing : %v, dropping it anyway\n", msg, err)
|
|
|
} else {
|
|
|
- fmt.Fprintf(w, "Message %s processed successfully\n", msg)
|
|
|
+ log.Printf("message %s processed successfully\n", msg)
|
|
|
}
|
|
|
}
|
|
|
- dmi := sqs.DeleteMessageInput{
|
|
|
- QueueUrl: &qURL,
|
|
|
- ReceiptHandle: msg.ReceiptHandle,
|
|
|
- }
|
|
|
- _, err = client.DeleteMessage(ctx, &dmi)
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error deleting message %s after successful processing: %v\n", msg, err)
|
|
|
- continue
|
|
|
+ if evt.IsRetryable() {
|
|
|
+ log.Printf("message %s not deleted, for retry", msg)
|
|
|
+ } else {
|
|
|
+ dmi := sqs.DeleteMessageInput{
|
|
|
+ QueueUrl: &qURL,
|
|
|
+ ReceiptHandle: msg.ReceiptHandle,
|
|
|
+ }
|
|
|
+ _, err = client.DeleteMessage(ctx, &dmi)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("Error deleting message %s after successful processing: %v\n", msg, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ log.Printf("message %s deleted after processing\n", msg)
|
|
|
}
|
|
|
- fmt.Fprintf(w, "Deleted processed message %s\n", msg)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-type EventAttributes struct {
|
|
|
- SenderID string `json:"SenderId"`
|
|
|
- SentTime time.Time `json:"SentTimestamp"`
|
|
|
- ApproximateReceiveCount int `json:"ApproximateReceiveCount"`
|
|
|
- ApproximateFirstReceiveTimestamp time.Time `json:"ApproximateFirstReceiveTimestamp"`
|
|
|
-}
|
|
|
-type Event struct {
|
|
|
- MessageID uuid.UUID `json:"MessageId"`
|
|
|
- BodySum string `json:"MD5OfBody"`
|
|
|
- AttrSum string `json:"MD5MofMessageAttributes"`
|
|
|
-
|
|
|
- EventAttributes
|
|
|
- MessageAttributes map[string]types.MessageAttributeValue
|
|
|
- Body []byte
|
|
|
+func receiveMessage(ctx context.Context, w io.Writer, client *sqs.Client, qURL string) {
|
|
|
+ rmi := sqs.ReceiveMessageInput{
|
|
|
+ QueueUrl: &qURL,
|
|
|
+ AttributeNames: []types.QueueAttributeName{"All"},
|
|
|
+ MessageAttributeNames: []string{"All"},
|
|
|
+ VisibilityTimeout: 1,
|
|
|
+ WaitTimeSeconds: 5,
|
|
|
+ }
|
|
|
+ msg, err := client.ReceiveMessage(ctx, &rmi)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("failed receiving from queue %s: %v", err)
|
|
|
+ }
|
|
|
+ spew.Fdump(w, msg.Messages)
|
|
|
}
|
|
|
|
|
|
func validateMessage(msg message) (*Event, error) {
|