Skip to content

Commit

Permalink
feat: update DynamoMQClient to store visibility timeout as datetime i…
Browse files Browse the repository at this point in the history
…n invisible_until_at and add retryInterval parameter to Consumer
  • Loading branch information
vvatanabe committed Dec 8, 2023
1 parent 6aa0b46 commit f147728
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 126 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,14 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien
|-------|--------------------|--------|-------------------------------------|
| PK | id | string | A-101 |
| | data | any | any |
| | visibility_timeout | number | 10 |
| | receive_count | number | 1 |
| GSIPK | queue_type | string | STANDARD or DLQ |
| | version | number | 1 |
| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 |

**PK (Primary Key)** `ID`: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages.

Expand All @@ -354,12 +354,12 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien

**Attributes**: These are the various properties associated with each message:
- `data`: This attribute holds the content of the message and can be of any type.
- `isibility_timeout`: The new value for the message's visibility timeout (in seconds).
- `receive_count`: A numerical count of how many times the message has been retrieved from the queue.
- `version`: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified.
- `created_at`: The date and time when the message was created. ISO 8601 format.
- `updated_at`: The date and time when the message was last updated. ISO 8601 format.
- `received_at`: The timestamp when the message was last viewed without being altered. ISO 8601 format.
- `invisible_until_at`: The timestamp indicating when the message becomes visible in the queue for processing. ISO 8601 format.

### Data Transition

Expand Down
28 changes: 18 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at"
DefaultRetryMaxAttempts = 10
DefaultVisibilityTimeoutInSeconds = 30
DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second
DefaultMaxListMessages = 10
DefaultQueryLimit = 250
)
Expand Down Expand Up @@ -206,6 +207,9 @@ func (c *client[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageIn
if params.QueueType == "" {
params.QueueType = QueueTypeStandard
}
if params.VisibilityTimeout <= 0 {
params.VisibilityTimeout = DefaultVisibilityTimeoutInSeconds
}

selected, err := c.selectMessage(ctx, params)
if err != nil {
Expand Down Expand Up @@ -287,7 +291,7 @@ func (c *client[T]) processQueryResult(params *ReceiveMessageInput, queryResult
return nil, UnmarshalingAttributeError{Cause: err}
}

if err := message.markAsProcessing(c.clock.Now(), params.VisibilityTimeout); err == nil {
if err := message.markAsProcessing(c.clock.Now(), secToDur(params.VisibilityTimeout)); err == nil {
selected = &message
break
}
Expand All @@ -303,9 +307,9 @@ func (c *client[T]) processSelectedMessage(ctx context.Context, message *Message
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Add(expression.Name("receive_count"), expression.Value(1)).
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("received_at"), expression.Value(message.ReceivedAt))).
Set(expression.Name("received_at"), expression.Value(message.ReceivedAt)).
Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
Expand Down Expand Up @@ -343,12 +347,12 @@ func (c *client[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeM
return &ChangeMessageVisibilityOutput[T]{}, &IDNotFoundError{}
}
message := retrieved.Message
message.changeVisibilityTimeout(c.clock.Now(), params.VisibilityTimeout)
message.changeVisibility(c.clock.Now(), secToDur(params.VisibilityTimeout))
builder := expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout))).
Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
Expand Down Expand Up @@ -434,12 +438,12 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD
builder := expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)).
Set(expression.Name("receive_count"), expression.Value(message.ReceiveCount)).
Set(expression.Name("queue_type"), expression.Value(message.QueueType)).
Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
Set(expression.Name("sent_at"), expression.Value(message.SentAt)).
Set(expression.Name("received_at"), expression.Value(message.SentAt))).
Set(expression.Name("received_at"), expression.Value(message.SentAt)).
Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))).
WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
expr, err := c.buildExpression(builder)
if err != nil {
Expand Down Expand Up @@ -493,15 +497,15 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn
).Set(
expression.Name("queue_type"),
expression.Value(message.QueueType),
).Set(
expression.Name("visibility_timeout"),
expression.Value(message.VisibilityTimeout),
).Set(
expression.Name("updated_at"),
expression.Value(message.UpdatedAt),
).Set(
expression.Name("sent_at"),
expression.Value(message.SentAt),
).Set(
expression.Name("invisible_until_at"),
expression.Value(message.InvisibleUntilAt),
)).
WithCondition(expression.Name("version").
Equal(expression.Value(message.Version)))
Expand Down Expand Up @@ -845,6 +849,10 @@ func handleDynamoDBError(err error) error {
return DynamoDBAPIError{Cause: err}
}

