package redriver import ( "errors" "log" "sync" "time" "github.com/fgm/izidic" "code.osinet.fr/fgm/sqs_demo/back/services" ) // Ensure messageStore implements interface MessageStore. var _ MessageStore = &messageStore{} // MessageStore is a Message local cache with TTL. type MessageStore interface { Set(m Message) error Get(receiptHandle string) (Message, bool) Clear(receiptHandle string) Flush() Close() } type messageStoreEntry struct { value Message stored time.Time } type messageStore struct { ttl time.Duration sync.Mutex sync.Map done chan struct{} } func (ms *messageStore) Close() { ms.done <- struct{}{} } func (ms *messageStore) Set(m Message) error { if m.ReceiptHandle == "" { return errors.New("cannot store a message with a nil ReceiptHandle") } mse := messageStoreEntry{ value: m, stored: time.Now(), } ms.Store(m.ReceiptHandle, mse) return nil } func (ms *messageStore) Get(receiptHandle string) (Message, bool) { v, ok := ms.Load(receiptHandle) if !ok { return Message{}, false } m, ok := v.(messageStoreEntry) if !ok { return Message{}, false } return m.value, true } func (ms *messageStore) Clear(receiptHandle string) { ms.Map.Delete(receiptHandle) } func (ms *messageStore) Flush() { ms.Lock() ms.Map = sync.Map{} ms.Unlock() } func (ms *messageStore) expire() { start := time.Now() ms.Range(func(k, v any) bool { log.Printf("expiring") rh, ok := k.(string) if !ok { log.Printf("map contains a non-string key: %T", k) } mse, ok := v.(messageStoreEntry) if !ok { log.Printf("map contains a non-Messsage entry: %T", v) return false } if start.Sub(mse.stored) > ms.ttl { log.Printf("clearing message %s", rh) ms.Clear(rh) } return true }) } func (ms *messageStore) scheduleExpiries() { t := time.NewTicker(ms.ttl / 2) live := true for live { select { case <-ms.done: live = false case <-t.C: ms.expire() } } } func newMessageStore(ttl time.Duration) *messageStore { ms := messageStore{ ttl: ttl, Map: sync.Map{}, done: make(chan struct{}), } go ms.scheduleExpiries() return &ms } func MessageStoreService(dic *izidic.Container) (any, error) { ttl := dic.MustParam(services.PTTL).(time.Duration) return newMessageStore(ttl), nil }