Skip to content

Commit

Permalink
refactor: change timestamp attribute names for Clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 6, 2023
1 parent 41f0f68 commit 6aa0b46
Show file tree
Hide file tree
Showing 17 changed files with 199 additions and 199 deletions.
34 changes: 17 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ The `dynamomq` command-line interface provides a range of commands to interact w

- `--endpoint-url`: Override the default URL for commands with a specified endpoint URL.
- `-h`, `--help`: Display help information for `dynamomq`.
- `--queueing-index-name`: Specify the name of the queueing index to use (default is `"dynamo-mq-index-queue_type-queue_add_timestamp"`).
- `--queueing-index-name`: Specify the name of the queueing index to use (default is `"dynamo-mq-index-queue_type-sent_at"`).
- `--table-name`: Define the name of the DynamoDB table to contain the items (default is `"dynamo-mq-table"`).

To get more detailed information about a specific command, use `dynamomq [command] --help`.
Expand Down Expand Up @@ -333,33 +333,33 @@ This design ensures that DynamoMQ maintains message reliability while enabling t

The DynamoDB table for the DynamoMQ message queue system is designed to efficiently manage and track the status of messages. Here’s a breakdown of the table schema:

| Key | Attributes | Type | Example Value |
|-------|------------------------|--------|-------------------------------------|
| PK | id | string | A-101 |
| | data | any | any |
| | visibility_timeout | number | 10 |
| | receive_count | number | 1 |
| GSIPK | queue_type | string | STANDARD or DLQ |
| | version | number | 1 |
| | creation_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | last_updated_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 |
| GSISK | queue_add_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | queue_peek_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 |
| Key | Attributes | Type | Example Value |
|-------|--------------------|--------|-------------------------------------|
| PK | id | string | A-101 |
| | data | any | any |
| | visibility_timeout | number | 10 |
| | receive_count | number | 1 |
| GSIPK | queue_type | string | STANDARD or DLQ |
| | version | number | 1 |
| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 |

**PK (Primary Key)** `ID`: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages.

**GSIPK (Global Secondary Index - Partition Key)** `queue_type`: Used to categorize messages by `queue_type`, such as 'STANDARD' or 'DLQ' (Dead Letter Queue), allowing for quick access and operations on subsets of the queue.

**GSISK (Global Secondary Index - Sort Key)** `queue_add_timestamp`: The timestamp when the message was added to the queue. Facilitates the ordering of messages based on the time they were added to the queue, which is useful for implementing FIFO (First-In-First-Out) or other ordering mechanisms.
**GSISK (Global Secondary Index - Sort Key)** `sent_at`: The timestamp when the message was sent to the queue. Facilitates the ordering of messages based on the time they were added to the queue, which is useful for implementing FIFO (First-In-First-Out) or other ordering mechanisms.

**Attributes**: These are the various properties associated with each message:
- `data`: This attribute holds the content of the message and can be of any type.
- `isibility_timeout`: The new value for the message's visibility timeout (in seconds).
- `receive_count`: A numerical count of how many times the message has been retrieved from the queue.
- `version`: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified.
- `creation_timestamp`: The date and time when the message was created. ISO 8601 format.
- `last_updated_timestamp`: The date and time when the message was last updated. ISO 8601 format.
- `queue_peek_timestamp`: The timestamp when the message was last viewed without being altered. ISO 8601 format.
- `created_at`: The date and time when the message was created. ISO 8601 format.
- `updated_at`: The date and time when the message was last updated. ISO 8601 format.
- `received_at`: The timestamp when the message was last viewed without being altered. ISO 8601 format.

### Data Transition

Expand Down
108 changes: 54 additions & 54 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

