Skip to content

Commit

Permalink
feat: implement FIFO queues
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Oct 20, 2023
1 parent 9f1ee40 commit 443716d
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 9 deletions.
8 changes: 3 additions & 5 deletions sdk/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ func (m *Message[T]) MarkAsEnqueued(now time.Time) {
m.Queued = 1
m.DLQ = 0
m.LastUpdatedTimestamp = ts
m.LastUpdatedTimestamp = ts
m.AddToQueueTimestamp = ts
m.Status = StatusReady
}

func (m *Message[T]) MarkAsPeeked(now time.Time) {
ts := clock.FormatRFC3339(now)
m.Queued = 1
m.LastUpdatedTimestamp = ts
m.LastUpdatedTimestamp = ts
// IMPORTANT
// please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order
// m.LastUpdatedTimestamp = ts
m.PeekFromQueueTimestamp = ts
m.Status = StatusProcessing
}
Expand All @@ -167,7 +167,6 @@ func (m *Message[T]) MarkAsDone(now time.Time) {
ts := clock.FormatRFC3339(now)
m.Queued = 0
m.DLQ = 0
m.LastUpdatedTimestamp = ts
m.Status = StatusCompleted
m.LastUpdatedTimestamp = ts
m.CompleteFromQueueTimestamp = ts
Expand All @@ -178,7 +177,6 @@ func (m *Message[T]) MarkAsDLQ(now time.Time) {
m.Queued = 0
m.DLQ = 1
m.LastUpdatedTimestamp = ts
m.LastUpdatedTimestamp = ts
m.AddToDLQTimestamp = ts
m.Status = StatusInDLQ
}
Expand Down
23 changes: 19 additions & 4 deletions sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type queueSDKClient[T any] struct {
retryMaxAttempts int
visibilityTimeoutInMinutes int
maximumReceives int
useFIFO bool

clock clock.Clock
}
Expand All @@ -74,6 +75,7 @@ type options struct {
retryMaxAttempts int
visibilityTimeoutInMinutes int
maximumReceives int
useFIFO bool
dynamoDB *dynamodb.Client
clock clock.Clock
}
Expand Down Expand Up @@ -122,6 +124,12 @@ func WithAWSVisibilityTimeout(minutes int) Option {
}
}

func WithUseFIFO(useFIFO bool) Option {
return func(s *options) {
s.useFIFO = useFIFO
}
}

func WithAWSDynamoDBClient(client *dynamodb.Client) Option {
return func(s *options) {
s.dynamoDB = client
Expand All @@ -135,6 +143,7 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie
awsCredentialsProfileName: AwsProfileDefault,
retryMaxAttempts: DefaultRetryMaxAttempts,
visibilityTimeoutInMinutes: DefaultVisibilityTimeoutInMinutes,
useFIFO: false,
clock: &clock.RealClock{},
}
for _, opt := range opts {
Expand All @@ -149,6 +158,7 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie
retryMaxAttempts: o.retryMaxAttempts,
visibilityTimeoutInMinutes: o.visibilityTimeoutInMinutes,
maximumReceives: o.maximumReceives,
useFIFO: o.useFIFO,
dynamoDB: o.dynamoDB,
clock: o.clock,
}
Expand Down Expand Up @@ -605,7 +615,11 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
return nil, &UnmarshalingAttributeError{Cause: err}
}
visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute
if !item.IsQueueSelected(c.clock.Now(), visibilityTimeout) {
isQueueSelected := item.IsQueueSelected(c.clock.Now(), visibilityTimeout)
if c.useFIFO && isQueueSelected {
goto ExitLoop
}
if !isQueueSelected {
selectedID = item.ID
selectedVersion = item.Version
recordForPeekIsFound = true
Expand All @@ -616,6 +630,7 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
break
}
}
ExitLoop:
if selectedID == "" {
return nil, &EmptyQueueError{}
}
Expand All @@ -624,13 +639,13 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
return nil, err
}
message.MarkAsPeeked(c.clock.Now())
// IMPORTANT
// please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order
expr, err = expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Add(expression.Name("receive_count"), expression.Value(1)).
Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)).
// IMPORTANT
// please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order
// Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)).
Set(expression.Name("queue_peek_timestamp"), expression.Value(message.PeekFromQueueTimestamp)).
Set(expression.Name("status"), expression.Value(message.Status))).
WithCondition(expression.Name("version").Equal(expression.Value(selectedVersion))).
Expand Down
Loading

0 comments on commit 443716d

Please sign in to comment.