diff --git a/client.go b/client.go index 6e13c90..0402e9e 100644 --- a/client.go +++ b/client.go @@ -16,7 +16,8 @@ import ( ) const ( - defaultQueryLimit = 250 + defaultQueryLimit = 250 + maxFirstMessagesInQueue = 100 ) // Client is an interface for interacting with a DynamoDB-based message queue system. @@ -471,11 +472,17 @@ func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessage return out, nil } +// MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ). type MoveMessageToDLQInput struct { + // ID is the unique identifier of the message to be moved to the DLQ. ID string } +// MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ. +// This struct uses the generic type T and contains information about the message that has been moved. type MoveMessageToDLQOutput[T any] struct { + // MovedMessage is a pointer to the Message type containing information about the moved message. + // The type T determines the format of the message content. MovedMessage *Message[T] } @@ -525,11 +532,17 @@ func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessag }, nil } +// RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue. type RedriveMessageInput struct { + // ID is the unique identifier of the message to be redriven from the DLQ. ID string } +// RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ. +// This struct uses the generic type T and contains information about the message that has been restored. type RedriveMessageOutput[T any] struct { + // RedroveMessage is a pointer to the Message type containing information about the redriven message. + // The type T determines the format of the message content. RedroveMessage *Message[T] } @@ -590,13 +603,18 @@ func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessa // This struct does not contain any fields as it's used to request general queue statistics without the need for specific parameters. type GetQueueStatsInput struct{} -// GetQueueStatsOutput represents the output containing statistical information about the queue. +// GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue. type GetQueueStatsOutput struct { - First100IDsInQueue []string `json:"first_100_IDs_in_queue"` - First100SelectedIDsInQueue []string `json:"first_100_selected_IDs_in_queue"` - TotalRecordsInQueue int `json:"total_records_in_queue"` - TotalRecordsInProcessing int `json:"total_records_in_queue_selected_for_processing"` - TotalRecordsNotStarted int `json:"total_records_in_queue_pending_for_processing"` + // First100IDsInQueue is an array of the first 100 message IDs currently in the queue. + First100IDsInQueue []string `json:"first_100_IDs_in_queue"` + // First100IDsInQueueProcessing is an array of the first 100 message IDs that are currently being processed. + First100IDsInQueueProcessing []string `json:"first_100_IDs_in_queue_processing"` + // TotalMessagesInQueue is the total number of messages present in the queue. + TotalMessagesInQueue int `json:"total_messages_in_queue"` + // TotalMessagesInQueueProcessing is the total number of messages that are currently in the process of being handled. + TotalMessagesInQueueProcessing int `json:"total_messages_in_queue_processing"` + // TotalMessagesInQueueReady is the total number of messages in the queue that are ready to be processed and have not started processing yet. + TotalMessagesInQueueReady int `json:"total_messages_in_queue_ready"` } // GetQueueStats get statistical information about a DynamoDB-based queue. @@ -621,11 +639,11 @@ func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr expression.Expression) (*GetQueueStatsOutput, error) { var ( stats = &GetQueueStatsOutput{ - First100IDsInQueue: make([]string, 0), - First100SelectedIDsInQueue: make([]string, 0), - TotalRecordsInQueue: 0, - TotalRecordsInProcessing: 0, - TotalRecordsNotStarted: 0, + First100IDsInQueue: make([]string, 0), + First100IDsInQueueProcessing: make([]string, 0), + TotalMessagesInQueue: 0, + TotalMessagesInQueueProcessing: 0, + TotalMessagesInQueueReady: 0, } exclusiveStartKey map[string]types.AttributeValue ) @@ -655,13 +673,13 @@ func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr ex break } } - stats.TotalRecordsNotStarted = stats.TotalRecordsInQueue - stats.TotalRecordsInProcessing + stats.TotalMessagesInQueueReady = stats.TotalMessagesInQueue - stats.TotalMessagesInQueueProcessing return stats, nil } func (c *ClientImpl[T]) processQueryItemsForQueueStats(items []map[string]types.AttributeValue, stats *GetQueueStatsOutput) error { for _, itemMap := range items { - stats.TotalRecordsInQueue++ + stats.TotalMessagesInQueue++ item := Message[T]{} err := c.unmarshalMap(itemMap, &item) if err != nil { @@ -673,26 +691,28 @@ func (c *ClientImpl[T]) processQueryItemsForQueueStats(items []map[string]types. return nil } -const maxFirst100ItemsInQueue = 100 - func (c *ClientImpl[T]) updateQueueStatsFromItem(message *Message[T], stats *GetQueueStatsOutput) { if message.GetStatus(c.clock.Now()) == StatusProcessing { - stats.TotalRecordsInProcessing++ - if len(stats.First100SelectedIDsInQueue) < maxFirst100ItemsInQueue { - stats.First100SelectedIDsInQueue = append(stats.First100SelectedIDsInQueue, message.ID) + stats.TotalMessagesInQueueProcessing++ + if len(stats.First100IDsInQueueProcessing) < maxFirstMessagesInQueue { + stats.First100IDsInQueueProcessing = append(stats.First100IDsInQueueProcessing, message.ID) } } - if len(stats.First100IDsInQueue) < maxFirst100ItemsInQueue { + if len(stats.First100IDsInQueue) < maxFirstMessagesInQueue { stats.First100IDsInQueue = append(stats.First100IDsInQueue, message.ID) } } +// GetDLQStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based Dead Letter Queue (DLQ). +// This struct does not contain any fields as it's used to request general DLQ statistics without the need for specific parameters. type GetDLQStatsInput struct{} -// GetDLQStatsOutput represents the structure to store DLQ depth statistics. +// GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ). type GetDLQStatsOutput struct { + // First100IDsInQueue is an array of the first 100 message IDs currently in the DLQ. First100IDsInQueue []string `json:"first_100_IDs_in_queue"` - TotalRecordsInDLQ int `json:"total_records_in_DLQ"` + // TotalMessagesInDLQ is the total number of messages present in the DLQ. + TotalMessagesInDLQ int `json:"total_messages_in_DLQ"` } // GetDLQStats get statistical information about a DynamoDB-based Dead Letter Queue (DLQ). @@ -718,7 +738,7 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr var ( stats = &GetDLQStatsOutput{ First100IDsInQueue: make([]string, 0), - TotalRecordsInDLQ: 0, + TotalMessagesInDLQ: 0, } lastEvaluatedKey map[string]types.AttributeValue ) @@ -752,8 +772,8 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr func (c *ClientImpl[T]) processQueryItemsForDLQStats(items []map[string]types.AttributeValue, stats *GetDLQStatsOutput) error { for _, itemMap := range items { - stats.TotalRecordsInDLQ++ - if len(stats.First100IDsInQueue) < maxFirst100ItemsInQueue { + stats.TotalMessagesInDLQ++ + if len(stats.First100IDsInQueue) < maxFirstMessagesInQueue { item := Message[T]{} err := c.unmarshalMap(itemMap, &item) if err != nil { @@ -765,11 +785,17 @@ func (c *ClientImpl[T]) processQueryItemsForDLQStats(items []map[string]types.At return nil } +// GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue. type GetMessageInput struct { + // ID is the unique identifier of the message to be retrieved from the queue. ID string } +// GetMessageOutput represents the result of the operation to retrieve a message. +// This struct uses the generic type T and contains information about the retrieved message. type GetMessageOutput[T any] struct { + // Message is a pointer to the Message type containing information about the retrieved message. + // The type T determines the format of the message content. Message *Message[T] } @@ -805,11 +831,17 @@ func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput) }, nil } +// ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue. type ListMessagesInput struct { + // Size is the number of messages to be listed from the queue. It determines the maximum size of the returned message list. Size int32 } +// ListMessagesOutput represents the result of the operation to list messages from the queue. +// This struct uses the generic type T and contains an array of messages. type ListMessagesOutput[T any] struct { + // Messages is an array of pointers to Message types, containing information about each listed message. + // The type T determines the format of the message content for each message in the array. Messages []*Message[T] } @@ -841,10 +873,16 @@ func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesIn return &ListMessagesOutput[T]{Messages: messages}, nil } +// ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue. +// This struct uses the generic type T for the message content. type ReplaceMessageInput[T any] struct { + // Message is pointer to the Message type containing the new message data that will replace the existing message in the queue. + // The type T determines the format of the new message content. Message *Message[T] } +// ReplaceMessageOutput represents the result of the operation to replace a message in the queue. +// This struct is empty as the replace message operation does not return any specific information. type ReplaceMessageOutput struct { } diff --git a/client_test.go b/client_test.go index d96a927..c894234 100644 --- a/client_test.go +++ b/client_test.go @@ -559,33 +559,33 @@ func TestDynamoMQClientGetQueueStats(t *testing.T) { name: "should return empty items stats when no item in standard queue", setup: NewSetupFunc(), want: &dynamomq.GetQueueStatsOutput{ - First100IDsInQueue: []string{}, - First100SelectedIDsInQueue: []string{}, - TotalRecordsInQueue: 0, - TotalRecordsInProcessing: 0, - TotalRecordsNotStarted: 0, + First100IDsInQueue: []string{}, + First100IDsInQueueProcessing: []string{}, + TotalMessagesInQueue: 0, + TotalMessagesInQueueProcessing: 0, + TotalMessagesInQueueReady: 0, }, }, { name: "should return one item stats when one item in standard queue", setup: NewSetupFunc(newPutRequestWithReadyItem("A-101", clock.Now())), want: &dynamomq.GetQueueStatsOutput{ - First100IDsInQueue: []string{"A-101"}, - First100SelectedIDsInQueue: []string{}, - TotalRecordsInQueue: 1, - TotalRecordsInProcessing: 0, - TotalRecordsNotStarted: 1, + First100IDsInQueue: []string{"A-101"}, + First100IDsInQueueProcessing: []string{}, + TotalMessagesInQueue: 1, + TotalMessagesInQueueProcessing: 0, + TotalMessagesInQueueReady: 1, }, }, { name: "should return one processing item stats when one item in standard queue", setup: NewSetupFunc(newPutRequestWithProcessingItem("A-101", clock.Now())), want: &dynamomq.GetQueueStatsOutput{ - First100IDsInQueue: []string{"A-101"}, - First100SelectedIDsInQueue: []string{"A-101"}, - TotalRecordsInQueue: 1, - TotalRecordsInProcessing: 1, - TotalRecordsNotStarted: 0, + First100IDsInQueue: []string{"A-101"}, + First100IDsInQueueProcessing: []string{"A-101"}, + TotalMessagesInQueue: 1, + TotalMessagesInQueueProcessing: 1, + TotalMessagesInQueueReady: 0, }, }, { @@ -597,11 +597,11 @@ func TestDynamoMQClientGetQueueStats(t *testing.T) { newPutRequestWithProcessingItem("D-404", clock.Now().Add(3*time.Second)), ), want: &dynamomq.GetQueueStatsOutput{ - First100IDsInQueue: []string{"A-101", "B-202", "C-303", "D-404"}, - First100SelectedIDsInQueue: []string{"C-303", "D-404"}, - TotalRecordsInQueue: 4, - TotalRecordsInProcessing: 2, - TotalRecordsNotStarted: 2, + First100IDsInQueue: []string{"A-101", "B-202", "C-303", "D-404"}, + First100IDsInQueueProcessing: []string{"C-303", "D-404"}, + TotalMessagesInQueue: 4, + TotalMessagesInQueueProcessing: 2, + TotalMessagesInQueueReady: 2, }, }, } @@ -623,7 +623,7 @@ func TestDynamoMQClientGetDLQStats(t *testing.T) { ), want: &dynamomq.GetDLQStatsOutput{ First100IDsInQueue: []string{}, - TotalRecordsInDLQ: 0, + TotalMessagesInDLQ: 0, }, }, { @@ -638,7 +638,7 @@ func TestDynamoMQClientGetDLQStats(t *testing.T) { ), want: &dynamomq.GetDLQStatsOutput{ First100IDsInQueue: []string{"D-404", "E-505", "F-606"}, - TotalRecordsInDLQ: 3, + TotalMessagesInDLQ: 3, }, }, }