diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index c1afb43..9b393a0 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -485,13 +485,11 @@ def send_batch( ) return self._send_batch_sync(queue_name, encode_list_to_psql(messages), delay) - def _read_sync( - self, queue_name: str, vt: Optional[int] = None - ) -> Optional[Message]: + def _read_sync(self, queue_name: str, vt: int) -> Optional[Message]: with self.session_maker() as session: row = session.execute( text("select * from pgmq.read(:queue_name,:vt,1);"), - {"queue_name": queue_name, "vt": vt or self.vt}, + {"queue_name": queue_name, "vt": vt}, ).fetchone() session.commit() if row is None: @@ -500,14 +498,12 @@ def _read_sync( msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - async def _read_async( - self, queue_name: str, vt: Optional[int] = None - ) -> Optional[Message]: + async def _read_async(self, queue_name: str, vt: int) -> Optional[Message]: async with self.session_maker() as session: row = ( await session.execute( text("select * from pgmq.read(:queue_name,:vt,1);"), - {"queue_name": queue_name, "vt": vt or self.vt}, + {"queue_name": queue_name, "vt": vt}, ) ).fetchone() await session.commit() @@ -584,15 +580,17 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]: def _read_batch_sync( self, queue_name: str, + vt: int, batch_size: int = 1, - vt: Optional[int] = None, ) -> Optional[List[Message]]: + if vt is None: + vt = self.vt with self.session_maker() as session: rows = session.execute( text("select * from pgmq.read(:queue_name,:vt,:batch_size);"), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "batch_size": batch_size, }, ).fetchall() @@ -613,8 +611,8 @@ def _read_batch_sync( async def _read_batch_async( self, queue_name: str, + vt: int, batch_size: int = 1, - vt: Optional[int] = None, ) -> Optional[List[Message]]: async with self.session_maker() as session: rows = ( @@ -622,7 +620,7 @@ async def _read_batch_async( text("select * from pgmq.read(:queue_name,:vt,:batch_size);"), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "batch_size": batch_size, }, ) @@ -663,6 +661,8 @@ def read_batch( msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10) """ + if vt is None: + vt = self.vt if self.is_async: return self.loop.run_until_complete( self._read_batch_async(queue_name, batch_size, vt) @@ -672,7 +672,7 @@ def read_batch( def _read_with_poll_sync( self, queue_name: str, - vt: Optional[int] = None, + vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, @@ -685,7 +685,7 @@ def _read_with_poll_sync( ), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "qty": qty, "max_poll_seconds": max_poll_seconds, "poll_interval_ms": poll_interval_ms, @@ -708,7 +708,7 @@ def _read_with_poll_sync( async def _read_with_poll_async( self, queue_name: str, - vt: Optional[int] = None, + vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, @@ -722,7 +722,7 @@ async def _read_with_poll_async( ), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "qty": qty, "max_poll_seconds": max_poll_seconds, "poll_interval_ms": poll_interval_ms, @@ -799,6 +799,8 @@ def read_with_poll( assert len(msgs) == 3 # will read at most 3 messages (qty=3) """ + if vt is None: + vt = self.vt if self.is_async: return self.loop.run_until_complete( diff --git a/pyproject.toml b/pyproject.toml index a354664..e909bf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pgmq-sqlalchemy" -version = "0.1.1" +version = "0.1.2" description = "More flexible PGMQ Postgres extension Python client that using sqlalchemy ORM, supporting both async and sync engines, sessionmakers or built from dsn." authors = ["jason810496 <810496@email.wlsh.tyc.edu.tw>"] license = "MIT"