From f147728de74ce0abdbba3126fc42b14ad7035dd9 Mon Sep 17 00:00:00 2001 From: vvatanabe Date: Fri, 8 Dec 2023 18:17:36 +0900 Subject: [PATCH] feat: update DynamoMQClient to store visibility timeout as datetime in invisible_until_at and add retryInterval parameter to Consumer --- README.md | 4 +- client.go | 28 ++++++++----- client_test.go | 16 +++++--- consumer.go | 95 ++++++++++++++++++++++++++++---------------- consumer_test.go | 2 + dynamomq_test.go | 12 ++---- internal/cmd/root.go | 40 ++++++++++--------- message.go | 65 +++++++++++++++--------------- message_test.go | 24 +++++------ 9 files changed, 160 insertions(+), 126 deletions(-) diff --git a/README.md b/README.md index b714a83..3354881 100644 --- a/README.md +++ b/README.md @@ -337,7 +337,6 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien |-------|--------------------|--------|-------------------------------------| | PK | id | string | A-101 | | | data | any | any | -| | visibility_timeout | number | 10 | | | receive_count | number | 1 | | GSIPK | queue_type | string | STANDARD or DLQ | | | version | number | 1 | @@ -345,6 +344,7 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien | | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 | | GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 | | | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 | **PK (Primary Key)** `ID`: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages. @@ -354,12 +354,12 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien **Attributes**: These are the various properties associated with each message: - `data`: This attribute holds the content of the message and can be of any type. -- `isibility_timeout`: The new value for the message's visibility timeout (in seconds). - `receive_count`: A numerical count of how many times the message has been retrieved from the queue. - `version`: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified. - `created_at`: The date and time when the message was created. ISO 8601 format. - `updated_at`: The date and time when the message was last updated. ISO 8601 format. - `received_at`: The timestamp when the message was last viewed without being altered. ISO 8601 format. +- `invisible_until_at`: The timestamp indicating when the message becomes visible in the queue for processing. ISO 8601 format. ### Data Transition diff --git a/client.go b/client.go index c458c8e..adc4556 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,7 @@ const ( DefaultQueueingIndexName = "dynamo-mq-index-queue_type-sent_at" DefaultRetryMaxAttempts = 10 DefaultVisibilityTimeoutInSeconds = 30 + DefaultVisibilityTimeout = DefaultVisibilityTimeoutInSeconds * time.Second DefaultMaxListMessages = 10 DefaultQueryLimit = 250 ) @@ -206,6 +207,9 @@ func (c *client[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageIn if params.QueueType == "" { params.QueueType = QueueTypeStandard } + if params.VisibilityTimeout <= 0 { + params.VisibilityTimeout = DefaultVisibilityTimeoutInSeconds + } selected, err := c.selectMessage(ctx, params) if err != nil { @@ -287,7 +291,7 @@ func (c *client[T]) processQueryResult(params *ReceiveMessageInput, queryResult return nil, UnmarshalingAttributeError{Cause: err} } - if err := message.markAsProcessing(c.clock.Now(), params.VisibilityTimeout); err == nil { + if err := message.markAsProcessing(c.clock.Now(), secToDur(params.VisibilityTimeout)); err == nil { selected = &message break } @@ -303,9 +307,9 @@ func (c *client[T]) processSelectedMessage(ctx context.Context, message *Message WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). Add(expression.Name("receive_count"), expression.Value(1)). - Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)). Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)). - Set(expression.Name("received_at"), expression.Value(message.ReceivedAt))). + Set(expression.Name("received_at"), expression.Value(message.ReceivedAt)). + Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { @@ -343,12 +347,12 @@ func (c *client[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeM return &ChangeMessageVisibilityOutput[T]{}, &IDNotFoundError{} } message := retrieved.Message - message.changeVisibilityTimeout(c.clock.Now(), params.VisibilityTimeout) + message.changeVisibility(c.clock.Now(), secToDur(params.VisibilityTimeout)) builder := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)). - Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout))). + Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { @@ -434,12 +438,12 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD builder := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). - Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)). Set(expression.Name("receive_count"), expression.Value(message.ReceiveCount)). Set(expression.Name("queue_type"), expression.Value(message.QueueType)). Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)). Set(expression.Name("sent_at"), expression.Value(message.SentAt)). - Set(expression.Name("received_at"), expression.Value(message.SentAt))). + Set(expression.Name("received_at"), expression.Value(message.SentAt)). + Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { @@ -493,15 +497,15 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn ).Set( expression.Name("queue_type"), expression.Value(message.QueueType), - ).Set( - expression.Name("visibility_timeout"), - expression.Value(message.VisibilityTimeout), ).Set( expression.Name("updated_at"), expression.Value(message.UpdatedAt), ).Set( expression.Name("sent_at"), expression.Value(message.SentAt), + ).Set( + expression.Name("invisible_until_at"), + expression.Value(message.InvisibleUntilAt), )). WithCondition(expression.Name("version"). Equal(expression.Value(message.Version))) @@ -845,6 +849,10 @@ func handleDynamoDBError(err error) error { return DynamoDBAPIError{Cause: err} } +func secToDur(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + type Status string const ( diff --git a/client_test.go b/client_test.go index 3ce5be1..7f92211 100644 --- a/client_test.go +++ b/client_test.go @@ -347,6 +347,7 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { for i, want := range wants { result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ + QueueType: dynamomq.QueueTypeStandard, VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, }) test.AssertError(t, err, nil, fmt.Sprintf("ReceiveMessage() [%d-1]", i)) @@ -383,10 +384,11 @@ func TestDynamoMQClientReceiveMessageNotUseFIFO(t *testing.T) { testDynamoMQClientReceiveMessageSequence(t, false) } -func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { +func TestDynamoMQClientChangeMessageVisibility(t *testing.T) { t.Parallel() type args struct { - id string + id string + visibilityTimeout int } now := test.DefaultTestDate.Add(10 * time.Second) tests := []ClientTestCase[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]]{ @@ -397,7 +399,8 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { T: now, }, args: args{ - id: "A-101", + id: "A-101", + visibilityTimeout: -1, }, want: &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{ Result: &dynamomq.Result{ @@ -408,7 +411,9 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { }, Message: func() *dynamomq.Message[test.MessageData] { m := NewTestMessageItemAsProcessing("A-101", now) - MarkAsReady(m, now) + ts := clock.FormatRFC3339Nano(now) + m.UpdatedAt = ts + m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(-1 * time.Second)) m.Version = 2 return m }(), @@ -418,7 +423,8 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { runTestsParallel[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]](t, "ChangeMessageVisibility()", tests, func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.ChangeMessageVisibilityOutput[test.MessageData], error) { return client.ChangeMessageVisibility(context.Background(), &dynamomq.ChangeMessageVisibilityInput{ - ID: args.id, + ID: args.id, + VisibilityTimeout: args.visibilityTimeout, }) }) } diff --git a/consumer.go b/consumer.go index 4123600..6c4c269 100644 --- a/consumer.go +++ b/consumer.go @@ -11,10 +11,11 @@ import ( ) const ( - defaultPollingInterval = time.Second - defaultMaximumReceives = 0 // unlimited - defaultQueueType = QueueTypeStandard - defaultConcurrency = 3 + defaultPollingInterval = time.Second + defaultMaximumReceives = 0 // unlimited + defaultRetryIntervalInSeconds = 1 + defaultQueueType = QueueTypeStandard + defaultConcurrency = 3 ) func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) { @@ -35,6 +36,18 @@ func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) { } } +func WithVisibilityTimeout(sec int) func(o *ConsumerOptions) { + return func(o *ConsumerOptions) { + o.VisibilityTimeout = sec + } +} + +func WithRetryInterval(sec int) func(o *ConsumerOptions) { + return func(o *ConsumerOptions) { + o.RetryInterval = sec + } +} + func WithQueueType(queueType QueueType) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.QueueType = queueType @@ -54,10 +67,12 @@ func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) { } type ConsumerOptions struct { - PollingInterval time.Duration - Concurrency int - MaximumReceives int - QueueType QueueType + PollingInterval time.Duration + Concurrency int + MaximumReceives int + VisibilityTimeout int + RetryInterval int + QueueType QueueType // errorLog specifies an optional logger for errors accepting // connections, unexpected behavior from handlers, and // underlying FileSystem errors. @@ -68,28 +83,32 @@ type ConsumerOptions struct { func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] { o := &ConsumerOptions{ - PollingInterval: defaultPollingInterval, - Concurrency: defaultConcurrency, - MaximumReceives: defaultMaximumReceives, - QueueType: defaultQueueType, + PollingInterval: defaultPollingInterval, + Concurrency: defaultConcurrency, + MaximumReceives: defaultMaximumReceives, + VisibilityTimeout: DefaultVisibilityTimeoutInSeconds, + RetryInterval: defaultRetryIntervalInSeconds, + QueueType: defaultQueueType, } for _, opt := range opts { opt(o) } return &Consumer[T]{ - client: client, - messageProcessor: processor, - pollingInterval: o.PollingInterval, - concurrency: o.Concurrency, - maximumReceives: o.MaximumReceives, - queueType: o.QueueType, - errorLog: o.ErrorLog, - onShutdown: o.OnShutdown, - inShutdown: 0, - mu: sync.Mutex{}, - activeMessages: make(map[*Message[T]]struct{}), - activeMessagesWG: sync.WaitGroup{}, - doneChan: make(chan struct{}), + client: client, + messageProcessor: processor, + pollingInterval: o.PollingInterval, + concurrency: o.Concurrency, + maximumReceives: o.MaximumReceives, + visibilityTimeout: o.VisibilityTimeout, + retryInterval: o.RetryInterval, + queueType: o.QueueType, + errorLog: o.ErrorLog, + onShutdown: o.OnShutdown, + inShutdown: 0, + mu: sync.Mutex{}, + activeMessages: make(map[*Message[T]]struct{}), + activeMessagesWG: sync.WaitGroup{}, + doneChan: make(chan struct{}), } } @@ -104,14 +123,16 @@ func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error { } type Consumer[T any] struct { - client Client[T] - messageProcessor MessageProcessor[T] - concurrency int - pollingInterval time.Duration - maximumReceives int - queueType QueueType - errorLog *log.Logger - onShutdown []func() + client Client[T] + messageProcessor MessageProcessor[T] + concurrency int + pollingInterval time.Duration + maximumReceives int + visibilityTimeout int + retryInterval int + queueType QueueType + errorLog *log.Logger + onShutdown []func() inShutdown int32 mu sync.Mutex @@ -138,7 +159,7 @@ func (c *Consumer[T]) StartConsuming() error { ctx := context.Background() r, err := c.client.ReceiveMessage(ctx, &ReceiveMessageInput{ QueueType: c.queueType, - VisibilityTimeout: DefaultVisibilityTimeoutInSeconds, + VisibilityTimeout: c.visibilityTimeout, }) if err != nil { if c.shuttingDown() { @@ -187,7 +208,11 @@ func (c *Consumer[T]) shouldRetry(msg *Message[T]) bool { } func (c *Consumer[T]) retryMessage(ctx context.Context, msg *Message[T]) { - if _, err := c.client.ChangeMessageVisibility(ctx, &ChangeMessageVisibilityInput{ID: msg.ID}); err != nil { + in := &ChangeMessageVisibilityInput{ + ID: msg.ID, + VisibilityTimeout: c.retryInterval, + } + if _, err := c.client.ChangeMessageVisibility(ctx, in); err != nil { c.logf("DynamoMQ: Failed to update a message as visible. %s", err) } } diff --git a/consumer_test.go b/consumer_test.go index a342ff1..5e18ecc 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -127,6 +127,8 @@ func TestConsumerStartConsuming(t *testing.T) { dynamomq.WithPollingInterval(0), dynamomq.WithConcurrency(3), dynamomq.WithMaximumReceives(tt.MaximumReceives), + dynamomq.WithVisibilityTimeout(30), + dynamomq.WithRetryInterval(1), dynamomq.WithQueueType(tt.QueueType), dynamomq.WithErrorLog(log.New(os.Stderr, "", 0)), dynamomq.WithOnShutdown([]func(){})) diff --git a/dynamomq_test.go b/dynamomq_test.go index 8fc5dcb..92cbfe5 100644 --- a/dynamomq_test.go +++ b/dynamomq_test.go @@ -8,27 +8,21 @@ import ( "github.com/vvatanabe/dynamomq/internal/test" ) -func MarkAsReady[T any](m *dynamomq.Message[T], now time.Time) { - ts := clock.FormatRFC3339Nano(now) - m.VisibilityTimeout = 0 - m.UpdatedAt = ts -} - func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) - m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds m.UpdatedAt = ts m.ReceivedAt = ts + m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(dynamomq.DefaultVisibilityTimeout)) } func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) m.QueueType = dynamomq.QueueTypeDLQ - m.VisibilityTimeout = 0 m.ReceiveCount = 0 m.UpdatedAt = ts m.SentAt = ts m.ReceivedAt = "" + m.InvisibleUntilAt = "" } func NewTestMessageItemAsReady(id string, now time.Time) *dynamomq.Message[test.MessageData] { @@ -53,7 +47,7 @@ func NewMessageFromReadyToProcessing(id string, MarkAsProcessing(m, processingTime) m.Version = 2 m.ReceiveCount = 1 - m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds + m.InvisibleUntilAt = clock.FormatRFC3339Nano(processingTime.Add(dynamomq.DefaultVisibilityTimeout)) r := &dynamomq.ReceiveMessageOutput[test.MessageData]{ Result: &dynamomq.Result{ ID: m.ID, diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 75d28bf..6f9d15c 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -118,41 +118,43 @@ func Execute() { } type SystemInfo struct { - ID string `json:"id"` - Status dynamomq.Status `json:"status"` - ReceiveCount int `json:"receive_count"` - QueueType dynamomq.QueueType `json:"queue_type"` - Version int `json:"version"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` - SentAt string `json:"sent_at"` - ReceivedAt string `json:"received_at"` + ID string `json:"id"` + Status dynamomq.Status `json:"status"` + ReceiveCount int `json:"receive_count"` + QueueType dynamomq.QueueType `json:"queue_type"` + Version int `json:"version"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + SentAt string `json:"sent_at"` + ReceivedAt string `json:"received_at"` + InvisibleUntilAt string `json:"invisible_until_at"` } func GetSystemInfo[T any](m *dynamomq.Message[T]) *SystemInfo { return &SystemInfo{ - ID: m.ID, - Status: m.GetStatus(clock.Now()), - ReceiveCount: m.ReceiveCount, - QueueType: m.QueueType, - Version: m.Version, - CreatedAt: m.CreatedAt, - UpdatedAt: m.UpdatedAt, - SentAt: m.SentAt, - ReceivedAt: m.ReceivedAt, + ID: m.ID, + Status: m.GetStatus(clock.Now()), + ReceiveCount: m.ReceiveCount, + QueueType: m.QueueType, + Version: m.Version, + CreatedAt: m.CreatedAt, + UpdatedAt: m.UpdatedAt, + SentAt: m.SentAt, + ReceivedAt: m.ReceivedAt, + InvisibleUntilAt: m.InvisibleUntilAt, } } func ResetSystemInfo[T any](m *dynamomq.Message[T], now time.Time) { msg := dynamomq.NewMessage[T](m.ID, m.Data, now) m.QueueType = msg.QueueType - m.VisibilityTimeout = msg.VisibilityTimeout m.ReceiveCount = msg.ReceiveCount m.Version = msg.Version m.CreatedAt = msg.CreatedAt m.UpdatedAt = msg.UpdatedAt m.SentAt = msg.SentAt m.ReceivedAt = msg.ReceivedAt + m.InvisibleUntilAt = msg.InvisibleUntilAt } func init() { diff --git a/message.go b/message.go index 96ecb9d..1406314 100644 --- a/message.go +++ b/message.go @@ -9,50 +9,51 @@ import ( func NewMessage[T any](id string, data T, now time.Time) *Message[T] { ts := clock.FormatRFC3339Nano(now) return &Message[T]{ - ID: id, - Data: data, - ReceiveCount: 0, - VisibilityTimeout: 0, - QueueType: QueueTypeStandard, - Version: 1, - CreatedAt: ts, - UpdatedAt: ts, - SentAt: ts, - ReceivedAt: "", + ID: id, + Data: data, + ReceiveCount: 0, + QueueType: QueueTypeStandard, + Version: 1, + CreatedAt: ts, + UpdatedAt: ts, + SentAt: ts, + ReceivedAt: "", + InvisibleUntilAt: "", } } type Message[T any] struct { - ID string `json:"id" dynamodbav:"id"` - Data T `json:"data" dynamodbav:"data"` - // The new value for the message's visibility timeout (in seconds). - VisibilityTimeout int `json:"visibility_timeout" dynamodbav:"visibility_timeout"` - ReceiveCount int `json:"receive_count" dynamodbav:"receive_count"` - QueueType QueueType `json:"queue_type" dynamodbav:"queue_type,omitempty"` - Version int `json:"version" dynamodbav:"version"` - CreatedAt string `json:"created_at" dynamodbav:"created_at"` - UpdatedAt string `json:"updated_at" dynamodbav:"updated_at"` - SentAt string `json:"sent_at" dynamodbav:"sent_at"` - ReceivedAt string `json:"received_at" dynamodbav:"received_at"` + ID string `json:"id" dynamodbav:"id"` + Data T `json:"data" dynamodbav:"data"` + ReceiveCount int `json:"receive_count" dynamodbav:"receive_count"` + QueueType QueueType `json:"queue_type" dynamodbav:"queue_type,omitempty"` + Version int `json:"version" dynamodbav:"version"` + CreatedAt string `json:"created_at" dynamodbav:"created_at"` + UpdatedAt string `json:"updated_at" dynamodbav:"updated_at"` + SentAt string `json:"sent_at" dynamodbav:"sent_at"` + ReceivedAt string `json:"received_at" dynamodbav:"received_at"` + InvisibleUntilAt string `json:"invisible_until_at" dynamodbav:"invisible_until_at"` } func (m *Message[T]) GetStatus(now time.Time) Status { - peekUTCTime := clock.RFC3339NanoToTime(m.ReceivedAt) - invisibleTime := peekUTCTime.Add(time.Duration(m.VisibilityTimeout) * time.Second) - if now.Before(invisibleTime) { - return StatusProcessing + if m.InvisibleUntilAt == "" { + return StatusReady } - return StatusReady + invisibleUntilAtTime := clock.RFC3339NanoToTime(m.InvisibleUntilAt) + if now.After(invisibleUntilAtTime) { + return StatusReady + } + return StatusProcessing } func (m *Message[T]) isDLQ() bool { return m.QueueType == QueueTypeDLQ } -func (m *Message[T]) changeVisibilityTimeout(now time.Time, visibilityTimeout int) { +func (m *Message[T]) changeVisibility(now time.Time, visibilityTimeout time.Duration) { ts := clock.FormatRFC3339Nano(now) m.UpdatedAt = ts - m.VisibilityTimeout = visibilityTimeout + m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(visibilityTimeout)) } func (m *Message[T]) delayToSentAt(delay time.Duration) { @@ -60,7 +61,7 @@ func (m *Message[T]) delayToSentAt(delay time.Duration) { m.SentAt = clock.FormatRFC3339Nano(delayed) } -func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout int) error { +func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout time.Duration) error { status := m.GetStatus(now) if status == StatusProcessing { return InvalidStateTransitionError{ @@ -72,7 +73,7 @@ func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout int) erro ts := clock.FormatRFC3339Nano(now) m.UpdatedAt = ts m.ReceivedAt = ts - m.VisibilityTimeout = visibilityTimeout + m.InvisibleUntilAt = clock.FormatRFC3339Nano(now.Add(visibilityTimeout)) return nil } @@ -87,10 +88,10 @@ func (m *Message[T]) markAsMovedToDLQ(now time.Time) error { ts := clock.FormatRFC3339Nano(now) m.QueueType = QueueTypeDLQ m.ReceiveCount = 0 - m.VisibilityTimeout = 0 m.UpdatedAt = ts m.SentAt = ts m.ReceivedAt = "" + m.InvisibleUntilAt = "" return nil } @@ -112,10 +113,10 @@ func (m *Message[T]) markAsRestoredFromDLQ(now time.Time) error { } ts := clock.FormatRFC3339Nano(now) m.QueueType = QueueTypeStandard - m.VisibilityTimeout = 0 m.ReceiveCount = 0 m.UpdatedAt = ts m.SentAt = ts m.ReceivedAt = "" + m.InvisibleUntilAt = "" return nil } diff --git a/message_test.go b/message_test.go index 2ec57ac..c7692f8 100644 --- a/message_test.go +++ b/message_test.go @@ -21,10 +21,9 @@ func TestMessageGetStatus(t *testing.T) { } tests := []testCase[any]{ { - name: "should return StatusReady when VisibilityTimeout is 0", + name: "should return StatusReady when InvisibleUntilAt is empty", m: dynamomq.Message[any]{ - VisibilityTimeout: 0, - ReceivedAt: clock.FormatRFC3339Nano(test.DefaultTestDate), + InvisibleUntilAt: "", }, args: args{ now: test.DefaultTestDate, @@ -32,21 +31,19 @@ func TestMessageGetStatus(t *testing.T) { want: dynamomq.StatusReady, }, { - name: "should return StatusProcessing when current time is before VisibilityTimeout", + name: "should return StatusProcessing when current time is before InvisibleUntilAt", m: dynamomq.Message[any]{ - VisibilityTimeout: 1, - ReceivedAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second)), + InvisibleUntilAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second)), }, args: args{ - now: test.DefaultTestDate.Add(time.Second), + now: test.DefaultTestDate, }, want: dynamomq.StatusProcessing, }, { - name: "should return StatusReady when current time is after VisibilityTimeout", + name: "should return StatusReady when current time is after InvisibleUntilAt", m: dynamomq.Message[any]{ - VisibilityTimeout: 5, - ReceivedAt: clock.FormatRFC3339Nano(test.DefaultTestDate), + InvisibleUntilAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second * 5)), }, args: args{ now: test.DefaultTestDate.Add(time.Second * 6), @@ -54,13 +51,12 @@ func TestMessageGetStatus(t *testing.T) { want: dynamomq.StatusReady, }, { - name: "should return StatusProcessing when current time is equal VisibilityTimeout", + name: "should return StatusProcessing when current time is equal InvisibleUntilAt", m: dynamomq.Message[any]{ - VisibilityTimeout: 4, - ReceivedAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second * 4)), + InvisibleUntilAt: clock.FormatRFC3339Nano(test.DefaultTestDate), }, args: args{ - now: time.Date(2021, 1, 1, 0, 0, 4, 0, time.UTC), + now: test.DefaultTestDate, }, want: dynamomq.StatusProcessing, },