func secToDur(sec int) time.Duration {
return time.Duration(sec) * time.Second
}

type Status string

const (
Expand Down
16 changes: 11 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) {

for i, want := range wants {
result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{
QueueType: dynamomq.QueueTypeStandard,
VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds,
})
test.AssertError(t, err, nil, fmt.Sprintf("ReceiveMessage() [%d-1]", i))
Expand Down Expand Up @@ -383,10 +384,11 @@ func TestDynamoMQClientReceiveMessageNotUseFIFO(t *testing.T) {
testDynamoMQClientReceiveMessageSequence(t, false)
}

func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) {
func TestDynamoMQClientChangeMessageVisibility(t *testing.T) {
t.Parallel()
type args struct {
id string
id string
visibilityTimeout int
}
now := test.DefaultTestDate.Add(10 * time.Second)
tests := []ClientTestCase[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]]{
Expand All @@ -397,7 +399,8 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) {
T: now,
},
args: args{
id: "A-101",
id: "A-101",
visibilityTimeout: -1,
},
want: &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{
Result: &dynamomq.Result{
Expand All @@ -408,7 +411,9 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) {
},
Message: func() *dynamomq.Message[test.MessageData] {
m := NewTestMessageItemAsProcessing("A-101", now)
MarkAsReady(m, now)
ts := clock.FormatRFC3339Nano(now)
m.UpdatedAt = ts
m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(-1 * time.Second))
m.Version = 2
return m
}(),
Expand All @@ -418,7 +423,8 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) {
runTestsParallel[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]](t, "ChangeMessageVisibility()", tests,
func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.ChangeMessageVisibilityOutput[test.MessageData], error) {
return client.ChangeMessageVisibility(context.Background(), &dynamomq.ChangeMessageVisibilityInput{
ID: args.id,
ID: args.id,
VisibilityTimeout: args.visibilityTimeout,
})
})
}
Expand Down
95 changes: 60 additions & 35 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
)

const (
defaultPollingInterval = time.Second
defaultMaximumReceives = 0 // unlimited
defaultQueueType = QueueTypeStandard
defaultConcurrency = 3
defaultPollingInterval = time.Second
defaultMaximumReceives = 0 // unlimited
defaultRetryIntervalInSeconds = 1
defaultQueueType = QueueTypeStandard
defaultConcurrency = 3
)

func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) {
Expand All @@ -35,6 +36,18 @@ func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) {
}
}

func WithVisibilityTimeout(sec int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.VisibilityTimeout = sec
}
}

func WithRetryInterval(sec int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.RetryInterval = sec
}
}

func WithQueueType(queueType QueueType) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.QueueType = queueType
Expand All @@ -54,10 +67,12 @@ func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) {
}