const (
DefaultTableName = "dynamo-mq-table"
DefaultQueueingIndexName = "dynamo-mq-index-queue_type-queue_add_timestamp"
DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at"
DefaultRetryMaxAttempts = 10
DefaultVisibilityTimeoutInSeconds = 30
DefaultMaxListMessages = 10
Expand Down Expand Up @@ -170,18 +170,18 @@ func (c *client[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]
now := c.clock.Now()
message := NewMessage(params.ID, params.Data, now)
if params.DelaySeconds > 0 {
message.delayToAddQueueTimestamp(time.Duration(params.DelaySeconds) * time.Second)
message.delayToSentAt(time.Duration(params.DelaySeconds) * time.Second)
}
err = c.put(ctx, message)
if err != nil {
return &SendMessageOutput[T]{}, err
}
return &SendMessageOutput[T]{
Result: &Result{
ID: message.ID,
Status: message.GetStatus(now),
LastUpdatedTimestamp: message.LastUpdatedTimestamp,
Version: message.Version,
ID: message.ID,
Status: message.GetStatus(now),
UpdatedAt: message.UpdatedAt,
Version: message.Version,
},
Message: message,
}, nil
Expand All @@ -194,9 +194,9 @@ type ReceiveMessageInput struct {

// ReceiveMessageOutput represents the result for the ReceiveMessage() API call.
type ReceiveMessageOutput[T any] struct {
*Result // Embedded type for inheritance-like behavior in Go
PeekFromQueueTimestamp string `json:"queue_peek_timestamp"`
PeekedMessageObject *Message[T] `json:"-"`
*Result // Embedded type for inheritance-like behavior in Go
ReceivedAt string `json:"received_at"`
ReceivedMessage *Message[T] `json:"-"`
}

func (c *client[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error) {
Expand All @@ -219,13 +219,13 @@ func (c *client[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageIn

return &ReceiveMessageOutput[T]{
Result: &Result{
ID: updated.ID,
Status: updated.GetStatus(c.clock.Now()),
LastUpdatedTimestamp: updated.LastUpdatedTimestamp,
Version: updated.Version,
ID: updated.ID,
Status: updated.GetStatus(c.clock.Now()),
UpdatedAt: updated.UpdatedAt,
Version: updated.Version,
},
PeekFromQueueTimestamp: updated.PeekFromQueueTimestamp,
PeekedMessageObject: updated,
ReceivedAt: updated.ReceivedAt,
ReceivedMessage: updated,
}, nil
}

Expand Down Expand Up @@ -304,8 +304,8 @@ func (c *client[T]) processSelectedMessage(ctx context.Context, message *Message
Add(expression.Name("version"), expression.Value(1)).
Add(expression.Name("receive_count"), expression.Value(1)).
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)).
Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)).
Set(expression.Name("queue_peek_timestamp"), expression.Value(message.PeekFromQueueTimestamp))).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("received_at"), expression.Value(message.ReceivedAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
Expand Down Expand Up @@ -347,7 +347,7 @@ func (c *client[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeM
builder := expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
Expand All @@ -360,10 +360,10 @@ func (c *client[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeM
}
return &ChangeMessageVisibilityOutput[T]{
Result: &Result{
ID: retried.ID,
Status: retried.GetStatus(c.clock.Now()),
LastUpdatedTimestamp: retried.LastUpdatedTimestamp,
Version: retried.Version,
ID: retried.ID,
Status: retried.GetStatus(c.clock.Now()),
UpdatedAt: retried.UpdatedAt,
Version: retried.Version,
},
Message: retried,
}, nil
Expand Down Expand Up @@ -402,10 +402,10 @@ type MoveMessageToDLQInput struct {
}

type MoveMessageToDLQOutput struct {
ID string `json:"id"`
Status Status `json:"status"`
LastUpdatedTimestamp string `json:"last_updated_timestamp"`
Version int `json:"version"`
ID string `json:"id"`
Status Status `json:"status"`
UpdatedAt string `json:"updated_at"`
Version int `json:"version"`
}

func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error) {
Expand All @@ -425,10 +425,10 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD
if markedErr := message.markAsMovedToDLQ(c.clock.Now()); markedErr != nil {
//lint:ignore nilerr reason
return &MoveMessageToDLQOutput{
ID: params.ID,
Status: message.GetStatus(c.clock.Now()),
LastUpdatedTimestamp: message.LastUpdatedTimestamp,
Version: message.Version,
ID: params.ID,
Status: message.GetStatus(c.clock.Now()),
UpdatedAt: message.UpdatedAt,
Version: message.Version,
}, nil
}
builder := expression.NewBuilder().
Expand All @@ -437,9 +437,9 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)).
Set(expression.Name("receive_count"), expression.Value(message.ReceiveCount)).
Set(expression.Name("queue_type"), expression.Value(message.QueueType)).
Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)).
Set(expression.Name("queue_add_timestamp"), expression.Value(message.AddToQueueTimestamp)).
Set(expression.Name("queue_peek_timestamp"), expression.Value(message.AddToQueueTimestamp))).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("sent_at"), expression.Value(message.SentAt)).
Set(expression.Name("received_at"), expression.Value(message.SentAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
Expand All @@ -450,10 +450,10 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD
return &MoveMessageToDLQOutput{}, err
}
return &MoveMessageToDLQOutput{
ID: params.ID,
Status: updated.GetStatus(c.clock.Now()),
LastUpdatedTimestamp: updated.LastUpdatedTimestamp,
Version: updated.Version,
ID: params.ID,
Status: updated.GetStatus(c.clock.Now()),
UpdatedAt: updated.UpdatedAt,
Version: updated.Version,
}, nil
}

