Skip to content

Commit

Permalink
Unified how the asyncpg and psycopg event brokers are initialized
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Jun 8, 2024
1 parent 74daaf7 commit 8bf590c
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 60 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ APScheduler, see the :doc:`migration section <migration>`.

**UNRELEASED**

- **BREAKING** Refactored ``AsyncpgEventBroker`` to directly accept a connection string,
thus eliminating the need for the ``AsyncpgEventBroker.from_dsn()`` class method
- Added the ``psycopg`` event broker
- Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and
``MongoDBDataStore``
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ cbor = ["cbor2 >= 5.0"]
mongodb = ["pymongo >= 4"]
mqtt = ["paho-mqtt >= 2.0"]
redis = ["redis >= 5.0.1"]
sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.19"]
sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.24"]
test = [
"APScheduler[cbor,mongodb,mqtt,redis,sqlalchemy]",
"asyncpg >= 0.20; python_implementation == 'CPython'",
Expand Down
76 changes: 27 additions & 49 deletions src/apscheduler/eventbrokers/asyncpg.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from __future__ import annotations

from collections.abc import Awaitable, Mapping
from contextlib import AsyncExitStack
from functools import partial
from collections.abc import AsyncGenerator, Mapping
from contextlib import AsyncExitStack, asynccontextmanager
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, cast
from typing import TYPE_CHECKING, Any, cast

import asyncpg
import attrs
Expand All @@ -16,6 +15,7 @@
)
from anyio.streams.memory import MemoryObjectSendStream
from asyncpg import Connection, InterfaceError
from attr.validators import instance_of

from .._events import Event
from .._exceptions import SerializationError
Expand All @@ -33,33 +33,22 @@ class AsyncpgEventBroker(BaseExternalEventBroker):
.. _asyncpg: https://pypi.org/project/asyncpg/
:param connection_factory: a callable that creates an asyncpg connection
:param dsn: a libpq connection string (e.g.
``postgres://user:pass@host:port/dbname``)
:param options: extra keyword arguments passed to :func:`asyncpg.connect`
:param channel: the ``NOTIFY`` channel to use
:param max_idle_time: maximum time to let the connection go idle, before sending a
``SELECT 1`` query to prevent a connection timeout
"""

connection_factory: Callable[[], Awaitable[Connection]]
dsn: str
options: Mapping[str, Any] = attrs.field(
factory=dict, validator=instance_of(Mapping)
)
channel: str = attrs.field(kw_only=True, default="apscheduler")
max_idle_time: float = attrs.field(kw_only=True, default=10)
_send: MemoryObjectSendStream[str] = attrs.field(init=False)

@classmethod
def from_dsn(
cls, dsn: str, options: Mapping[str, Any] | None = None, **kwargs: Any
) -> AsyncpgEventBroker:
"""
Create a new asyncpg event broker from an existing asyncpg connection pool.

:param dsn: data source name, passed as first positional argument to
:func:`asyncpg.connect`
:param options: keyword arguments passed to :func:`asyncpg.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
"""
factory = partial(asyncpg.connect, dsn, **(options or {}))
return cls(factory, **kwargs)
_send: MemoryObjectSendStream[str] = attrs.field(init=False)

@classmethod
def from_async_sqla_engine(
Expand All @@ -76,8 +65,7 @@ def from_async_sqla_engine(
:param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver
:type engine: ~sqlalchemy.ext.asyncio.AsyncEngine
:param options: extra keyword arguments passed to :func:`asyncpg.connect` (will
override any automatically generated arguments based on the engine)
:param options: extra keyword arguments passed to :func:`asyncpg.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
Expand All @@ -88,25 +76,24 @@ def from_async_sqla_engine(
f"{engine.dialect.driver})"
)

connect_args = dict(engine.url.query)
for optname in ("host", "port", "database", "username", "password"):
value = getattr(engine.url, optname)
if value is not None:
if optname == "username":
optname = "user"

connect_args[optname] = value

if options:
connect_args |= options

factory = partial(asyncpg.connect, **connect_args)
return cls(factory, **kwargs)
dsn = engine.url.render_as_string(hide_password=False).replace("+asyncpg", "")
return cls(dsn, options or {}, **kwargs)

@property
def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]:
return OSError, InterfaceError

