Skip to content

Commit

Permalink
Created blocking Ack/Nack (#10)
Browse files Browse the repository at this point in the history
* Created blocking Ack/Nack, no tests yet

* changelog

* I'm not sure why these made it in
  • Loading branch information
mattdeak authored Jul 12, 2024
1 parent ffb4605 commit a7c27ed
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ go.work.sum
# databases
*.db
*.db-shm
*.wal
*.db-wal
*.sqlite-journal

5 changes: 3 additions & 2 deletions ack_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gopq

import (
"context"
"database/sql"
"fmt"
"time"
Expand All @@ -21,8 +22,8 @@ const (
`
)

func nackImpl(db *sql.DB, tableName string, id int64, opts AckOpts) error {
tx, err := db.Begin()
func nackImpl(ctx context.Context, db *sql.DB, tableName string, id int64, opts AckOpts) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion ackopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import "time"

// AckOpts represents the queue-level settings for how acknowledgement
// of messages is handled.

const (
InfiniteRetries = -1
)
Expand Down
54 changes: 52 additions & 2 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"
)

type TryDequeuer interface {
type tryDequeuer interface {
TryDequeueCtx(ctx context.Context) (Msg, error)
}

Expand Down Expand Up @@ -42,7 +42,7 @@ func handleDequeueResult(id int64, item []byte, err error) (Msg, error) {
}

// dequeueBlocking blocks until an item is available to dequeue, or the context is cancelled.
func dequeueBlocking(ctx context.Context, dequeuer TryDequeuer, pollInterval time.Duration, notifyChan chan struct{}) (Msg, error) {
func dequeueBlocking(ctx context.Context, dequeuer tryDequeuer, pollInterval time.Duration, notifyChan chan struct{}) (Msg, error) {
for {
item, err := dequeuer.TryDequeueCtx(ctx)
if err == nil {
Expand Down Expand Up @@ -104,3 +104,53 @@ func enqueueBlocking(ctx context.Context, enqueuer tryEnqueuer, item []byte, pol
}
}
}

type tryAcker interface {
TryAckCtx(ctx context.Context, id int64) error
TryNackCtx(ctx context.Context, id int64) error
}

func ackBlocking(ctx context.Context, acker tryAcker, id int64, pollInterval time.Duration) error {
for {
err := acker.TryAckCtx(ctx, id)

if err == nil {
return nil
}

// We return on all errors except a locked DB
// which we just wait on by trying again.
if !strings.Contains(err.Error(), "database table is locked") {
return err
}

select {
case <-ctx.Done():
return context.Canceled
case <-time.After(pollInterval):
// Continue to next attempt
}
}
}

func nackBlocking(ctx context.Context, acker tryAcker, id int64, pollInterval time.Duration) error {
for {
err := acker.TryNackCtx(ctx, id)
if err == nil {
return nil
}

// We return on all errors except a locked DB
// which we just wait on by trying again.
if !strings.Contains(err.Error(), "database table is locked") {
return err
}

select {
case <-ctx.Done():
return context.Canceled
case <-time.After(pollInterval):
// Continue to next attempt
}
}
}
10 changes: 10 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog
## Unrealeased - 2024-07-11
### Added
- Added TryAck/TryNack and context-supported Ack/Nack.

### Fixed
- Ack/Nack will now block and retry if DB table is locked.

## [0.2.0]
Considered first release.
Binary file removed examples/unique_queue_example/unique_ack_queue.db
Binary file not shown.
Binary file removed examples/unique_queue_example/unique_queue.db
Binary file not shown.
76 changes: 67 additions & 9 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,21 @@ func (q *Queue) Enqueue(item []byte) error {
return q.EnqueueCtx(context.Background(), item)
}

// EnqueueCtx adds an item to the queue.
// It returns an error if the operation fails or the context is cancelled.
func (q *Queue) EnqueueCtx(ctx context.Context, item []byte) error {
return enqueueBlocking(ctx, q, item, defaultPollInterval)
}

// TryEnqueue attempts to add an item to the queue.
// It returns immediately, even if the queue is empty.
// It uses a background context internally.
func (q *Queue) TryEnqueue(item []byte) error {
return q.TryEnqueueCtx(context.Background(), item)
}

// TryEnqueueCtx attempts to add an item to the queue.
// This is non-blocking, and will return immediately.
func (q *Queue) TryEnqueueCtx(ctx context.Context, item []byte) error {
_, err := q.db.ExecContext(ctx, q.queries.enqueue, item)
if err != nil {
Expand All @@ -150,12 +157,17 @@ func (q *Queue) DequeueCtx(ctx context.Context) (Msg, error) {
return dequeueBlocking(ctx, q, q.pollInterval, q.notifyChan)
}

// TryDequeue attempts to remove and return the next item from the queue.
// It returns immediately, even if the queue is empty.
// It uses a background context internally.
func (q *Queue) TryDequeue() (Msg, error) {
return q.TryDequeueCtx(context.Background())
}

// TryDequeueCtx attempts to remove and return the next item from the queue.
// This is non-blocking, and will return immediately.
func (q *Queue) TryDequeueCtx(ctx context.Context) (Msg, error) {
row := q.db.QueryRow(q.queries.tryDequeue)
row := q.db.QueryRowContext(ctx, q.queries.tryDequeue)
var id int64
var item []byte
err := row.Scan(&id, &item)
Expand All @@ -171,6 +183,8 @@ func (q *Queue) Len() (int, error) {
return count, err
}

// Len returns the number of items in the queue.
// It returns the count and any error encountered during the operation.
func (q *AcknowledgeableQueue) Len() (int, error) {
row := q.db.QueryRow(q.queries.len, q.now())
var count int
Expand All @@ -180,11 +194,62 @@ func (q *AcknowledgeableQueue) Len() (int, error) {

// Ack acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge and returns an error if the operation fails.
func (q *AcknowledgeableQueue) Ack(id int64) error {
func (q *AcknowledgeableQueue) TryAck(id int64) error {
_, err := q.db.Exec(q.ackQueries.ack, id, q.now())
return err
}

// TryAckCtx acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge and returns an error if the operation fails.
// This is non-blocking, and will return immediately.
func (q *AcknowledgeableQueue) TryAckCtx(ctx context.Context, id int64) error {
_, err := q.db.ExecContext(ctx, q.ackQueries.ack, id, q.now())
return err
}

// Ack acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge and returns an error if the operation fails.
// It uses a background context internally.
func (q *AcknowledgeableQueue) Ack(id int64) error {
return q.AckCtx(context.Background(), id)
}

// AckCtx acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge and returns an error if the operation fails.
// If the db is locked, this will block until the db is unlocked.
func (q *AcknowledgeableQueue) AckCtx(ctx context.Context, id int64) error {
return ackBlocking(ctx, q, id, q.pollInterval)
}

// TryNack indicates that an item processing has failed and should be requeued.
// It takes the ID of the message to negative acknowledge.
// This is non-blocking, and will return immediately.
func (q *AcknowledgeableQueue) TryNack(id int64) error {
return nackImpl(context.Background(), q.db, q.name, id, q.AckOpts)
}

// TryNackCtx indicates that an item processing has failed and should be requeued.
// It takes the ID of the message to negative acknowledge.
// This is non-blocking, and will return immediately.
func (q *AcknowledgeableQueue) TryNackCtx(ctx context.Context, id int64) error {
return nackImpl(ctx, q.db, q.name, id, q.AckOpts)
}

// Nack indicates that an item processing has failed and should be requeued.
// It takes the ID of the message to negative acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
// It uses a background context internally.
func (q *AcknowledgeableQueue) Nack(id int64) error {
return q.NackCtx(context.Background(), id)
}

// NackCtx indicates that an item processing has failed and should be requeued.
// It takes the ID of the message to negative acknowledge and returns an error if the operation fails.
// If the db is locked, this will block until the db is unlocked.
func (q *AcknowledgeableQueue) NackCtx(ctx context.Context, id int64) error {
return nackBlocking(ctx, q, id, q.pollInterval)
}

// Dequeue removes and returns the next item from the queue.
// It blocks if the queue is empty until an item becomes available.
// It uses a background context internally.
Expand Down Expand Up @@ -216,13 +281,6 @@ func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error) {
return handleDequeueResult(id, item, err)
}

// Nack indicates that an item processing has failed and should be requeued.
// It takes the ID of the message to negative acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
func (q *AcknowledgeableQueue) Nack(id int64) error {
return nackImpl(q.db, q.name, id, q.AckOpts)
}

// ExpireAck expires the acknowledgement deadline for an item,
// which requeues it to the front of the queue.
// It takes the ID of the message to expire the acknowledgement deadline for.
Expand Down

0 comments on commit a7c27ed

Please sign in to comment.