Skip to content

Commit

Permalink
refactor: update package structure for better organization
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Oct 26, 2023
1 parent b98ecf5 commit cb52631
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 44 deletions.
21 changes: 11 additions & 10 deletions cmd/dynamomq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"os"
"strings"

"github.com/vvatanabe/dynamomq/cli"
"github.com/vvatanabe/dynamomq/sdk"
"github.com/vvatanabe/dynamomq/internal/cli"

"github.com/vvatanabe/dynamomq"
)

func main() {
Expand All @@ -25,9 +26,9 @@ func main() {
executionPath, _ := os.Getwd()
fmt.Printf("current directory is: [%s]\n", executionPath)

region := flag.String("region", sdk.AwsRegionDefault, "AWS region")
credentialsProfile := flag.String("profile", sdk.AwsProfileDefault, "AWS credentials profile")
tableName := flag.String("table", sdk.DefaultTableName, "AWS DynamoDB table name")
region := flag.String("region", dynamomq.AwsRegionDefault, "AWS region")
credentialsProfile := flag.String("profile", dynamomq.AwsProfileDefault, "AWS credentials profile")
tableName := flag.String("table", dynamomq.DefaultTableName, "AWS DynamoDB table name")
endpoint := flag.String("endpoint-url", "", "AWS DynamoDB base endpoint url")

flag.Parse()
Expand All @@ -38,11 +39,11 @@ func main() {
fmt.Printf("endpoint is: [%s]\n", *endpoint)
fmt.Println("")

client, err := sdk.NewQueueSDKClient[any](context.Background(),
sdk.WithAWSRegion(*region),
sdk.WithAWSCredentialsProfileName(*credentialsProfile),
sdk.WithTableName(*tableName),
sdk.WithAWSBaseEndpoint(*endpoint))
client, err := dynamomq.NewQueueSDKClient[any](context.Background(),
dynamomq.WithAWSRegion(*region),
dynamomq.WithAWSCredentialsProfileName(*credentialsProfile),
dynamomq.WithTableName(*tableName),
dynamomq.WithAWSBaseEndpoint(*endpoint))
if err != nil {
fmt.Printf("... AWS session could not be established!: %v\n", err)
} else {
Expand Down
36 changes: 17 additions & 19 deletions consumer/consumer.go → consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package consumer
package dynamomq

import (
"context"
Expand All @@ -8,36 +8,34 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/vvatanabe/dynamomq/sdk"
)

const (
defaultPollingInterval = time.Second * 10
defaultMaximumReceives = 0 // unlimited
)

type Option func(o *Options)
type ConsumerOption func(o *Options)

func WithPollingInterval(pollingInterval time.Duration) Option {
func WithPollingInterval(pollingInterval time.Duration) ConsumerOption {
return func(o *Options) {
o.PollingInterval = pollingInterval
}
}

func WithMaximumReceives(maximumReceives int) Option {
func WithMaximumReceives(maximumReceives int) ConsumerOption {
return func(o *Options) {
o.MaximumReceives = maximumReceives
}
}

func WithErrorLog(errorLog *log.Logger) Option {
func WithErrorLog(errorLog *log.Logger) ConsumerOption {
return func(o *Options) {
o.ErrorLog = errorLog
}
}

func WithOnShutdown(onShutdown []func()) Option {
func WithOnShutdown(onShutdown []func()) ConsumerOption {
return func(o *Options) {
o.OnShutdown = onShutdown
}
Expand All @@ -54,7 +52,7 @@ type Options struct {
OnShutdown []func()
}

func NewConsumer[T any](client sdk.QueueSDKClient[T], processor MessageProcessor[T], opts ...Option) *Consumer[T] {
func NewConsumer[T any](client QueueSDKClient[T], processor MessageProcessor[T], opts ...ConsumerOption) *Consumer[T] {
o := &Options{
PollingInterval: defaultPollingInterval,
MaximumReceives: defaultMaximumReceives,
Expand All @@ -71,18 +69,18 @@ func NewConsumer[T any](client sdk.QueueSDKClient[T], processor MessageProcessor
onShutdown: o.OnShutdown,
inShutdown: 0,
mu: sync.Mutex{},
activeMessages: make(map[*sdk.Message[T]]struct{}),
activeMessages: make(map[*Message[T]]struct{}),
activeMessagesWG: sync.WaitGroup{},
doneChan: make(chan struct{}),
}
}

type MessageProcessor[T any] interface {
Process(msg *sdk.Message[T]) error
Process(msg *Message[T]) error
}

type Consumer[T any] struct {
client sdk.QueueSDKClient[T]
client QueueSDKClient[T]
messageProcessor MessageProcessor[T]

pollingInterval time.Duration
Expand All @@ -92,7 +90,7 @@ type Consumer[T any] struct {

inShutdown int32
mu sync.Mutex
activeMessages map[*sdk.Message[T]]struct{}
activeMessages map[*Message[T]]struct{}
activeMessagesWG sync.WaitGroup
doneChan chan struct{}
}
Expand All @@ -118,13 +116,13 @@ func (c *Consumer[T]) Listen() error {
}
}

func (c *Consumer[T]) listen(ctx context.Context, msg *sdk.Message[T]) {
func (c *Consumer[T]) listen(ctx context.Context, msg *Message[T]) {
c.trackMessage(msg, true)
c.processMessage(ctx, msg)
c.trackMessage(msg, false)
}

func (c *Consumer[T]) shouldRetry(msg *sdk.Message[T]) bool {
func (c *Consumer[T]) shouldRetry(msg *Message[T]) bool {
if c.maximumReceives == 0 {
return true
}
Expand All @@ -134,7 +132,7 @@ func (c *Consumer[T]) shouldRetry(msg *sdk.Message[T]) bool {
return false
}

func (c *Consumer[T]) processMessage(ctx context.Context, msg *sdk.Message[T]) {
func (c *Consumer[T]) processMessage(ctx context.Context, msg *Message[T]) {
err := c.messageProcessor.Process(msg)
if err != nil {
if c.shouldRetry(msg) {
Expand All @@ -159,11 +157,11 @@ func (c *Consumer[T]) processMessage(ctx context.Context, msg *sdk.Message[T]) {
}
}

func (c *Consumer[T]) trackMessage(msg *sdk.Message[T], add bool) {
func (c *Consumer[T]) trackMessage(msg *Message[T], add bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.activeMessages == nil {
c.activeMessages = make(map[*sdk.Message[T]]struct{})
c.activeMessages = make(map[*Message[T]]struct{})
}
if add {
if !c.shuttingDown() {
Expand Down Expand Up @@ -239,7 +237,7 @@ func (c *Consumer[T]) logf(format string, args ...any) {

func isTemporary(err error) bool {
switch err.(type) {
case sdk.ConditionalCheckFailedError, sdk.DynamoDBAPIError, sdk.EmptyQueueError, sdk.IDNotProvidedError, sdk.IDNotFoundError:
case ConditionalCheckFailedError, DynamoDBAPIError, EmptyQueueError, IDNotProvidedError, IDNotFoundError:
return true
default:
return false
Expand Down
2 changes: 1 addition & 1 deletion sdk/error.go → error.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

import "fmt"

Expand Down
19 changes: 10 additions & 9 deletions cli/cli.go → internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"encoding/json"
"fmt"

"github.com/vvatanabe/dynamomq"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/test"
"github.com/vvatanabe/dynamomq/sdk"
)

const (
Expand All @@ -22,8 +23,8 @@ type CLI struct {
CredentialsProfile string
TableName string

Client sdk.QueueSDKClient[any]
Message *sdk.Message[any]
Client dynamomq.QueueSDKClient[any]
Message *dynamomq.Message[any]
}

func (c *CLI) Run(ctx context.Context, command string, params []string) {
Expand Down Expand Up @@ -106,11 +107,11 @@ func (c *CLI) aws(ctx context.Context, params []string) {
if endpoint != "" {
c.BaseEndpoint = endpoint
}
client, err := sdk.NewQueueSDKClient[any](ctx,
sdk.WithAWSRegion(c.Region),
sdk.WithAWSCredentialsProfileName(profile),
sdk.WithTableName(c.TableName),
sdk.WithAWSBaseEndpoint(c.BaseEndpoint))
client, err := dynamomq.NewQueueSDKClient[any](ctx,
dynamomq.WithAWSRegion(c.Region),
dynamomq.WithAWSCredentialsProfileName(profile),
dynamomq.WithTableName(c.TableName),
dynamomq.WithAWSBaseEndpoint(c.BaseEndpoint))
if err != nil {
fmt.Printf(" ... AWS session could not be established!: %v\n", err)
} else {
Expand Down Expand Up @@ -189,7 +190,7 @@ func (c *CLI) enqueueTest(ctx context.Context, _ []string) {
fmt.Println("Enqueue message with IDs:")
ids := []string{"A-101", "A-202", "A-303", "A-404"}
for _, id := range ids {
message := sdk.NewDefaultMessage[test.MessageData](id, test.NewMessageData(id), clock.Now())
message := dynamomq.NewDefaultMessage[test.MessageData](id, test.NewMessageData(id), clock.Now())
item, err := message.MarshalMap()
if err != nil {
fmt.Printf("* ID: %s, error: %s\n", id, err)
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion sdk/message.go → message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

import (
"time"
Expand Down
2 changes: 1 addition & 1 deletion sdk/result.go → result.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

type Result struct {
ID string `json:"id"`
Expand Down
2 changes: 1 addition & 1 deletion sdk/sdk.go → sdk.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion sdk/sdk_test.go → sdk_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion sdk/stats.go → stats.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sdk
package dynamomq

// QueueStats represents the structure to store Queue depth statistics.
type QueueStats struct {
Expand Down

0 comments on commit cb52631

Please sign in to comment.