Skip to content

Commit

Permalink
docs: write godocs and rename fields of stats structure
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 11, 2023
1 parent 203c19d commit 2a7069d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 47 deletions.
88 changes: 63 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

const (
defaultQueryLimit = 250
defaultQueryLimit = 250
maxFirstMessagesInQueue = 100
)

// Client is an interface for interacting with a DynamoDB-based message queue system.
Expand Down Expand Up @@ -471,11 +472,17 @@ func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessage
return out, nil
}

// MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
type MoveMessageToDLQInput struct {
// ID is the unique identifier of the message to be moved to the DLQ.
ID string
}

// MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ.
// This struct uses the generic type T and contains information about the message that has been moved.
type MoveMessageToDLQOutput[T any] struct {
// MovedMessage is a pointer to the Message type containing information about the moved message.
// The type T determines the format of the message content.
MovedMessage *Message[T]
}

Expand Down Expand Up @@ -525,11 +532,17 @@ func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessag
}, nil
}

// RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue.
type RedriveMessageInput struct {
// ID is the unique identifier of the message to be redriven from the DLQ.
ID string
}

// RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ.
// This struct uses the generic type T and contains information about the message that has been restored.
type RedriveMessageOutput[T any] struct {
// RedroveMessage is a pointer to the Message type containing information about the redriven message.
// The type T determines the format of the message content.
RedroveMessage *Message[T]
}

Expand Down Expand Up @@ -590,13 +603,18 @@ func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessa
// This struct does not contain any fields as it's used to request general queue statistics without the need for specific parameters.
type GetQueueStatsInput struct{}

// GetQueueStatsOutput represents the output containing statistical information about the queue.
// GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue.
type GetQueueStatsOutput struct {
First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
First100SelectedIDsInQueue []string `json:"first_100_selected_IDs_in_queue"`
TotalRecordsInQueue int `json:"total_records_in_queue"`
TotalRecordsInProcessing int `json:"total_records_in_queue_selected_for_processing"`
TotalRecordsNotStarted int `json:"total_records_in_queue_pending_for_processing"`
// First100IDsInQueue is an array of the first 100 message IDs currently in the queue.
First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
// First100IDsInQueueProcessing is an array of the first 100 message IDs that are currently being processed.
First100IDsInQueueProcessing []string `json:"first_100_IDs_in_queue_processing"`
// TotalMessagesInQueue is the total number of messages present in the queue.
TotalMessagesInQueue int `json:"total_messages_in_queue"`
// TotalMessagesInQueueProcessing is the total number of messages that are currently in the process of being handled.
TotalMessagesInQueueProcessing int `json:"total_messages_in_queue_processing"`
// TotalMessagesInQueueReady is the total number of messages in the queue that are ready to be processed and have not started processing yet.
TotalMessagesInQueueReady int `json:"total_messages_in_queue_ready"`
}

// GetQueueStats get statistical information about a DynamoDB-based queue.
Expand All @@ -621,11 +639,11 @@ func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput
func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr expression.Expression) (*GetQueueStatsOutput, error) {
var (
stats = &GetQueueStatsOutput{
First100IDsInQueue: make([]string, 0),
First100SelectedIDsInQueue: make([]string, 0),
TotalRecordsInQueue: 0,
TotalRecordsInProcessing: 0,
TotalRecordsNotStarted: 0,
First100IDsInQueue: make([]string, 0),
First100IDsInQueueProcessing: make([]string, 0),
TotalMessagesInQueue: 0,
TotalMessagesInQueueProcessing: 0,
TotalMessagesInQueueReady: 0,
}
exclusiveStartKey map[string]types.AttributeValue
)
Expand Down Expand Up @@ -655,13 +673,13 @@ func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr ex
break
}
}
stats.TotalRecordsNotStarted = stats.TotalRecordsInQueue - stats.TotalRecordsInProcessing
stats.TotalMessagesInQueueReady = stats.TotalMessagesInQueue - stats.TotalMessagesInQueueProcessing
return stats, nil
}