@asynccontextmanager
async def _connect(self) -> AsyncGenerator[asyncpg.Connection, None]:
async for attempt in self._retry():
with attempt:
conn = await asyncpg.connect(self.dsn, **self.options)
try:
yield conn
finally:
with move_on_after(5, shield=True):
await conn.close(timeout=3)

async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
self._send = cast(
Expand All @@ -125,11 +112,6 @@ def listen_callback(
if event is not None:
self._task_group.start_soon(self.publish_local, event)

async def close_connection() -> None:
if not conn.is_closed():
with move_on_after(3, shield=True):
await conn.close()

async def unsubscribe() -> None:
if not conn.is_closed():
with move_on_after(3, shield=True):
Expand All @@ -139,11 +121,7 @@ async def unsubscribe() -> None:
send, receive = create_memory_object_stream[str](100)
while True:
async with AsyncExitStack() as exit_stack:
async for attempt in self._retry():
with attempt:
conn = await self.connection_factory()

exit_stack.push_async_callback(close_connection)
conn = await exit_stack.enter_async_context(self._connect())
self._logger.info("Connection established")
try:
await conn.add_listener(self.channel, listen_callback)
Expand Down
14 changes: 8 additions & 6 deletions src/apscheduler/eventbrokers/psycopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class PsycopgEventBroker(BaseExternalEventBroker):
:param conninfo: a libpq connection string (e.g.
``postgres://user:pass@host:port/dbname``)
:param options: extra keyword arguments passed to
:meth:`psycopg.AsyncConnection.connect`
:param channel: the ``NOTIFY`` channel to use
:param max_idle_time: maximum time (in seconds) to let the connection go idle,
before sending a ``SELECT 1`` query to prevent a connection timeout
Expand Down Expand Up @@ -70,10 +72,10 @@ def from_async_sqla_engine(
The engine will only be used to create the appropriate options for
:meth:`psycopg.AsyncConnection.connect`.
:param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver
:param engine: an asynchronous SQLAlchemy engine using psycopg as the driver
:type engine: ~sqlalchemy.ext.asyncio.AsyncEngine
:param options: extra keyword arguments passed to :func:`asyncpg.connect` (will
override any automatically generated arguments based on the engine)
:param options: extra keyword arguments passed to
:meth:`psycopg.AsyncConnection.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
Expand All @@ -87,8 +89,7 @@ def from_async_sqla_engine(
conninfo = engine.url.render_as_string(hide_password=False).replace(
"+psycopg", ""
)
opts = dict(options or {}, autocommit=True)
return cls(conninfo, opts, **kwargs)
return cls(conninfo, options or {}, **kwargs)

@property
def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]:
Expand All @@ -102,7 +103,8 @@ async def _connect(self) -> AsyncGenerator[AsyncConnection, None]:
try:
yield conn
finally:
await conn.close()
with move_on_after(5, shield=True):
await conn.close()

async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker:
pytest.importorskip("asyncpg", reason="asyncpg is not installed")
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

broker = AsyncpgEventBroker.from_dsn(
broker = AsyncpgEventBroker(
"postgres://postgres:secret@localhost:5432/testdb", serializer=serializer
)
return broker
Expand Down
6 changes: 3 additions & 3 deletions tests/test_eventbrokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def test_asyncpg_broker_from_async_engine() -> None:
engine = create_async_engine(url)
broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
assert isinstance(broker, AsyncpgEventBroker)
assert broker.conninfo == (
"postgresql://myuser:c %2F%%40@localhost:7654/dbname?opt1=foo&opt2=bar"
assert broker.dsn == (
"postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar"
)


Expand All @@ -165,5 +165,5 @@ def test_psycopg_broker_from_async_engine() -> None:
broker = PsycopgEventBroker.from_async_sqla_engine(engine)
assert isinstance(broker, PsycopgEventBroker)
assert broker.conninfo == (
"postgresql://myuser:c %2F%%40@localhost:7654/dbname?opt1=foo&opt2=bar"
"postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar"
)

0 comments on commit 8bf590c

Please sign in to comment.