Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/set vt method #8

Merged
merged 2 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,108 @@ def read_with_poll(
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
)

def _set_vt_sync(
self, queue_name: str, msg_id: int, vt_offset: int
) -> Optional[Message]:
"""Set the visibility timeout for a message."""
with self.session_maker() as session:
row = session.execute(
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
).fetchone()
session.commit()
if row is None:
return None
return Message(
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
)

async def _set_vt_async(
self, queue_name: str, msg_id: int, vt_offset: int
) -> Optional[Message]:
"""Set the visibility timeout for a message."""
async with self.session_maker() as session:
row = (
await session.execute(
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
{
"queue_name": queue_name,
"msg_id": msg_id,
"vt_offset": vt_offset,
},
)
).fetchone()
await session.commit()
print("row", row)
if row is None:
return None
return Message(
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
)

def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
"""
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`

Set the visibility timeout for a message.

Args:
queue_name (str): The name of the queue.
msg_id (int): The message id.
vt_offset (int): The visibility timeout in seconds.

Returns:
|schema_message_class|_ or ``None`` if the message does not exist.

Usage:

.. code-block:: python

msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10)
msg = pgmq_client.read('my_queue')
assert msg is not None
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
assert msg is not None

.. tip::
| |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism.
| `ref: Exponential Backoff And Jitter <https://aws.amazon.com/tw/blogs/architecture/exponential-backoff-and-jitter/>`_.
| **For example:**

.. code-block:: python

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

def _exp_backoff_retry(msg: Message)->int:
# exponential backoff retry
if msg.read_ct < 5:
return 2 ** msg.read_ct
return 2 ** 5

def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
msg = pgmq_client.read(
queue_name=queue_name,
vt=1000, # set vt to 1000 seconds temporarily
)
if msg is None:
return

# set exponential backoff retry
pgmq_client.set_vt(
queue_name=query_name,
msg_id=msg.msg_id,
vt_offset=_exp_backoff_retry(msg)
)

"""
if self.is_async:
return self.loop.run_until_complete(
self._set_vt_async(queue_name, msg_id, vt_offset)
)
return self._set_vt_sync(queue_name, msg_id, vt_offset)

def _pop_sync(self, queue_name: str) -> Optional[Message]:
with self.session_maker() as session:
row = session.execute(
Expand Down
29 changes: 29 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
assert duration > 1.9


def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
msg_id = pgmq.send(queue_name, msg)
msg_read = pgmq.set_vt(queue_name, msg_id, 2)
assert msg is not None
assert pgmq.read(queue_name) is None
time.sleep(1.5)
assert pgmq.read(queue_name) is None
time.sleep(0.6)
msg_read = pgmq.read(queue_name)
assert msg_read.message == msg


def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
msg_id = pgmq.send(queue_name, msg)
_ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds
assert msg is not None
assert pgmq.read(queue_name) is None
time.sleep(0.5)
assert pgmq.set_vt(queue_name, msg_id, 1) is not None
time.sleep(0.3)
assert pgmq.read(queue_name) is None
time.sleep(0.8)
assert pgmq.read(queue_name) is not None


def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
Expand Down
Loading