Skip to content

Commit

Permalink
Merge pull request #7 from jason810496/fix/issues/6
Browse files Browse the repository at this point in the history
Fix/issues/6
  • Loading branch information
jason810496 authored Aug 3, 2024
2 parents d7457a2 + 98e8107 commit 56924eb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
34 changes: 18 additions & 16 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,13 +485,11 @@ def send_batch(
)
return self._send_batch_sync(queue_name, encode_list_to_psql(messages), delay)

def _read_sync(
self, queue_name: str, vt: Optional[int] = None
) -> Optional[Message]:
def _read_sync(self, queue_name: str, vt: int) -> Optional[Message]:
with self.session_maker() as session:
row = session.execute(
text("select * from pgmq.read(:queue_name,:vt,1);"),
{"queue_name": queue_name, "vt": vt or self.vt},
{"queue_name": queue_name, "vt": vt},
).fetchone()
session.commit()
if row is None:
Expand All @@ -500,14 +498,12 @@ def _read_sync(
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
)

async def _read_async(
self, queue_name: str, vt: Optional[int] = None
) -> Optional[Message]:
async def _read_async(self, queue_name: str, vt: int) -> Optional[Message]:
async with self.session_maker() as session:
row = (
await session.execute(
text("select * from pgmq.read(:queue_name,:vt,1);"),
{"queue_name": queue_name, "vt": vt or self.vt},
{"queue_name": queue_name, "vt": vt},
)
).fetchone()
await session.commit()
Expand Down Expand Up @@ -584,15 +580,17 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:
def _read_batch_sync(
self,
queue_name: str,
vt: int,
batch_size: int = 1,
vt: Optional[int] = None,
) -> Optional[List[Message]]:
if vt is None:
vt = self.vt
with self.session_maker() as session:
rows = session.execute(
text("select * from pgmq.read(:queue_name,:vt,:batch_size);"),
{
"queue_name": queue_name,
"vt": vt or self.vt,
"vt": vt,
"batch_size": batch_size,
},
).fetchall()
Expand All @@ -613,16 +611,16 @@ def _read_batch_sync(
async def _read_batch_async(
self,
queue_name: str,
vt: int,
batch_size: int = 1,
vt: Optional[int] = None,
) -> Optional[List[Message]]:
async with self.session_maker() as session:
rows = (
await session.execute(
text("select * from pgmq.read(:queue_name,:vt,:batch_size);"),
{
"queue_name": queue_name,
"vt": vt or self.vt,
"vt": vt,
"batch_size": batch_size,
},
)
Expand Down Expand Up @@ -663,6 +661,8 @@ def read_batch(
msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10)
"""
if vt is None:
vt = self.vt
if self.is_async:
return self.loop.run_until_complete(
self._read_batch_async(queue_name, batch_size, vt)
Expand All @@ -672,7 +672,7 @@ def read_batch(
def _read_with_poll_sync(
self,
queue_name: str,
vt: Optional[int] = None,
vt: int,
qty: int = 1,
max_poll_seconds: int = 5,
poll_interval_ms: int = 100,
Expand All @@ -685,7 +685,7 @@ def _read_with_poll_sync(
),
{
"queue_name": queue_name,
"vt": vt or self.vt,
"vt": vt,
"qty": qty,
"max_poll_seconds": max_poll_seconds,
"poll_interval_ms": poll_interval_ms,
Expand All @@ -708,7 +708,7 @@ def _read_with_poll_sync(
async def _read_with_poll_async(
self,
queue_name: str,
vt: Optional[int] = None,
vt: int,
qty: int = 1,
max_poll_seconds: int = 5,
poll_interval_ms: int = 100,
Expand All @@ -722,7 +722,7 @@ async def _read_with_poll_async(
),
{
"queue_name": queue_name,
"vt": vt or self.vt,
"vt": vt,
"qty": qty,
"max_poll_seconds": max_poll_seconds,
"poll_interval_ms": poll_interval_ms,
Expand Down Expand Up @@ -799,6 +799,8 @@ def read_with_poll(
assert len(msgs) == 3 # will read at most 3 messages (qty=3)
"""
if vt is None:
vt = self.vt

if self.is_async:
return self.loop.run_until_complete(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pgmq-sqlalchemy"
version = "0.1.0"
version = "0.1.1"
description = "More flexible PGMQ Postgres extension Python client that using sqlalchemy ORM, supporting both async and sync engines, sessionmakers or built from dsn."
authors = ["jason810496 <810496@email.wlsh.tyc.edu.tw>"]
license = "MIT"
Expand Down

0 comments on commit 56924eb

Please sign in to comment.