Skip to content

Commit

Permalink
docs: write godocs in dynamomq consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 15, 2023
1 parent d01b225 commit 398ef3d
Showing 1 changed file with 60 additions and 17 deletions.
77 changes: 60 additions & 17 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,100 @@ const (
defaultConcurrency = 3
)

// ErrConsumerClosed is an error that indicates the Consumer has been closed.
// This error is returned when operations are attempted on a Consumer that has already been shut down.
var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed")

// ConsumerOptions contains configuration options for a Consumer instance.
// It allows customization of polling intervals, concurrency levels, visibility timeouts, and more.
//
// Consumer functions for setting various ConsumerOptions.
type ConsumerOptions struct {
// PollingInterval specifies the time interval at which the Consumer polls the DynamoDB queue for new messages.
PollingInterval time.Duration
// Concurrency sets the number of concurrent message processing workers.
Concurrency int
// MaximumReceives defines the maximum number of times a message can be delivered.
MaximumReceives int
// VisibilityTimeout sets the duration (in seconds) a message remains invisible in the queue after being received.
VisibilityTimeout int
// RetryInterval defines the time interval (in seconds) before a failed message is retried.
RetryInterval int
// QueueType determines the type of queue (STANDARD or DLQ) the Consumer will operate on.
QueueType QueueType
// ErrorLog is an optional logger for errors. If nil, the standard logger is used.
ErrorLog *log.Logger
// OnShutdown is a slice of functions called when the Consumer is shutting down.
OnShutdown []func()
}

// WithPollingInterval sets the polling interval for the Consumer.
// This function configures the time interval at which the Consumer polls the DynamoDB queue for new messages.
func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.PollingInterval = pollingInterval
}
}

// WithConcurrency sets the number of concurrent workers for processing messages in the Consumer.
// This function determines how many messages can be processed at the same time.
func WithConcurrency(concurrency int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.Concurrency = concurrency
}
}

// WithMaximumReceives sets the maximum number of times a message can be delivered to the Consumer.
// This function configures the limit on how many times a message will be attempted for delivery
// before being considered a failure or moved to a Dead Letter Queue, if applicable.
func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.MaximumReceives = maximumReceives
}
}

// WithVisibilityTimeout sets the visibility timeout for messages in the Consumer.
// This function configures the duration (in seconds) a message remains invisible in the queue after being received.
func WithVisibilityTimeout(sec int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.VisibilityTimeout = sec
}
}

// WithRetryInterval sets the retry interval for failed messages in the Consumer.
// This function specifies the time interval (in seconds) before a failed message is retried.
func WithRetryInterval(sec int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.RetryInterval = sec
}
}

// WithQueueType sets the type of queue (STANDARD or DLQ) for the Consumer.
// This function allows specification of the queue type the Consumer will operate on.
func WithQueueType(queueType QueueType) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.QueueType = queueType
}
}

// WithErrorLog sets a custom logger for the Consumer.
// This function configures an optional logger for errors. If nil, the standard logger is used.
func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.ErrorLog = errorLog
}
}

// WithOnShutdown adds functions to be called during the Consumer's shutdown process.
// This function appends to the list of callbacks executed when the Consumer is shutting down.
func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.OnShutdown = onShutdown
}
}

type ConsumerOptions struct {
PollingInterval time.Duration
Concurrency int
MaximumReceives int
VisibilityTimeout int
RetryInterval int
QueueType QueueType
// errorLog specifies an optional logger for errors accepting
// connections, unexpected behavior from handlers, and
// underlying FileSystem errors.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
OnShutdown []func()
}

// NewConsumer creates a new Consumer instance with the specified client, message processor, and options.
// It configures the Consumer with default values which can be overridden by the provided option functions.
func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] {
o := &ConsumerOptions{
PollingInterval: defaultPollingInterval,
Expand Down Expand Up @@ -114,16 +145,27 @@ func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ..
}
}

// MessageProcessor is an interface defining a method to process messages of a generic type T.
// It is used in the context of consuming messages from a DynamoDB-based queue.
type MessageProcessor[T any] interface {
// Process handles the processing of a message.
// It takes a pointer to a Message of type T and returns an error if the processing fails.
Process(msg *Message[T]) error
}

// MessageProcessorFunc is a functional type that implements the MessageProcessor interface.
// It allows using a function as a MessageProcessor.
type MessageProcessorFunc[T any] func(msg *Message[T]) error

// Process calls the MessageProcessorFunc itself to process the message.
// It enables the function type to adhere to the MessageProcessor interface.
func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error {
return f(msg)
}

// Consumer is a struct responsible for consuming messages from a DynamoDB-based queue.
// It supports generic message types and includes settings such as concurrency, polling intervals, and more.
// Note: To create a new instance of Consumer, it is necessary to use the NewConsumer function.
type Consumer[T any] struct {
client Client[T]
messageProcessor MessageProcessor[T]
Expand All @@ -143,8 +185,8 @@ type Consumer[T any] struct {
doneChan chan struct{}
}

var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed")

// StartConsuming starts the message consumption process, polling the queue for messages and processing them.
// The method handles message retrieval, processing, error handling, retries, and moving messages to the DLQ if necessary.
func (c *Consumer[T]) StartConsuming() error {
msgChan := make(chan *Message[T], c.concurrency)
defer close(msgChan)
Expand Down Expand Up @@ -256,6 +298,7 @@ func (c *Consumer[T]) shuttingDown() bool {
return atomic.LoadInt32(&c.inShutdown) != 0
}

// Shutdown gracefully shuts down the Consumer, stopping the message consumption and executing any registered shutdown callbacks.
func (c *Consumer[T]) Shutdown(ctx context.Context) error {
atomic.StoreInt32(&c.inShutdown, 1)

Expand Down

0 comments on commit 398ef3d

Please sign in to comment.