| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 | package redriverimport (	"context"	"encoding/json"	"fmt"	"io"	"strconv"	"github.com/aws/aws-sdk-go-v2/aws"	"github.com/aws/aws-sdk-go-v2/service/sqs"	"github.com/aws/aws-sdk-go-v2/service/sqs/types"	"github.com/fgm/izidic"	"code.osinet.fr/fgm/sqs_demo/back/services")const (	// MessageSystemAttributeNameDeadLetterQueueSourceArn is an undocumented	// types.Message attribute, used by the SQS console to support redrive.	MessageSystemAttributeNameDeadLetterQueueSourceArn types.MessageSystemAttributeName = "DeadLetterQueueSourceArn")type ItemsKeys struct {	MessageID     string	ReceiptHandle string}// Redriver is a redrive-oriented facade in front of the sqs.Client API.type Redriver interface {	ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error)	GetDLQ(ctx context.Context, qName string) (QueueUrls string, err error)	GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error)	GetQueueItems(ctx context.Context, qName string) ([]Message, error)	DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error	Purge(ctx context.Context, qName string) error	RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error}type redriver struct {	Wait int32	io.Writer	*sqs.Client}func (r *redriver) ListQueues(ctx context.Context, prefix string) (QueueUrls []string, err error) {	lqi := &sqs.ListQueuesInput{		MaxResults:      aws.Int32(1000),		NextToken:       nil,		QueueNamePrefix: aws.String(prefix),	}	lqo, err := r.Client.ListQueues(ctx, lqi)	if err != nil {		return nil, fmt.Errorf("listing queues: %w", err)	}	return lqo.QueueUrls, nil}func (*redriver) parseQueueInfoRedriveAllowPolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrap *QueueInfoAttributesRedriveAllowPolicy, err error) {	srap, ok := qao.Attributes[string(types.QueueAttributeNameRedriveAllowPolicy)]	if !ok || srap == "" {		return nil, nil	}	qrap = &QueueInfoAttributesRedriveAllowPolicy{}	if err = json.Unmarshal([]byte(srap), qrap); err != nil {		return nil, fmt.Errorf(			"failed parsing redrive policy for queue %q %w", qName, err)	}	return qrap, nil}func (*redriver) parseQueueInfoRedrivePolicy(qName string, qao sqs.GetQueueAttributesOutput) (qrp *QueueInfoAttributesRedrivePolicy, err error) {	srp, ok := qao.Attributes[string(types.QueueAttributeNameRedrivePolicy)]	if !ok || srp == "" {		return nil, nil	}	qrp = &QueueInfoAttributesRedrivePolicy{}	if err = json.Unmarshal([]byte(srp), qrp); err != nil {		return qrp, fmt.Errorf(			"failed parsing redrive policy for queue %q %w", qName, err)	}	if _, err = URLFromARNString(qrp.DeadLetterTargetARN); err != nil {		return qrp, fmt.Errorf(			"failed converting queue %q ARN to URL: %w", qName, err)	}	return qrp, nil}func (r *redriver) GetDLQ(ctx context.Context, qName string) (QueueUrl string, err error) {	qui := &sqs.GetQueueUrlInput{QueueName: &qName}	qu, err := r.GetQueueUrl(ctx, qui)	if err != nil {		return "", fmt.Errorf("failed getting URL for queue %q: %w", qName, err)	}	qai := &sqs.GetQueueAttributesInput{		QueueUrl:       qu.QueueUrl,		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameRedrivePolicy},	}	qao, err := r.Client.GetQueueAttributes(ctx, qai)	if err != nil {		return "", fmt.Errorf("failed getting DLQ for queue %q: %w", qName, err)	}	if qao == nil {		return "", fmt.Errorf("redrive policy for queue %q is empty", qName)	}	qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)	if err != nil {		return "", fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)	}	if qrp == nil {		return "", nil // Queue has no DLQ: this is not an error	}	qURL, _ := URLFromARNString(qrp.DeadLetterTargetARN) // Already checked in parseQueueInfoRedrivePolicy	return qURL, nil}func (r *redriver) GetQueueInfo(ctx context.Context, qName string) (*QueueInfo, error) {	qui := &sqs.GetQueueUrlInput{QueueName: &qName}	qu, err := r.GetQueueUrl(ctx, qui)	if err != nil {		return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)	}	if qu.QueueUrl == nil {		return nil, fmt.Errorf("URL for queue %q is empty", qName)	}	qai := &sqs.GetQueueAttributesInput{		QueueUrl:       qu.QueueUrl,		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},	}	qao, err := r.Client.GetQueueAttributes(ctx, qai)	if err != nil {		return nil, fmt.Errorf("failed getting all attributes for queue %q: %w", qName, err)	}	if qao == nil {		return nil, fmt.Errorf("no attributes returned for queue %q", qName)	}	qi, err := r.parseQueueInfoAttributes(qName, qu, qao)	if err != nil {		return nil, err	}	return qi, err}func (r *redriver) parseQueueInfoAttributes(qName string, qu *sqs.GetQueueUrlOutput, qao *sqs.GetQueueAttributesOutput) (*QueueInfo, error) {	var (		qi                               *QueueInfo		errCount                         int		anom, anomd, anomnv              int64		created, changed                 int64 // timestamps		delay, max, retention, vto, wait int64	)	u := *qu.QueueUrl	a, err := ARNFromURL(u)	if err != nil {		return nil, fmt.Errorf("ARN for queue %q cannot be parsed from URL %q: %w", qName, u, err)	}	for _, field := range []struct {		name  types.QueueAttributeName		value *int64		sDef  string		def   int64	}{		{types.QueueAttributeNameApproximateNumberOfMessages, &anom, "-1", -1},		{types.QueueAttributeNameApproximateNumberOfMessagesDelayed, &anomd, "-1", -1},		{types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, &anomnv, "-1", -1},		{types.QueueAttributeNameCreatedTimestamp, &created, "0", 0},		{types.QueueAttributeNameDelaySeconds, &delay, "-1", -1},		{types.QueueAttributeNameLastModifiedTimestamp, &changed, "0", 0},		{types.QueueAttributeNameMaximumMessageSize, &max, "262144", 1 << (8 + 10)},          // 256ko		{types.QueueAttributeNameMessageRetentionPeriod, &max, "1209600", 14 * 24 * 60 * 60}, // 2 weeks		{types.QueueAttributeNameReceiveMessageWaitTimeSeconds, &wait, "0", 0},               // short polling		{types.QueueAttributeNameVisibilityTimeout, &vto, "0", 0},                            // short polling	} {		s, ok := qao.Attributes[string(field.name)]		if !ok {			errCount++			s = field.sDef		}		n, err := strconv.Atoi(s)		if err != nil {			errCount++		}		*field.value = int64(n)	}	qrp, err := r.parseQueueInfoRedrivePolicy(qName, *qao)	if err != nil {		return nil, fmt.Errorf("failed parsing redrive policy for queue %q: %w", qName, err)	}	qrap, err := r.parseQueueInfoRedriveAllowPolicy(qName, *qao)	if err != nil {		return nil, fmt.Errorf("failed parsing redrive allow policy for queue %q: %w", qName, err)	}	qi = &QueueInfo{		Name: qName,		URL:  u,		Attributes: &QueueInfoAttributes{			ApproximateNumberOfMessages:           anom,			ApproximateNumberOfMessagesDelayed:    anomd,			ApproximateNumberOfMessagesNotVisible: anomnv,			CreatedTimestamp:                      created,			DelaySeconds:                          delay,			LastModifiedTimestamp:                 changed,			MaximumMessageSize:                    max,			MessageRetentionPeriod:                retention,			QueueARN:                              a.String(),			ReceiveMessageWaitTimeSeconds:         wait,			VisibilityTimeout:                     vto,			RedrivePolicy:                         qrp,			RedriveAllowPolicy:                    qrap,		},	}	return qi, nil}func (*redriver) parseReceivedAttributes(msg types.Message) (*SystemMessageAttributes, error) {	var (		afrt, arc, sent int64		errCount        int		err             error	)	for _, field := range []struct {		name  types.MessageSystemAttributeName		value *int64		sDef  string		def   int64	}{		{types.MessageSystemAttributeNameApproximateFirstReceiveTimestamp, &afrt, "0", 0},		{types.MessageSystemAttributeNameApproximateReceiveCount, &arc, "0", 0},		{types.MessageSystemAttributeNameSentTimestamp, &sent, "0", 0},	} {		s, ok := msg.Attributes[string(field.name)]		if !ok {			errCount++			s = field.sDef		}		n, err := strconv.Atoi(s)		if err != nil {			errCount++		}		*field.value = int64(n)	}	ma := SystemMessageAttributes{		ApproximateFirstReceiveTimestamp: afrt,		ApproximateReceiveCount:          arc,		DeadLetterQueueSourceARN:         msg.Attributes[string(MessageSystemAttributeNameDeadLetterQueueSourceArn)],		SenderId:                         msg.Attributes[string(types.MessageSystemAttributeNameSenderId)],		SentTimestamp:                    sent,	}	if errCount > 0 {		err = fmt.Errorf("encounted %d errors parsing system message attributes", errCount)		return nil, err	}	return &ma, nil}func (r *redriver) GetQueueItems(ctx context.Context, qName string) ([]Message, error) {	qui := &sqs.GetQueueUrlInput{QueueName: &qName}	qu, err := r.GetQueueUrl(ctx, qui)	if err != nil {		return nil, fmt.Errorf("failed getting URL for queue %q: %w", qName, err)	}	if qu.QueueUrl == nil {		return nil, fmt.Errorf("URL for queue %q is empty", qName)	}	rmi := sqs.ReceiveMessageInput{		QueueUrl:              qu.QueueUrl,		AttributeNames:        []types.QueueAttributeName{types.QueueAttributeNameAll},		MaxNumberOfMessages:   10,		MessageAttributeNames: []string{".*"},		VisibilityTimeout:     0,		WaitTimeSeconds:       r.Wait,	}	rmo, err := r.Client.ReceiveMessage(ctx, &rmi)	ms := make([]Message, 0, len(rmo.Messages))	for _, m := range rmo.Messages {		ma, err := r.parseReceivedAttributes(m)		if err != nil {			return nil, err		}		for _, field := range []struct {			name  string			value any		}{			{"body", m.Body},			{"md5 of body", m.MD5OfBody},			{"md5 of message attributes", m.MD5OfMessageAttributes},			{"message ID", m.MessageId},			{"receipt handle", m.ReceiptHandle},		} {			if field.value == nil {				return nil, fmt.Errorf("missing field %s on message", field.name)			}		}		j, err := JSONableFromMessageAttributeValues(m.MessageAttributes)		if err != nil {			return nil, fmt.Errorf("failed decoding message boby: %w", err)		}		m := Message{			Attributes:             ma,			Body:                   *m.Body,			Md5OfBody:              *m.MD5OfBody,			Md5OfMessageAttributes: *m.MD5OfMessageAttributes,			MessageAttributes:      j,			MessageId:              *m.MessageId,			ReceiptHandle:          *m.ReceiptHandle,		}		ms = append(ms, m)	}	return ms, err}func (r *redriver) DeleteItems(ctx context.Context, qName string, itemsIDs []ItemsKeys) error {	// TODO implement me	panic("implement me")}func (r *redriver) Purge(ctx context.Context, qName string) error {	// TODO implement me	panic("implement me")}func (r *redriver) RedriveItems(ctx context.Context, qName string, itemIDs []ItemsKeys) error {	// TODO implement me	panic("implement me")}func RedriverService(dic *izidic.Container) (any, error) {	cli := dic.MustService(services.SvcClient).(*sqs.Client)	w := dic.MustParam(services.PWriter).(io.Writer)	wait := int32(dic.MustParam(services.PWait).(int))	return &redriver{		Wait: wait,		Writer: w,		Client: cli,	}, nil}
 |