123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package redriver
- import (
- "errors"
- "log"
- "sync"
- "time"
- "github.com/fgm/izidic"
- )
- // Ensure messageStore implements interface MessageStore.
- var _ MessageStore = &messageStore{}
- // MessageStore is a Message local cache with TTL.
- type MessageStore interface {
- Set(m Message) error
- Get(receiptHandle string) (Message, bool)
- Clear(receiptHandle string)
- Flush()
- Close()
- }
- type messageStoreEntry struct {
- value Message
- stored time.Time
- }
// messageStore is the in-memory implementation of a message cache with TTL.
//
// Entries live in the embedded sync.Map, keyed by receipt handle. A
// background loop (scheduleExpiries) runs until done is signalled.
type messageStore struct {
	// ttl is the age beyond which an entry is considered expired.
	ttl time.Duration
	// Mutex is embedded, so Lock and Unlock are promoted onto the store.
	sync.Mutex
	// Map holds the cached entries.
	sync.Map
	// done is closed by Close to tell the expiry loop to stop.
	done chan struct{}
	// closeOnce guarantees done is closed at most once.
	closeOnce sync.Once
}

// Close signals the expiry loop to stop.
//
// It closes the done channel instead of sending on it, so it never blocks:
// a send on the unbuffered channel would hang forever when no goroutine is
// receiving (loop already stopped, or never started). Calling Close more than
// once, or on a store whose done channel was never created, is a no-op.
// Close does not wait for the expiry loop to actually exit.
func (ms *messageStore) Close() {
	if ms.done == nil {
		// Closing a nil channel panics; there is nothing to signal anyway.
		return
	}
	ms.closeOnce.Do(func() { close(ms.done) })
}
- func (ms *messageStore) Set(m Message) error {
- if m.ReceiptHandle == "" {
- return errors.New("cannot store a message with a nil ReceiptHandle")
- }
- mse := messageStoreEntry{
- value: m,
- stored: time.Now(),
- }
- ms.Store(m.ReceiptHandle, mse)
- return nil
- }
- func (ms *messageStore) Get(receiptHandle string) (Message, bool) {
- v, ok := ms.Load(receiptHandle)
- if !ok {
- return Message{}, false
- }
- m, ok := v.(messageStoreEntry)
- if !ok {
- return Message{}, false
- }
- return m.value, true
- }
- func (ms *messageStore) Clear(receiptHandle string) {
- ms.Map.Delete(receiptHandle)
- }
- func (ms *messageStore) Flush() {
- ms.Lock()
- ms.Map = sync.Map{}
- ms.Unlock()
- }
- func (ms *messageStore) expire() {
- start := time.Now()
- ms.Range(func(k, v any) bool {
- log.Printf("expiring")
- rh, ok := k.(string)
- if !ok {
- log.Printf("map contains a non-string key: %T", k)
- }
- mse, ok := v.(messageStoreEntry)
- if !ok {
- log.Printf("map contains a non-Messsage entry: %T", v)
- return false
- }
- if start.Sub(mse.stored) > ms.ttl {
- log.Printf("clearing message %s", rh)
- ms.Clear(rh)
- }
- return true
- })
- }
- func (ms *messageStore) scheduleExpiries() {
- t := time.NewTicker(ms.ttl / 2)
- live := true
- for live {
- select {
- case <-ms.done:
- live = false
- case <-t.C:
- ms.expire()
- }
- }
- }
- func newMessageStore(ttl time.Duration) *messageStore {
- ms := messageStore{
- ttl: ttl,
- Map: sync.Map{},
- done: make(chan struct{}),
- }
- go ms.scheduleExpiries()
- return &ms
- }
- func MessageStoreService(dic *izidic.Container) (any, error) {
- return newMessageStore(10 * time.Second), nil
- }
|