diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index c1afb43..07e8615 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -810,6 +810,108 @@ def read_with_poll( queue_name, vt, qty, max_poll_seconds, poll_interval_ms ) + def _set_vt_sync( + self, queue_name: str, msg_id: int, vt_offset: int + ) -> Optional[Message]: + """Set the visibility timeout for a message.""" + with self.session_maker() as session: + row = session.execute( + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), + {"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset}, + ).fetchone() + session.commit() + if row is None: + return None + return Message( + msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] + ) + + async def _set_vt_async( + self, queue_name: str, msg_id: int, vt_offset: int + ) -> Optional[Message]: + """Set the visibility timeout for a message.""" + async with self.session_maker() as session: + row = ( + await session.execute( + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), + { + "queue_name": queue_name, + "msg_id": msg_id, + "vt_offset": vt_offset, + }, + ) + ).fetchone() + await session.commit() + print("row", row) + if row is None: + return None + return Message( + msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] + ) + + def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]: + """ + .. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt` + .. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt` + + Set the visibility timeout for a message. + + Args: + queue_name (str): The name of the queue. + msg_id (int): The message id. + vt_offset (int): The visibility timeout in seconds. + + Returns: + |schema_message_class|_ or ``None`` if the message does not exist. + + Usage: + + .. code-block:: python + + msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10) + msg = pgmq_client.read('my_queue') + assert msg is not None + msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10) + assert msg is not None + + .. tip:: + | |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism. + | `ref: Exponential Backoff And Jitter `_. + | **For example:** + + .. code-block:: python + + from pgmq_sqlalchemy import PGMQueue + from pgmq_sqlalchemy.schema import Message + + def _exp_backoff_retry(msg: Message)->int: + # exponential backoff retry + if msg.read_ct < 5: + return 2 ** msg.read_ct + return 2 ** 5 + + def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str): + msg = pgmq_client.read( + queue_name=queue_name, + vt=1000, # set vt to 1000 seconds temporarily + ) + if msg is None: + return + + # set exponential backoff retry + pgmq_client.set_vt( + queue_name=query_name, + msg_id=msg.msg_id, + vt_offset=_exp_backoff_retry(msg) + ) + + """ + if self.is_async: + return self.loop.run_until_complete( + self._set_vt_async(queue_name, msg_id, vt_offset) + ) + return self._set_vt_sync(queue_name, msg_id, vt_offset) + def _pop_sync(self, queue_name: str) -> Optional[Message]: with self.session_maker() as session: row = session.execute( diff --git a/tests/test_queue.py b/tests/test_queue.py index fd53e38..6eb326f 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -242,6 +242,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): assert duration > 1.9 +def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): + pgmq, queue_name = pgmq_setup_teardown + msg = MSG + msg_id = pgmq.send(queue_name, msg) + msg_read = pgmq.set_vt(queue_name, msg_id, 2) + assert msg is not None + assert pgmq.read(queue_name) is None + time.sleep(1.5) + assert pgmq.read(queue_name) is None + time.sleep(0.6) + msg_read = pgmq.read(queue_name) + assert msg_read.message == msg + + +def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE): + pgmq, queue_name = pgmq_setup_teardown + msg = MSG + msg_id = pgmq.send(queue_name, msg) + _ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds + assert msg is not None + assert pgmq.read(queue_name) is None + time.sleep(0.5) + assert pgmq.set_vt(queue_name, msg_id, 1) is not None + time.sleep(0.3) + assert pgmq.read(queue_name) is None + time.sleep(0.8) + assert pgmq.read(queue_name) is not None + + def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG