demo.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "log"
  9. "os"
  10. "github.com/aws/aws-sdk-go-v2/config"
  11. "github.com/aws/aws-sdk-go-v2/service/sqs"
  12. "github.com/fgm/izidic"
  13. )
  14. func main() {
  15. main2(os.Stdout, os.Args[0], os.Args[1:])
  16. }
  17. func main2(w io.Writer, name string, args []string) {
  18. ctx := context.Background()
  19. dic := resolve(w, name, args)
  20. lister := dic.MustService("lister").(func(ctx context.Context) string)
  21. receiver := dic.MustService("receiver").(func(ctx context.Context, qURL string))
  22. consumer := dic.MustService("consumer").(func(ctx context.Context, qURL string) error)
  23. qURL := lister(ctx)
  24. receiver(ctx, qURL)
  25. err := consumer(ctx, qURL)
  26. if err != nil {
  27. log.Fatalf("error in consumer: %v, aborting", err)
  28. }
  29. log.Printf("exiting cleanly")
  30. }
  31. func resolve(w io.Writer, name string, args []string) *izidic.Container {
  32. dic := izidic.New()
  33. dic.Store("name", name)
  34. dic.Store("args", args)
  35. dic.Store("writer", w)
  36. dic.Store("handler", Handler(HandleDummy))
  37. dic.Register("flags", flagsService)
  38. dic.Register("logger", loggerService)
  39. dic.Register("sqs", sqsClientService)
  40. dic.Register("lister", listerService)
  41. dic.Register("receiver", receiverService)
  42. dic.Register("sender", senderService)
  43. dic.Register("consumer", consumerService)
  44. dic.MustService("flags") // Store generated params before freeze.
  45. dic.Freeze()
  46. return dic
  47. }
  48. func flagsService(dic *izidic.Container) (any, error) {
  49. fs := flag.NewFlagSet(dic.MustParam("name").(string), flag.ContinueOnError)
  50. profile := fs.String("profile", "test-profile", "The AWS profile")
  51. region := fs.String("region", "eu-west-3", "The AWS region to connect to")
  52. qName := fs.String("queue-name", "dummy-queue", "The queue name")
  53. sqsURL := fs.String("url", "http://localhost:4566", "The SQS endpoint URL")
  54. if err := fs.Parse(dic.MustParam("args").([]string)); err != nil {
  55. return nil, fmt.Errorf("cannot obtain CLI args")
  56. }
  57. dic.Store("profile", *profile)
  58. dic.Store("region", *region)
  59. dic.Store("url", *sqsURL)
  60. dic.Store("queue-name", *qName)
  61. return fs, nil
  62. }
  63. // loggerService is an izidic.Service also containing a one-time initialization action.
  64. func loggerService(dic *izidic.Container) (any, error) {
  65. w := dic.MustParam("writer").(io.Writer)
  66. log.SetOutput(w) // Support dependency code not taking an injected logger.
  67. logger := log.New(w, "", log.LstdFlags)
  68. return logger, nil
  69. }
  70. func sqsClientService(dic *izidic.Container) (any, error) {
  71. ctx := context.Background()
  72. profile := dic.MustParam("profile").(string)
  73. region := dic.MustParam("region").(string)
  74. epr := endpointResolver{region: region}
  75. cfg, err := config.LoadDefaultConfig(ctx,
  76. config.WithRegion(region),
  77. config.WithSharedConfigProfile(profile),
  78. config.WithEndpointResolverWithOptions(epr),
  79. )
  80. if err != nil {
  81. return nil, fmt.Errorf("failed loading default AWS config: %w", err)
  82. }
  83. client := sqs.NewFromConfig(cfg)
  84. return client, nil
  85. }
  86. func listerService(dic *izidic.Container) (any, error) {
  87. cli := dic.MustService("sqs").(*sqs.Client)
  88. w := dic.MustParam("writer").(io.Writer)
  89. return func(ctx context.Context) string {
  90. return lister(ctx, w, cli)
  91. }, nil
  92. }
  93. func receiverService(dic *izidic.Container) (any, error) {
  94. cli := dic.MustService("sqs").(*sqs.Client)
  95. w := dic.MustParam("writer").(io.Writer)
  96. return func(ctx context.Context, qURL string) {
  97. receiver(ctx, w, cli, qURL)
  98. }, nil
  99. }
  100. func senderService(dic *izidic.Container) (any, error) {
  101. cli := dic.MustService("sqs").(*sqs.Client)
  102. w := dic.MustParam("writer").(io.Writer)
  103. return func(ctx context.Context, qName string) {
  104. sender(ctx, w, cli, qName)
  105. }, nil
  106. }
  107. func consumerService(dic *izidic.Container) (any, error) {
  108. cli := dic.MustService("sqs").(*sqs.Client)
  109. w := dic.MustParam("writer").(io.Writer)
  110. hdl := dic.MustParam("handler").(Handler)
  111. enc := json.NewEncoder(w)
  112. return func(ctx context.Context, qURL string) error {
  113. return consumer(ctx, w, enc, cli, qURL, hdl)
  114. }, nil
  115. }