type ConsumerOptions struct {
PollingInterval time.Duration
Concurrency int
MaximumReceives int
QueueType QueueType
PollingInterval time.Duration
Concurrency int
MaximumReceives int
VisibilityTimeout int
RetryInterval int
QueueType QueueType
// errorLog specifies an optional logger for errors accepting
// connections, unexpected behavior from handlers, and
// underlying FileSystem errors.
Expand All @@ -68,28 +83,32 @@ type ConsumerOptions struct {

func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] {
o := &ConsumerOptions{
PollingInterval: defaultPollingInterval,
Concurrency: defaultConcurrency,
MaximumReceives: defaultMaximumReceives,
QueueType: defaultQueueType,
PollingInterval: defaultPollingInterval,
Concurrency: defaultConcurrency,
MaximumReceives: defaultMaximumReceives,
VisibilityTimeout: DefaultVisibilityTimeoutInSeconds,
RetryInterval: defaultRetryIntervalInSeconds,
QueueType: defaultQueueType,
}
for _, opt := range opts {
opt(o)
}
return &Consumer[T]{
client: client,
messageProcessor: processor,
pollingInterval: o.PollingInterval,
concurrency: o.Concurrency,
maximumReceives: o.MaximumReceives,
queueType: o.QueueType,
errorLog: o.ErrorLog,
onShutdown: o.OnShutdown,
inShutdown: 0,
mu: sync.Mutex{},
activeMessages: make(map[*Message[T]]struct{}),
activeMessagesWG: sync.WaitGroup{},
doneChan: make(chan struct{}),
client: client,
messageProcessor: processor,
pollingInterval: o.PollingInterval,
concurrency: o.Concurrency,
maximumReceives: o.MaximumReceives,
visibilityTimeout: o.VisibilityTimeout,
retryInterval: o.RetryInterval,
queueType: o.QueueType,
errorLog: o.ErrorLog,
onShutdown: o.OnShutdown,
inShutdown: 0,
mu: sync.Mutex{},
activeMessages: make(map[*Message[T]]struct{}),
activeMessagesWG: sync.WaitGroup{},
doneChan: make(chan struct{}),
}
}

Expand All @@ -104,14 +123,16 @@ func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error {
}

type Consumer[T any] struct {
client Client[T]
messageProcessor MessageProcessor[T]
concurrency int
pollingInterval time.Duration
maximumReceives int
queueType QueueType
errorLog *log.Logger
onShutdown []func()
client Client[T]
messageProcessor MessageProcessor[T]
concurrency int
pollingInterval time.Duration
maximumReceives int
visibilityTimeout int
retryInterval int
queueType QueueType
errorLog *log.Logger
onShutdown []func()

inShutdown int32
mu sync.Mutex
Expand All @@ -138,7 +159,7 @@ func (c *Consumer[T]) StartConsuming() error {
ctx := context.Background()
r, err := c.client.ReceiveMessage(ctx, &ReceiveMessageInput{
QueueType: c.queueType,
VisibilityTimeout: DefaultVisibilityTimeoutInSeconds,
VisibilityTimeout: c.visibilityTimeout,
})
if err != nil {
if c.shuttingDown() {
Expand Down Expand Up @@ -187,7 +208,11 @@ func (c *Consumer[T]) shouldRetry(msg *Message[T]) bool {
}

func (c *Consumer[T]) retryMessage(ctx context.Context, msg *Message[T]) {
if _, err := c.client.ChangeMessageVisibility(ctx, &ChangeMessageVisibilityInput{ID: msg.ID}); err != nil {
in := &ChangeMessageVisibilityInput{
ID: msg.ID,
VisibilityTimeout: c.retryInterval,
}
if _, err := c.client.ChangeMessageVisibility(ctx, in); err != nil {
c.logf("DynamoMQ: Failed to update a message as visible. %s", err)
}
}
Expand Down
2 changes: 2 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func TestConsumerStartConsuming(t *testing.T) {
dynamomq.WithPollingInterval(0),
dynamomq.WithConcurrency(3),
dynamomq.WithMaximumReceives(tt.MaximumReceives),
dynamomq.WithVisibilityTimeout(30),
dynamomq.WithRetryInterval(1),
dynamomq.WithQueueType(tt.QueueType),
dynamomq.WithErrorLog(log.New(os.Stderr, "", 0)),
dynamomq.WithOnShutdown([]func(){}))
Expand Down
12 changes: 3 additions & 9 deletions dynamomq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,21 @@ import (
"github.com/vvatanabe/dynamomq/internal/test"
)

func MarkAsReady[T any](m *dynamomq.Message[T], now time.Time) {
ts := clock.FormatRFC3339Nano(now)
m.VisibilityTimeout = 0
m.UpdatedAt = ts
}

func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) {
ts := clock.FormatRFC3339Nano(now)
m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds
m.UpdatedAt = ts
m.ReceivedAt = ts
m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(dynamomq.DefaultVisibilityTimeout))
}

func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) {
ts := clock.FormatRFC3339Nano(now)
m.QueueType = dynamomq.QueueTypeDLQ
m.VisibilityTimeout = 0
m.ReceiveCount = 0
m.UpdatedAt = ts
m.SentAt = ts
m.ReceivedAt = ""
m.InvisibleUntilAt = ""
}

func NewTestMessageItemAsReady(id string, now time.Time) *dynamomq.Message[test.MessageData] {
Expand All @@ -53,7 +47,7 @@ func NewMessageFromReadyToProcessing(id string,
MarkAsProcessing(m, processingTime)
m.Version = 2
m.ReceiveCount = 1
m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds
m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(dynamomq.DefaultVisibilityTimeout))
r := &dynamomq.ReceiveMessageOutput[test.MessageData]{
Result: &dynamomq.Result{
ID: m.ID,
Expand Down
Loading

0 comments on commit f147728

Please sign in to comment.