func (c *ClientImpl[T]) processQueryItemsForQueueStats(items []map[string]types.AttributeValue, stats *GetQueueStatsOutput) error {
for _, itemMap := range items {
stats.TotalRecordsInQueue++
stats.TotalMessagesInQueue++
item := Message[T]{}
err := c.unmarshalMap(itemMap, &item)
if err != nil {
Expand All @@ -673,26 +691,28 @@ func (c *ClientImpl[T]) processQueryItemsForQueueStats(items []map[string]types.
return nil
}

const maxFirst100ItemsInQueue = 100

func (c *ClientImpl[T]) updateQueueStatsFromItem(message *Message[T], stats *GetQueueStatsOutput) {
if message.GetStatus(c.clock.Now()) == StatusProcessing {
stats.TotalRecordsInProcessing++
if len(stats.First100SelectedIDsInQueue) < maxFirst100ItemsInQueue {
stats.First100SelectedIDsInQueue = append(stats.First100SelectedIDsInQueue, message.ID)
stats.TotalMessagesInQueueProcessing++
if len(stats.First100IDsInQueueProcessing) < maxFirstMessagesInQueue {
stats.First100IDsInQueueProcessing = append(stats.First100IDsInQueueProcessing, message.ID)
}
}
if len(stats.First100IDsInQueue) < maxFirst100ItemsInQueue {
if len(stats.First100IDsInQueue) < maxFirstMessagesInQueue {
stats.First100IDsInQueue = append(stats.First100IDsInQueue, message.ID)
}
}

// GetDLQStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
// This struct does not contain any fields as it's used to request general DLQ statistics without the need for specific parameters.
type GetDLQStatsInput struct{}

// GetDLQStatsOutput represents the structure to store DLQ depth statistics.
// GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ).
type GetDLQStatsOutput struct {
// First100IDsInQueue is an array of the first 100 message IDs currently in the DLQ.
First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
TotalRecordsInDLQ int `json:"total_records_in_DLQ"`
// TotalMessagesInDLQ is the total number of messages present in the DLQ.
TotalMessagesInDLQ int `json:"total_messages_in_DLQ"`
}

// GetDLQStats get statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
Expand All @@ -718,7 +738,7 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr
var (
stats = &GetDLQStatsOutput{
First100IDsInQueue: make([]string, 0),
TotalRecordsInDLQ: 0,
TotalMessagesInDLQ: 0,
}
lastEvaluatedKey map[string]types.AttributeValue
)
Expand Down Expand Up @@ -752,8 +772,8 @@ func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expr

func (c *ClientImpl[T]) processQueryItemsForDLQStats(items []map[string]types.AttributeValue, stats *GetDLQStatsOutput) error {
for _, itemMap := range items {
stats.TotalRecordsInDLQ++
if len(stats.First100IDsInQueue) < maxFirst100ItemsInQueue {
stats.TotalMessagesInDLQ++
if len(stats.First100IDsInQueue) < maxFirstMessagesInQueue {
item := Message[T]{}
err := c.unmarshalMap(itemMap, &item)
if err != nil {
Expand All @@ -765,11 +785,17 @@ func (c *ClientImpl[T]) processQueryItemsForDLQStats(items []map[string]types.At
return nil
}

// GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue.
type GetMessageInput struct {
// ID is the unique identifier of the message to be retrieved from the queue.
ID string
}

// GetMessageOutput represents the result of the operation to retrieve a message.
// This struct uses the generic type T and contains information about the retrieved message.
type GetMessageOutput[T any] struct {
// Message is a pointer to the Message type containing information about the retrieved message.
// The type T determines the format of the message content.
Message *Message[T]
}

Expand Down Expand Up @@ -805,11 +831,17 @@ func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput)
}, nil
}

// ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue.
type ListMessagesInput struct {
// Size is the number of messages to be listed from the queue. It determines the maximum size of the returned message list.
Size int32
}

// ListMessagesOutput represents the result of the operation to list messages from the queue.
// This struct uses the generic type T and contains an array of messages.
type ListMessagesOutput[T any] struct {
// Messages is an array of pointers to Message types, containing information about each listed message.
// The type T determines the format of the message content for each message in the array.
Messages []*Message[T]
}

Expand Down Expand Up @@ -841,10 +873,16 @@ func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesIn
return &ListMessagesOutput[T]{Messages: messages}, nil
}

// ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue.
// This struct uses the generic type T for the message content.
type ReplaceMessageInput[T any] struct {
// Message is pointer to the Message type containing the new message data that will replace the existing message in the queue.
// The type T determines the format of the new message content.
Message *Message[T]
}

// ReplaceMessageOutput represents the result of the operation to replace a message in the queue.
// This struct is empty as the replace message operation does not return any specific information.
type ReplaceMessageOutput struct {
}

Expand Down
44 changes: 22 additions & 22 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,33 +559,33 @@ func TestDynamoMQClientGetQueueStats(t *testing.T) {
name: "should return empty items stats when no item in standard queue",
setup: NewSetupFunc(),
want: &dynamomq.GetQueueStatsOutput{
First100IDsInQueue: []string{},
First100SelectedIDsInQueue: []string{},
TotalRecordsInQueue: 0,
TotalRecordsInProcessing: 0,
TotalRecordsNotStarted: 0,
First100IDsInQueue: []string{},
First100IDsInQueueProcessing: []string{},
TotalMessagesInQueue: 0,
TotalMessagesInQueueProcessing: 0,
TotalMessagesInQueueReady: 0,
},
},
{
name: "should return one item stats when one item in standard queue",
setup: NewSetupFunc(newPutRequestWithReadyItem("A-101", clock.Now())),
want: &dynamomq.GetQueueStatsOutput{
First100IDsInQueue: []string{"A-101"},
First100SelectedIDsInQueue: []string{},
TotalRecordsInQueue: 1,
TotalRecordsInProcessing: 0,
TotalRecordsNotStarted: 1,
First100IDsInQueue: []string{"A-101"},
First100IDsInQueueProcessing: []string{},
TotalMessagesInQueue: 1,
TotalMessagesInQueueProcessing: 0,
TotalMessagesInQueueReady: 1,
},
},
{
name: "should return one processing item stats when one item in standard queue",
setup: NewSetupFunc(newPutRequestWithProcessingItem("A-101", clock.Now())),
want: &dynamomq.GetQueueStatsOutput{
First100IDsInQueue: []string{"A-101"},
First100SelectedIDsInQueue: []string{"A-101"},
TotalRecordsInQueue: 1,
TotalRecordsInProcessing: 1,
TotalRecordsNotStarted: 0,
First100IDsInQueue: []string{"A-101"},
First100IDsInQueueProcessing: []string{"A-101"},
TotalMessagesInQueue: 1,
TotalMessagesInQueueProcessing: 1,
TotalMessagesInQueueReady: 0,
},
},
{
Expand All @@ -597,11 +597,11 @@ func TestDynamoMQClientGetQueueStats(t *testing.T) {
newPutRequestWithProcessingItem("D-404", clock.Now().Add(3*time.Second)),
),
want: &dynamomq.GetQueueStatsOutput{
First100IDsInQueue: []string{"A-101", "B-202", "C-303", "D-404"},
First100SelectedIDsInQueue: []string{"C-303", "D-404"},
TotalRecordsInQueue: 4,
TotalRecordsInProcessing: 2,
TotalRecordsNotStarted: 2,
First100IDsInQueue: []string{"A-101", "B-202", "C-303", "D-404"},
First100IDsInQueueProcessing: []string{"C-303", "D-404"},
TotalMessagesInQueue: 4,
TotalMessagesInQueueProcessing: 2,
TotalMessagesInQueueReady: 2,
},
},
}
Expand All @@ -623,7 +623,7 @@ func TestDynamoMQClientGetDLQStats(t *testing.T) {
),
want: &dynamomq.GetDLQStatsOutput{
First100IDsInQueue: []string{},
TotalRecordsInDLQ: 0,
TotalMessagesInDLQ: 0,
},
},
{
Expand All @@ -638,7 +638,7 @@ func TestDynamoMQClientGetDLQStats(t *testing.T) {
),
want: &dynamomq.GetDLQStatsOutput{
First100IDsInQueue: []string{"D-404", "E-505", "F-606"},
TotalRecordsInDLQ: 3,
TotalMessagesInDLQ: 3,
},
},
}
Expand Down

0 comments on commit 2a7069d

Please sign in to comment.