From a7c27edf73f04725652b2100274901fbf822037a Mon Sep 17 00:00:00 2001
From: Matthew Deakos <mwoolnerdeakos@gmail.com>
Date: Thu, 11 Jul 2024 21:50:55 -0300
Subject: [PATCH] Created blocking Ack/Nack (#10)

* Created blocking Ack/Nack, no tests yet

* changelog

* I'm not sure why these made it in
---
 .gitignore                                    |   3 +-
 ack_utils.go                                  |   5 +-
 ackopts.go                                    |   1 -
 blocking.go                                   |  54 ++++++++++++-
 changelog.md                                  |  10 +++
 .../unique_queue_example/unique_ack_queue.db  | Bin 24576 -> 0 bytes
 examples/unique_queue_example/unique_queue.db | Bin 16384 -> 0 bytes
 queue.go                                      |  76 +++++++++++++++---
 8 files changed, 134 insertions(+), 15 deletions(-)
 create mode 100644 changelog.md
 delete mode 100644 examples/unique_queue_example/unique_ack_queue.db
 delete mode 100644 examples/unique_queue_example/unique_queue.db

diff --git a/.gitignore b/.gitignore
index 3bcbf66..3b6d511 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,5 +24,6 @@ go.work.sum
 # databases
 *.db
 *.db-shm
-*.wal
+*.db-wal
 *.sqlite-journal
+
diff --git a/ack_utils.go b/ack_utils.go
index 56b3be2..4e8011c 100644
--- a/ack_utils.go
+++ b/ack_utils.go
@@ -1,6 +1,7 @@
 package gopq
 
 import (
+	"context"
 	"database/sql"
 	"fmt"
 	"time"
@@ -21,8 +22,8 @@ const (
 	`
 )
 
-func nackImpl(db *sql.DB, tableName string, id int64, opts AckOpts) error {
-	tx, err := db.Begin()
+func nackImpl(ctx context.Context, db *sql.DB, tableName string, id int64, opts AckOpts) error {
+	tx, err := db.BeginTx(ctx, nil)
 	if err != nil {
 		return fmt.Errorf("failed to begin transaction: %w", err)
 	}
diff --git a/ackopts.go b/ackopts.go
index 9316dc3..c3e847a 100644
--- a/ackopts.go
+++ b/ackopts.go
@@ -4,7 +4,6 @@ import "time"
 
 // AckOpts represents the queue-level settings for how acknowledgement
 // of messages is handled.
-
 const (
 	InfiniteRetries = -1
 )
diff --git a/blocking.go b/blocking.go
index c75fe13..af1109a 100644
--- a/blocking.go
+++ b/blocking.go
@@ -8,7 +8,7 @@ import (
 	"time"
 )
 
-type TryDequeuer interface {
+type tryDequeuer interface {
 	TryDequeueCtx(ctx context.Context) (Msg, error)
 }
 
@@ -42,7 +42,7 @@ func handleDequeueResult(id int64, item []byte, err error) (Msg, error) {
 }
 
 // dequeueBlocking blocks until an item is available to dequeue, or the context is cancelled.
-func dequeueBlocking(ctx context.Context, dequeuer TryDequeuer, pollInterval time.Duration, notifyChan chan struct{}) (Msg, error) {
+func dequeueBlocking(ctx context.Context, dequeuer tryDequeuer, pollInterval time.Duration, notifyChan chan struct{}) (Msg, error) {
 	for {
 		item, err := dequeuer.TryDequeueCtx(ctx)
 		if err == nil {
@@ -104,3 +104,53 @@ func enqueueBlocking(ctx context.Context, enqueuer tryEnqueuer, item []byte, pol
 		}
 	}
 }
+
+type tryAcker interface {
+	TryAckCtx(ctx context.Context, id int64) error
+	TryNackCtx(ctx context.Context, id int64) error
+}
+
+func ackBlocking(ctx context.Context, acker tryAcker, id int64, pollInterval time.Duration) error {
+	for {
+		err := acker.TryAckCtx(ctx, id)
+
+		if err == nil {
+			return nil
+		}
+
+		// We return on all errors except a locked DB
+		// which we just wait on by trying again.
+		if !strings.Contains(err.Error(), "database table is locked") {
+			return err
+		}
+
+		select {
+		case <-ctx.Done():
+			return context.Canceled
+		case <-time.After(pollInterval):
+			// Continue to next attempt
+		}
+	}
+}
+
+func nackBlocking(ctx context.Context, acker tryAcker, id int64, pollInterval time.Duration) error {
+	for {
+		err := acker.TryNackCtx(ctx, id)
+		if err == nil {
+			return nil
+		}
+
+		// We return on all errors except a locked DB
+		// which we just wait on by trying again.
+		if !strings.Contains(err.Error(), "database table is locked") {
+			return err
+		}
+
+		select {
+		case <-ctx.Done():
+			return context.Canceled
+		case <-time.After(pollInterval):
+			// Continue to next attempt
+		}
+	}
+}
diff --git a/changelog.md b/changelog.md
new file mode 100644
index 0000000..ba0985e
--- /dev/null
+++ b/changelog.md
@@ -0,0 +1,10 @@
+# Changelog
+## Unrealeased - 2024-07-11
+### Added
+- Added TryAck/TryNack and context-supported Ack/Nack.
+
+### Fixed
+- Ack/Nack will now block and retry if DB table is locked.
+
+## [0.2.0]
+Considered first release.
\ No newline at end of file
diff --git a/examples/unique_queue_example/unique_ack_queue.db b/examples/unique_queue_example/unique_ack_queue.db
deleted file mode 100644
index 05bc065ae1a55d267776c98aee339e588886d719..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 24576
zcmeI)zi!$<90&08pM(&JTnAKCMU`$!K=~uvfRfN94sjw>FaeC(43$NLE71f<z*Onj
zs!Q_}eSt3R+z06kRNX3NX@(9xI|K|?qz+w>zLvq~`}4cIkMUq5yH+W-T!+2t_Sy}X
zrO5*#ie#5DLWsye^ZawSNIW6m?eMJ-NZ*$f$>y7nF?vJhVo%7@DgC<iDE65ru|NO<
z5P$##AOHafKmY;|m?eSpxkzF|72eTSr|Dd_nwNIt<kW6DjpkXa;|w~ji-F^3@hwkj
z^ZBZwTLvqb`^IxNU7wXr7H*RahD^PdBZ*bsV|3Kxd9Qoo^!rXT+;lF~-Kg?CeT~ZP
zhMPK)<`Zjc!m-<UdFJ#l&iGZL-FNs@I=sWlU3u7(rSBCDHdUDHG}_KOZ<%^;2$6)U
z3Lmn5%<zcAIpRd*a5+mZMx)VIlbc$`p;2W=)j~<H9<!&$G1Kc-xnS}LCBwAV{fe&B
zW_!i*9y7}pGwa2opY3$~Bh=*6W>%qO)GWPp#P*E?y<W6fzFw{J0QRWFZxgf{H}Z>u
zhis~-=eWJM_DOfpaYsYG>qqe$*UdtuZX~&OJLMgweAzrG7V;J=9Gc~-!Do@mi}Q(R
zszCf7!BD#~aJ&A#9Y(U%aGrN9zu=v0G5U!}beYf>beVperPB{%g#ZK~009U<00Izz
z00bZa0SNpL0ex4Dka(I@7Wf6drYTxxL)qdh)0CXDmCFXR)SX;L%PE=7Y$l_sX;s@$
z(m6%Z1wTT~i=h5<g!GP@-d476L-3hA=7S(!i1c&_T^`2|72ZEWEvsb}J{xa{i@XDg
z{vh;*{^A=f5P$##AOHafKmY;|fB*y_009WhtiXa47rYw?@-U%^5h)(^?n=b?oCNxl
z@D&RLAOHafKmY;|fB*y_009U<00J{E@KBIcA^aRb{PzF<YeN6f-}HLMCyC)h00Izz
z00bZa0SG_<0uX=z1RyXJkR>9>-j^szBzk-Oe?{nb`h{Kr2?7v+00bZa0SG_<0uX=z
x1Rwx`|1Gd0RfO>){i{+fu*ZV06bL@BFHg3bs26zkfVd*nqT}GfY7_4l_zM*#PV4{x

diff --git a/examples/unique_queue_example/unique_queue.db b/examples/unique_queue_example/unique_queue.db
deleted file mode 100644
index 3fd3aad93c72700b10c2b51ee21a9d095775b7d0..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 16384
zcmeI%O;5rw7zglf5Huvb+_*j$6BdXGF`m3QW+98CgV7~&TBgjf1V)r$JgXN!lV8lE
z1%)^aA3*YN(soVTXYFsjG#d__IF0Cj{x}O$I%8F)>+F(<F{Z1WS9y~(^|0IC^zEy4
zcKY;Qs(i9sxy8z_l@C=RK>z{}fB*y_009U<00Izzz<&_v=L)q(LmQ>x?KE0EOx5i%
zSVRxYD7o8h<}Ht#l9M#s4yT>5^(34{O*PYaIoFPAhM~QktYa>dSVap|R%I*O(lZgW
zeN->fuN05T7Ls?lM}5!kncj%5`G`zky0%bZdR)k6c19Y_f=RfTkmNU#gsZaeIDZFG
z+i}}_`lDpM@NuAS)6(wop)`Aay5b$vcO+T9=c)Gw+n#K2U)Tem*A?maLJV>RS+3|f
zwk65#3fJS?B^wudq2?MIi<5Em{G%H#)A_m`WN~&ObLyxauZ~-3Ra9365(FRs0SG_<
z0uX=z1Rwwb2tWV=hbmB9Z4CGSL;bwi7X%;x0SG_<0uX=z1Rwwb2tZ&hfdBtE2oQh(
W1Rwwb2tWV=5P$##AOL~G7x)6&V~`O5

diff --git a/queue.go b/queue.go
index 8b7c124..f639ac0 100644
--- a/queue.go
+++ b/queue.go
@@ -117,14 +117,21 @@ func (q *Queue) Enqueue(item []byte) error {
 	return q.EnqueueCtx(context.Background(), item)
 }
 
+// EnqueueCtx adds an item to the queue.
+// It returns an error if the operation fails or the context is cancelled.
 func (q *Queue) EnqueueCtx(ctx context.Context, item []byte) error {
 	return enqueueBlocking(ctx, q, item, defaultPollInterval)
 }
 
+// TryEnqueue attempts to add an item to the queue.
+// It returns immediately, even if the queue is empty.
+// It uses a background context internally.
 func (q *Queue) TryEnqueue(item []byte) error {
 	return q.TryEnqueueCtx(context.Background(), item)
 }
 
+// TryEnqueueCtx attempts to add an item to the queue.
+// This is non-blocking, and will return immediately.
 func (q *Queue) TryEnqueueCtx(ctx context.Context, item []byte) error {
 	_, err := q.db.ExecContext(ctx, q.queries.enqueue, item)
 	if err != nil {
@@ -150,12 +157,17 @@ func (q *Queue) DequeueCtx(ctx context.Context) (Msg, error) {
 	return dequeueBlocking(ctx, q, q.pollInterval, q.notifyChan)
 }
 
+// TryDequeue attempts to remove and return the next item from the queue.
+// It returns immediately, even if the queue is empty.
+// It uses a background context internally.
 func (q *Queue) TryDequeue() (Msg, error) {
 	return q.TryDequeueCtx(context.Background())
 }
 
+// TryDequeueCtx attempts to remove and return the next item from the queue.
+// This is non-blocking, and will return immediately.
 func (q *Queue) TryDequeueCtx(ctx context.Context) (Msg, error) {
-	row := q.db.QueryRow(q.queries.tryDequeue)
+	row := q.db.QueryRowContext(ctx, q.queries.tryDequeue)
 	var id int64
 	var item []byte
 	err := row.Scan(&id, &item)
@@ -171,6 +183,8 @@ func (q *Queue) Len() (int, error) {
 	return count, err
 }
 
+// Len returns the number of items in the queue.
+// It returns the count and any error encountered during the operation.
 func (q *AcknowledgeableQueue) Len() (int, error) {
 	row := q.db.QueryRow(q.queries.len, q.now())
 	var count int
@@ -180,11 +194,62 @@ func (q *AcknowledgeableQueue) Len() (int, error) {
 
 // Ack acknowledges that an item has been successfully processed.
 // It takes the ID of the message to acknowledge and returns an error if the operation fails.
-func (q *AcknowledgeableQueue) Ack(id int64) error {
+func (q *AcknowledgeableQueue) TryAck(id int64) error {
 	_, err := q.db.Exec(q.ackQueries.ack, id, q.now())
 	return err
 }
 
+// TryAckCtx acknowledges that an item has been successfully processed.
+// It takes the ID of the message to acknowledge and returns an error if the operation fails.
+// This is non-blocking, and will return immediately.
+func (q *AcknowledgeableQueue) TryAckCtx(ctx context.Context, id int64) error {
+	_, err := q.db.ExecContext(ctx, q.ackQueries.ack, id, q.now())
+	return err
+}
+
+// Ack acknowledges that an item has been successfully processed.
+// It takes the ID of the message to acknowledge and returns an error if the operation fails.
+// It uses a background context internally.
+func (q *AcknowledgeableQueue) Ack(id int64) error {
+	return q.AckCtx(context.Background(), id)
+}
+
+// AckCtx acknowledges that an item has been successfully processed.
+// It takes the ID of the message to acknowledge and returns an error if the operation fails.
+// If the db is locked, this will block until the db is unlocked.
+func (q *AcknowledgeableQueue) AckCtx(ctx context.Context, id int64) error {
+	return ackBlocking(ctx, q, id, q.pollInterval)
+}
+
+// TryNack indicates that an item processing has failed and should be requeued.
+// It takes the ID of the message to negative acknowledge.
+// This is non-blocking, and will return immediately.
+func (q *AcknowledgeableQueue) TryNack(id int64) error {
+	return nackImpl(context.Background(), q.db, q.name, id, q.AckOpts)
+}
+
+// TryNackCtx indicates that an item processing has failed and should be requeued.
+// It takes the ID of the message to negative acknowledge.
+// This is non-blocking, and will return immediately.
+func (q *AcknowledgeableQueue) TryNackCtx(ctx context.Context, id int64) error {
+	return nackImpl(ctx, q.db, q.name, id, q.AckOpts)
+}
+
+// Nack indicates that an item processing has failed and should be requeued.
+// It takes the ID of the message to negative acknowledge.
+// Returns an error if the operation fails or the message doesn't exist.
+// It uses a background context internally.
+func (q *AcknowledgeableQueue) Nack(id int64) error {
+	return q.NackCtx(context.Background(), id)
+}
+
+// NackCtx indicates that an item processing has failed and should be requeued.
+// It takes the ID of the message to negative acknowledge and returns an error if the operation fails.
+// If the db is locked, this will block until the db is unlocked.
+func (q *AcknowledgeableQueue) NackCtx(ctx context.Context, id int64) error {
+	return nackBlocking(ctx, q, id, q.pollInterval)
+}
+
 // Dequeue removes and returns the next item from the queue.
 // It blocks if the queue is empty until an item becomes available.
 // It uses a background context internally.
@@ -216,13 +281,6 @@ func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error) {
 	return handleDequeueResult(id, item, err)
 }
 
-// Nack indicates that an item processing has failed and should be requeued.
-// It takes the ID of the message to negative acknowledge.
-// Returns an error if the operation fails or the message doesn't exist.
-func (q *AcknowledgeableQueue) Nack(id int64) error {
-	return nackImpl(q.db, q.name, id, q.AckOpts)
-}
-
 // ExpireAck expires the acknowledgement deadline for an item,
 // which requeues it to the front of the queue.
 // It takes the ID of the message to expire the acknowledgement deadline for.