redrive.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package web
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "github.com/gin-contrib/sessions"
  7. "github.com/gin-gonic/gin"
  8. "code.osinet.fr/fgm/sqs_demo/back/services/redriver"
  9. )
  10. func makeRedriveHandler(rd redriver.Redriver, ms redriver.MessageStore) gin.HandlerFunc {
  11. return func(c *gin.Context) {
  12. req := c.Request
  13. ctx := req.Context()
  14. qName := c.Param("name")
  15. redirect := "/queue/" + qName
  16. sess := sessions.Default(c)
  17. // Do not consume sess.Flashes(): this is a redirect-only handler, and they would be lost.
  18. defer func() {
  19. _ = sess.Save()
  20. c.Redirect(http.StatusSeeOther, redirect)
  21. }()
  22. var (
  23. messages []redriver.Message
  24. done bool
  25. )
  26. if messages, done = messagesFromRequest(req, qName, sess, ms); done {
  27. return
  28. }
  29. var flash string
  30. if err := rd.RedriveItems(ctx, qName, messages); err != nil {
  31. flash = fmt.Sprintf("failed redriving selected messages on queue %q: %v",
  32. qName, err)
  33. } else {
  34. flash = fmt.Sprintf("%d messages redriven to queue %q."+
  35. `They now carry new MessageIDs and their past MessageID will be visible in the "previous-message-id" attribute`,
  36. len(messages), qName)
  37. }
  38. log.Print(flash)
  39. sess.AddFlash(flash)
  40. }
  41. }
  42. func messagesFromRequest(req *http.Request, qName string, sess sessions.Session, ms redriver.MessageStore) ([]redriver.Message, bool) {
  43. if err := req.ParseForm(); err != nil {
  44. log.Printf("Failed to parse deletion confirm form for queue %s: %v",
  45. qName, err)
  46. sess.AddFlash(fmt.Sprintf("Failed to parsed deletion confirmm form for queue %s",
  47. qName))
  48. return nil, true
  49. }
  50. ids := parseIDs(req.Form, validateUint)
  51. var messages []redriver.Message
  52. if len(ids) == 0 {
  53. flash := fmt.Sprintf("Got no message to delete from queue %q", qName)
  54. log.Print(flash)
  55. sess.AddFlash(flash)
  56. return nil, true
  57. }
  58. messages = parseMessages(req.Form, ids)
  59. for i, message := range messages {
  60. m, ok := ms.Get(message.ReceiptHandle)
  61. if !ok {
  62. sess.AddFlash(fmt.Sprintf("Failed retrieving message with ID %s and receipt handle %s from queue %q. Aborting redrive.",
  63. message.MessageId, message.ReceiptHandle, qName))
  64. return nil, true
  65. }
  66. messages[i] = m
  67. }
  68. return messages, false
  69. }