message_store.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package redriver
  2. import (
  3. "errors"
  4. "log"
  5. "sync"
  6. "time"
  7. "github.com/fgm/izidic"
  8. "code.osinet.fr/fgm/sqs_demo/back/services"
  9. )
  10. // Ensure messageStore implements interface MessageStore.
  11. var _ MessageStore = &messageStore{}
  12. // MessageStore is a Message local cache with TTL.
  13. type MessageStore interface {
  14. Set(m Message) error
  15. Get(receiptHandle string) (Message, bool)
  16. Clear(receiptHandle string)
  17. Flush()
  18. Close()
  19. }
  20. type messageStoreEntry struct {
  21. value Message
  22. stored time.Time
  23. }
  24. type messageStore struct {
  25. ttl time.Duration
  26. sync.Mutex
  27. sync.Map
  28. done chan struct{}
  29. }
  30. func (ms *messageStore) Close() {
  31. ms.done <- struct{}{}
  32. }
  33. func (ms *messageStore) Set(m Message) error {
  34. if m.ReceiptHandle == "" {
  35. return errors.New("cannot store a message with a nil ReceiptHandle")
  36. }
  37. mse := messageStoreEntry{
  38. value: m,
  39. stored: time.Now(),
  40. }
  41. ms.Store(m.ReceiptHandle, mse)
  42. return nil
  43. }
  44. func (ms *messageStore) Get(receiptHandle string) (Message, bool) {
  45. v, ok := ms.Load(receiptHandle)
  46. if !ok {
  47. return Message{}, false
  48. }
  49. m, ok := v.(messageStoreEntry)
  50. if !ok {
  51. return Message{}, false
  52. }
  53. return m.value, true
  54. }
  55. func (ms *messageStore) Clear(receiptHandle string) {
  56. ms.Map.Delete(receiptHandle)
  57. }
  58. func (ms *messageStore) Flush() {
  59. ms.Lock()
  60. ms.Map = sync.Map{}
  61. ms.Unlock()
  62. }
  63. func (ms *messageStore) expire() {
  64. start := time.Now()
  65. ms.Range(func(k, v any) bool {
  66. log.Printf("expiring")
  67. rh, ok := k.(string)
  68. if !ok {
  69. log.Printf("map contains a non-string key: %T", k)
  70. }
  71. mse, ok := v.(messageStoreEntry)
  72. if !ok {
  73. log.Printf("map contains a non-Messsage entry: %T", v)
  74. return false
  75. }
  76. if start.Sub(mse.stored) > ms.ttl {
  77. log.Printf("clearing message %s", rh)
  78. ms.Clear(rh)
  79. }
  80. return true
  81. })
  82. }
  83. func (ms *messageStore) scheduleExpiries() {
  84. t := time.NewTicker(ms.ttl / 2)
  85. live := true
  86. for live {
  87. select {
  88. case <-ms.done:
  89. live = false
  90. case <-t.C:
  91. ms.expire()
  92. }
  93. }
  94. }
  95. func newMessageStore(ttl time.Duration) *messageStore {
  96. ms := messageStore{
  97. ttl: ttl,
  98. Map: sync.Map{},
  99. done: make(chan struct{}),
  100. }
  101. go ms.scheduleExpiries()
  102. return &ms
  103. }
  104. func MessageStoreService(dic *izidic.Container) (any, error) {
  105. ttl := dic.MustParam(services.PTTL).(time.Duration)
  106. return newMessageStore(ttl), nil
  107. }