| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 | 
							- package redriver
 
- import (
 
- 	"errors"
 
- 	"log"
 
- 	"sync"
 
- 	"time"
 
- 	"github.com/fgm/izidic"
 
- )
 
- // Ensure messageStore implements interface MessageStore.
 
- var _ MessageStore = &messageStore{}
 
- // MessageStore is a Message local cache with TTL.
 
- type MessageStore interface {
 
- 	Set(m Message) error
 
- 	Get(receiptHandle string) (Message, bool)
 
- 	Clear(receiptHandle string)
 
- 	Flush()
 
- 	Close()
 
- }
 
- type messageStoreEntry struct {
 
- 	value  Message
 
- 	stored time.Time
 
- }
 
// messageStore is the in-memory MessageStore implementation. Entries are
// messageStoreEntry values kept in the embedded sync.Map, keyed by receipt
// handle.
type messageStore struct {
	// ttl is the age beyond which expire removes an entry.
	ttl time.Duration

	// Mutex is held by Flush while it empties the map.
	sync.Mutex
	// Map holds the cached entries.
	sync.Map

	// done is closed by Close to tell scheduleExpiries to return.
	done chan struct{}
	// closeOnce guarantees done is closed at most once.
	closeOnce sync.Once
}

// Close signals the expiry loop to stop.
//
// It closes the done channel instead of sending on it: a send on the
// unbuffered channel blocks forever when nobody is receiving, which is the
// case on a second call or when the expiry goroutine is not running. Close is
// therefore non-blocking and safe to call more than once. It does not wait
// for the expiry goroutine to return.
func (ms *messageStore) Close() {
	ms.closeOnce.Do(func() {
		// A store built without newMessageStore has a nil channel;
		// closing it would panic.
		if ms.done != nil {
			close(ms.done)
		}
	})
}
 
- func (ms *messageStore) Set(m Message) error {
 
- 	if m.ReceiptHandle == "" {
 
- 		return errors.New("cannot store a message with a nil ReceiptHandle")
 
- 	}
 
- 	mse := messageStoreEntry{
 
- 		value:  m,
 
- 		stored: time.Now(),
 
- 	}
 
- 	ms.Store(m.ReceiptHandle, mse)
 
- 	return nil
 
- }
 
- func (ms *messageStore) Get(receiptHandle string) (Message, bool) {
 
- 	v, ok := ms.Load(receiptHandle)
 
- 	if !ok {
 
- 		return Message{}, false
 
- 	}
 
- 	m, ok := v.(messageStoreEntry)
 
- 	if !ok {
 
- 		return Message{}, false
 
- 	}
 
- 	return m.value, true
 
- }
 
- func (ms *messageStore) Clear(receiptHandle string) {
 
- 	ms.Map.Delete(receiptHandle)
 
- }
 
- func (ms *messageStore) Flush() {
 
- 	ms.Lock()
 
- 	ms.Map = sync.Map{}
 
- 	ms.Unlock()
 
- }
 
- func (ms *messageStore) expire() {
 
- 	start := time.Now()
 
- 	ms.Range(func(k, v any) bool {
 
- 		log.Printf("expiring")
 
- 		rh, ok := k.(string)
 
- 		if !ok {
 
- 			log.Printf("map contains a non-string key: %T", k)
 
- 		}
 
- 		mse, ok := v.(messageStoreEntry)
 
- 		if !ok {
 
- 			log.Printf("map contains a non-Messsage entry: %T", v)
 
- 			return false
 
- 		}
 
- 		if start.Sub(mse.stored) > ms.ttl {
 
- 			log.Printf("clearing message %s", rh)
 
- 			ms.Clear(rh)
 
- 		}
 
- 		return true
 
- 	})
 
- }
 
- func (ms *messageStore) scheduleExpiries() {
 
- 	t := time.NewTicker(ms.ttl / 2)
 
- 	live := true
 
- 	for live {
 
- 		select {
 
- 		case <-ms.done:
 
- 			live = false
 
- 		case <-t.C:
 
- 			ms.expire()
 
- 		}
 
- 	}
 
- }
 
- func newMessageStore(ttl time.Duration) *messageStore {
 
- 	ms := messageStore{
 
- 		ttl:  ttl,
 
- 		Map:  sync.Map{},
 
- 		done: make(chan struct{}),
 
- 	}
 
- 	go ms.scheduleExpiries()
 
- 	return &ms
 
- }
 
- func MessageStoreService(dic *izidic.Container) (any, error) {
 
- 	return newMessageStore(10 * time.Second), nil
 
- }
 
 
  |