Expand All @@ -462,10 +462,10 @@ type RedriveMessageInput struct {
}

type RedriveMessageOutput struct {
ID string `json:"id"`
Status Status `json:"status"`
LastUpdatedTimestamp string `json:"last_updated_timestamp"`
Version int `json:"version"`
ID string `json:"id"`
Status Status `json:"status"`
UpdatedAt string `json:"updated_at"`
Version int `json:"version"`
}

func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) {
Expand Down Expand Up @@ -497,11 +497,11 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn
expression.Name("visibility_timeout"),
expression.Value(message.VisibilityTimeout),
).Set(
expression.Name("last_updated_timestamp"),
expression.Value(message.LastUpdatedTimestamp),
expression.Name("updated_at"),
expression.Value(message.UpdatedAt),
).Set(
expression.Name("queue_add_timestamp"),
expression.Value(message.AddToQueueTimestamp),
expression.Name("sent_at"),
expression.Value(message.SentAt),
)).
WithCondition(expression.Name("version").
Equal(expression.Value(message.Version)))
Expand All @@ -514,10 +514,10 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn
return &RedriveMessageOutput{}, err
}
return &RedriveMessageOutput{
ID: updated.ID,
Status: updated.GetStatus(c.clock.Now()),
LastUpdatedTimestamp: updated.LastUpdatedTimestamp,
Version: updated.Version,
ID: updated.ID,
Status: updated.GetStatus(c.clock.Now()),
UpdatedAt: updated.UpdatedAt,
Version: updated.Version,
}, nil
}

Expand Down Expand Up @@ -758,7 +758,7 @@ func (c *client[T]) ListMessages(ctx context.Context, params *ListMessagesInput)
return &ListMessagesOutput[T]{}, UnmarshalingAttributeError{Cause: err}
}
sort.Slice(messages, func(i, j int) bool {
return messages[i].LastUpdatedTimestamp < messages[j].LastUpdatedTimestamp
return messages[i].UpdatedAt < messages[j].UpdatedAt
})
return &ListMessagesOutput[T]{Messages: messages}, nil
}
Expand Down Expand Up @@ -860,8 +860,8 @@ const (
)

type Result struct {
ID string `json:"id"`
Status Status `json:"status"`
LastUpdatedTimestamp string `json:"last_updated_timestamp"`
Version int `json:"version"`
ID string `json:"id"`
Status Status `json:"status"`
UpdatedAt string `json:"updated_at"`
Version int `json:"version"`
}
Loading

0 comments on commit 6aa0b46

Please sign in to comment.