message_store.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package redriver
  2. import (
  3. "errors"
  4. "log"
  5. "sync"
  6. "time"
  7. "github.com/fgm/izidic"
  8. )
  9. // Ensure messageStore implements interface MessageStore.
  10. var _ MessageStore = &messageStore{}
  // MessageStore is a Message local cache with TTL.
  //
  // Messages are looked up and removed by their receipt handle.
  type MessageStore interface {
  	// Set stores m in the cache, or returns an error if m cannot be stored.
  	Set(m Message) error
  	// Get returns the message cached under receiptHandle. The boolean is
  	// false when no such message is cached.
  	Get(receiptHandle string) (Message, bool)
  	// Clear removes the message cached under receiptHandle, if any.
  	Clear(receiptHandle string)
  	// Flush removes every message from the cache.
  	Flush()
  	// Close tells the store to stop its background work.
  	Close()
  }
  // messageStoreEntry is the value kept in the store for one receipt handle:
  // the message itself plus the time at which it was stored.
  type messageStoreEntry struct {
  	value  Message   // the cached message
  	stored time.Time // when the entry was stored; expire compares its age to the TTL
  }
  // messageStore is the in-memory implementation of MessageStore.
  //
  // Entries live in the embedded sync.Map, keyed by receipt handle, and a
  // goroutine started by newMessageStore periodically removes the ones older
  // than ttl. Both embedded types promote their methods onto messageStore.
  type messageStore struct {
  	// ttl is the age beyond which expire removes an entry.
  	ttl time.Duration
  	// Mutex serializes whole-store operations such as Flush. Per-key
  	// operations rely on the Map's own synchronization and do not take it.
  	sync.Mutex
  	// Map holds the entries: receipt handle -> messageStoreEntry.
  	sync.Map
  	// done is the channel scheduleExpiries watches to know when to stop;
  	// Close signals on it.
  	done chan struct{}
  }
  29. func (ms *messageStore) Close() {
  30. ms.done <- struct{}{}
  31. }
  32. func (ms *messageStore) Set(m Message) error {
  33. if m.ReceiptHandle == "" {
  34. return errors.New("cannot store a message with a nil ReceiptHandle")
  35. }
  36. mse := messageStoreEntry{
  37. value: m,
  38. stored: time.Now(),
  39. }
  40. ms.Store(m.ReceiptHandle, mse)
  41. return nil
  42. }
  43. func (ms *messageStore) Get(receiptHandle string) (Message, bool) {
  44. v, ok := ms.Load(receiptHandle)
  45. if !ok {
  46. return Message{}, false
  47. }
  48. m, ok := v.(messageStoreEntry)
  49. if !ok {
  50. return Message{}, false
  51. }
  52. return m.value, true
  53. }
  54. func (ms *messageStore) Clear(receiptHandle string) {
  55. ms.Map.Delete(receiptHandle)
  56. }
  57. func (ms *messageStore) Flush() {
  58. ms.Lock()
  59. ms.Map = sync.Map{}
  60. ms.Unlock()
  61. }
  62. func (ms *messageStore) expire() {
  63. start := time.Now()
  64. ms.Range(func(k, v any) bool {
  65. log.Printf("expiring")
  66. rh, ok := k.(string)
  67. if !ok {
  68. log.Printf("map contains a non-string key: %T", k)
  69. }
  70. mse, ok := v.(messageStoreEntry)
  71. if !ok {
  72. log.Printf("map contains a non-Messsage entry: %T", v)
  73. return false
  74. }
  75. if start.Sub(mse.stored) > ms.ttl {
  76. log.Printf("clearing message %s", rh)
  77. ms.Clear(rh)
  78. }
  79. return true
  80. })
  81. }
  82. func (ms *messageStore) scheduleExpiries() {
  83. t := time.NewTicker(ms.ttl / 2)
  84. live := true
  85. for live {
  86. select {
  87. case <-ms.done:
  88. live = false
  89. case <-t.C:
  90. ms.expire()
  91. }
  92. }
  93. }
  94. func newMessageStore(ttl time.Duration) *messageStore {
  95. ms := messageStore{
  96. ttl: ttl,
  97. Map: sync.Map{},
  98. done: make(chan struct{}),
  99. }
  100. go ms.scheduleExpiries()
  101. return &ms
  102. }
  103. func MessageStoreService(dic *izidic.Container) (any, error) {
  104. return newMessageStore(10 * time.Second), nil
  105. }