diff --git a/consumer.go b/consumer.go index 14b5f36..2083197 100644 --- a/consumer.go +++ b/consumer.go @@ -20,69 +20,100 @@ const ( defaultConcurrency = 3 ) +// ErrConsumerClosed is an error that indicates the Consumer has been closed. +// This error is returned when operations are attempted on a Consumer that has already been shut down. +var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed") + +// ConsumerOptions contains configuration options for a Consumer instance. +// It allows customization of polling intervals, concurrency levels, visibility timeouts, and more. +// +// Consumer functions for setting various ConsumerOptions. +type ConsumerOptions struct { + // PollingInterval specifies the time interval at which the Consumer polls the DynamoDB queue for new messages. + PollingInterval time.Duration + // Concurrency sets the number of concurrent message processing workers. + Concurrency int + // MaximumReceives defines the maximum number of times a message can be delivered. + MaximumReceives int + // VisibilityTimeout sets the duration (in seconds) a message remains invisible in the queue after being received. + VisibilityTimeout int + // RetryInterval defines the time interval (in seconds) before a failed message is retried. + RetryInterval int + // QueueType determines the type of queue (STANDARD or DLQ) the Consumer will operate on. + QueueType QueueType + // ErrorLog is an optional logger for errors. If nil, the standard logger is used. + ErrorLog *log.Logger + // OnShutdown is a slice of functions called when the Consumer is shutting down. + OnShutdown []func() +} + +// WithPollingInterval sets the polling interval for the Consumer. +// This function configures the time interval at which the Consumer polls the DynamoDB queue for new messages. func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.PollingInterval = pollingInterval } } +// WithConcurrency sets the number of concurrent workers for processing messages in the Consumer. +// This function determines how many messages can be processed at the same time. func WithConcurrency(concurrency int) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.Concurrency = concurrency } } +// WithMaximumReceives sets the maximum number of times a message can be delivered to the Consumer. +// This function configures the limit on how many times a message will be attempted for delivery +// before being considered a failure or moved to a Dead Letter Queue, if applicable. func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.MaximumReceives = maximumReceives } } +// WithVisibilityTimeout sets the visibility timeout for messages in the Consumer. +// This function configures the duration (in seconds) a message remains invisible in the queue after being received. func WithVisibilityTimeout(sec int) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.VisibilityTimeout = sec } } +// WithRetryInterval sets the retry interval for failed messages in the Consumer. +// This function specifies the time interval (in seconds) before a failed message is retried. func WithRetryInterval(sec int) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.RetryInterval = sec } } +// WithQueueType sets the type of queue (STANDARD or DLQ) for the Consumer. +// This function allows specification of the queue type the Consumer will operate on. func WithQueueType(queueType QueueType) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.QueueType = queueType } } +// WithErrorLog sets a custom logger for the Consumer. +// This function configures an optional logger for errors. If nil, the standard logger is used. func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.ErrorLog = errorLog } } +// WithOnShutdown adds functions to be called during the Consumer's shutdown process. +// This function appends to the list of callbacks executed when the Consumer is shutting down. func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) { return func(o *ConsumerOptions) { o.OnShutdown = onShutdown } } -type ConsumerOptions struct { - PollingInterval time.Duration - Concurrency int - MaximumReceives int - VisibilityTimeout int - RetryInterval int - QueueType QueueType - // errorLog specifies an optional logger for errors accepting - // connections, unexpected behavior from handlers, and - // underlying FileSystem errors. - // If nil, logging is done via the log package's standard logger. - ErrorLog *log.Logger - OnShutdown []func() -} - +// NewConsumer creates a new Consumer instance with the specified client, message processor, and options. +// It configures the Consumer with default values which can be overridden by the provided option functions. func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] { o := &ConsumerOptions{ PollingInterval: defaultPollingInterval, @@ -114,16 +145,27 @@ func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts .. } } +// MessageProcessor is an interface defining a method to process messages of a generic type T. +// It is used in the context of consuming messages from a DynamoDB-based queue. type MessageProcessor[T any] interface { + // Process handles the processing of a message. + // It takes a pointer to a Message of type T and returns an error if the processing fails. Process(msg *Message[T]) error } +// MessageProcessorFunc is a functional type that implements the MessageProcessor interface. +// It allows using a function as a MessageProcessor. type MessageProcessorFunc[T any] func(msg *Message[T]) error +// Process calls the MessageProcessorFunc itself to process the message. +// It enables the function type to adhere to the MessageProcessor interface. func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error { return f(msg) } +// Consumer is a struct responsible for consuming messages from a DynamoDB-based queue. +// It supports generic message types and includes settings such as concurrency, polling intervals, and more. +// Note: To create a new instance of Consumer, it is necessary to use the NewConsumer function. type Consumer[T any] struct { client Client[T] messageProcessor MessageProcessor[T] @@ -143,8 +185,8 @@ type Consumer[T any] struct { doneChan chan struct{} } -var ErrConsumerClosed = errors.New("DynamoMQ: Consumer closed") - +// StartConsuming starts the message consumption process, polling the queue for messages and processing them. +// The method handles message retrieval, processing, error handling, retries, and moving messages to the DLQ if necessary. func (c *Consumer[T]) StartConsuming() error { msgChan := make(chan *Message[T], c.concurrency) defer close(msgChan) @@ -256,6 +298,7 @@ func (c *Consumer[T]) shuttingDown() bool { return atomic.LoadInt32(&c.inShutdown) != 0 } +// Shutdown gracefully shuts down the Consumer, stopping the message consumption and executing any registered shutdown callbacks. func (c *Consumer[T]) Shutdown(ctx context.Context) error { atomic.StoreInt32(&c.inShutdown, 1)