Forráskód Böngészése

Support AWS instead of localstack, report message meta, ID, sent time.

Frederic G. MARAND 2 éve
szülő
commit
b41823194f
4 módosított fájl, 25 hozzáadás és 10 törlés
  1. 4 1
      aws.go
  2. 6 3
      consumer.go
  3. 5 3
      demo.go
  4. 10 3
      handler.go

+ 4 - 1
aws.go

@@ -10,7 +10,7 @@ type endpointResolver struct {
 	region string
 	region string
 }
 }
 
 
-func (e endpointResolver) ResolveEndpoint(service, _ string, options ...interface{}) (aws.Endpoint, error) {
+func (e endpointResolver) ResolveEndpoint(service, region string, options ...interface{}) (aws.Endpoint, error) {
 	if service != `SQS` {
 	if service != `SQS` {
 		return aws.Endpoint{}, fmt.Errorf("trying to resolve non-SQS service: %s", service)
 		return aws.Endpoint{}, fmt.Errorf("trying to resolve non-SQS service: %s", service)
 	}
 	}
@@ -23,5 +23,8 @@ func (e endpointResolver) ResolveEndpoint(service, _ string, options ...interfac
 		SigningMethod:     "",
 		SigningMethod:     "",
 		Source:            0,
 		Source:            0,
 	}
 	}
+	if region != "" {
+		ep.URL = fmt.Sprintf("https://sqs.%s.amazonaws.com/", region)
+	}
 	return ep, nil
 	return ep, nil
 }
 }

+ 6 - 3
consumer.go

@@ -48,7 +48,7 @@ func receiver(ctx context.Context, w io.Writer, client *sqs.Client, qURL string)
 	spew.Fdump(w, msg.Messages)
 	spew.Fdump(w, msg.Messages)
 }
 }
 
 
-type Handler func(ctx context.Context, enc *json.Encoder, input []byte) error
+type Handler func(ctx context.Context, enc *json.Encoder, msgID uuid.UUID, sent time.Time, input []byte, meta map[string]types.MessageAttributeValue) error
 
 
 type message types.Message
 type message types.Message
 
 
@@ -86,7 +86,7 @@ func consumer(ctx context.Context, w io.Writer, enc *json.Encoder, client *sqs.C
 		if err != nil {
 		if err != nil {
 			fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
 			fmt.Fprintf(w, "invalid message %s: %w, dropping it anyway", msg, err)
 		} else {
 		} else {
-			if err := hdl(ctx, enc, evt.Body); err != nil {
+			if err := hdl(ctx, enc, evt.MessageID, evt.SentTime, evt.Body, evt.MessageAttributes); err != nil {
 				fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
 				fmt.Fprintf(w, "Error processing message: %s: %v, dropping it anyway\n", msg, err)
 			} else {
 			} else {
 				fmt.Fprintf(w, "Message %s processed successfully\n", msg)
 				fmt.Fprintf(w, "Message %s processed successfully\n", msg)
@@ -117,7 +117,8 @@ type Event struct {
 	AttrSum   string    `json:"MD5MofMessageAttributes"`
 	AttrSum   string    `json:"MD5MofMessageAttributes"`
 
 
 	EventAttributes
 	EventAttributes
-	Body []byte
+	MessageAttributes map[string]types.MessageAttributeValue
+	Body              []byte
 }
 }
 
 
 func validateMessage(msg message) (*Event, error) {
 func validateMessage(msg message) (*Event, error) {
@@ -159,6 +160,8 @@ func validateMessage(msg message) (*Event, error) {
 	}
 	}
 	evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)
 	evt.EventAttributes.ApproximateFirstReceiveTimestamp = time.Unix(int64(msec)/1000, int64(msec)*1000)
 
 
+	evt.MessageAttributes = msg.MessageAttributes
+
 	// EventBody field
 	// EventBody field
 	if msg.Body == nil {
 	if msg.Body == nil {
 		return nil, fmt.Errorf("message body is nil")
 		return nil, fmt.Errorf("message body is nil")

+ 5 - 3
demo.go

@@ -15,10 +15,10 @@ import (
 )
 )
 
 
 func main() {
 func main() {
-	main2(os.Stdout, os.Args[0], os.Args[1:])
+	os.Exit(int(main2(os.Stdout, os.Args[0], os.Args[1:])))
 }
 }
 
 
-func main2(w io.Writer, name string, args []string) {
+func main2(w io.Writer, name string, args []string) byte {
 	ctx := context.Background()
 	ctx := context.Background()
 	dic := resolve(w, name, args)
 	dic := resolve(w, name, args)
 	lister := dic.MustService("lister").(func(ctx context.Context) string)
 	lister := dic.MustService("lister").(func(ctx context.Context) string)
@@ -28,9 +28,11 @@ func main2(w io.Writer, name string, args []string) {
 	receiver(ctx, qURL)
 	receiver(ctx, qURL)
 	err := consumer(ctx, qURL)
 	err := consumer(ctx, qURL)
 	if err != nil {
 	if err != nil {
-		log.Fatalf("error in consumer: %v, aborting", err)
+		log.Printf("error in consumer: %v, aborting", err)
+		return 1
 	}
 	}
 	log.Printf("exiting cleanly")
 	log.Printf("exiting cleanly")
+	return 0
 }
 }
 
 
 func resolve(w io.Writer, name string, args []string) *izidic.Container {
 func resolve(w io.Writer, name string, args []string) *izidic.Container {

+ 10 - 3
handler.go

@@ -6,18 +6,25 @@ import (
 	"fmt"
 	"fmt"
 	"time"
 	"time"
 
 
+	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 	"github.com/google/uuid"
 	"github.com/google/uuid"
 )
 )
 
 
 // HandleDummy is blissfully unaware of SQS, and could therefore be used as a handler for another message queue.
 // HandleDummy is blissfully unaware of SQS, and could therefore be used as a handler for another message queue.
-func HandleDummy(_ context.Context, enc *json.Encoder, msg []byte) error {
+func HandleDummy(_ context.Context, enc *json.Encoder,
+	msgID uuid.UUID, sentTime time.Time, msg []byte, meta map[string]types.MessageAttributeValue) error {
 	type EventBody struct {
 	type EventBody struct {
 		EventID   uuid.UUID      `json:"event_id"`
 		EventID   uuid.UUID      `json:"event_id"`
 		EventTime time.Time      `json:"event_time"`
 		EventTime time.Time      `json:"event_time"`
 		Data      map[string]int `json:"data,omitempty"`
 		Data      map[string]int `json:"data,omitempty"`
+		Meta      map[string]types.MessageAttributeValue
 	}
 	}
-	evt := EventBody{}
-	if err := json.Unmarshal(msg, &evt); err != nil {
+	evt := EventBody{
+		EventID:   msgID,
+		EventTime: sentTime,
+		Meta:      meta,
+	}
+	if err := json.Unmarshal(msg, &evt.Data); err != nil {
 		return fmt.Errorf("error parsing body as JSON: %w", err)
 		return fmt.Errorf("error parsing body as JSON: %w", err)
 	}
 	}