Merge branch 'master' into doc-strings
agronholm authored Jun 8, 2024
2 parents f539ee6 + 0cfca07 commit a8d0867
Showing 10 changed files with 100 additions and 77 deletions.
5 changes: 1 addition & 4 deletions .github/pull_request_template.md
@@ -24,10 +24,7 @@ If there are no entries after the last release, use `**UNRELEASED**` as the version
If, say, your patch fixes issue #999, the entry should look like this:

`* Fix big bad boo-boo in the async scheduler (#999
<https://github.com/agronholm/apscheduler/issues/999>_; PR by Yourname)`
<https://github.com/agronholm/apscheduler/issues/999>_; PR by @yourgithubaccount)`

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

If possible, use your real name in the changelog entry. If not, use your GitHub
username.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -20,7 +20,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.4
rev: v0.4.7
hooks:
- id: ruff
args: [--fix, --show-fixes]
@@ -40,7 +40,7 @@ repos:
stages: [manual]

- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
rev: v2.3.0
hooks:
- id: codespell

4 changes: 4 additions & 0 deletions docs/versionhistory.rst
@@ -6,7 +6,11 @@ APScheduler, see the :doc:`migration section <migration>`.

**UNRELEASED**

- **BREAKING** Refactored ``AsyncpgEventBroker`` to directly accept a connection string,
thus eliminating the need for the ``AsyncpgEventBroker.from_dsn()`` class method
- Added the ``psycopg`` event broker
- Added useful indexes and removed useless ones in ``SQLAlchemyDatastore`` and
``MongoDBDataStore``

**4.0.0a5**

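The breaking change above means the broker is now built straight from a connection string. A minimal before/after sketch (the DSN is a placeholder; the same pattern appears in the tests/conftest.py change below):

    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

    # Before: the broker had to be created via a class method
    # broker = AsyncpgEventBroker.from_dsn("postgres://user:pass@localhost:5432/testdb")

    # After: the libpq DSN goes directly to the initializer
    broker = AsyncpgEventBroker("postgres://user:pass@localhost:5432/testdb")
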
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -47,7 +47,7 @@ cbor = ["cbor2 >= 5.0"]
mongodb = ["pymongo >= 4"]
mqtt = ["paho-mqtt >= 2.0"]
redis = ["redis >= 5.0.1"]
sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.19"]
sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.24"]
test = [
"APScheduler[cbor,mongodb,mqtt,redis,sqlalchemy]",
"asyncpg >= 0.20; python_implementation == 'CPython'",
5 changes: 3 additions & 2 deletions src/apscheduler/datastores/mongodb.py
@@ -206,11 +206,12 @@ def _initialize(self) -> None:
self._jobs.delete_many({}, session=session)
self._jobs_results.delete_many({}, session=session)

self._schedules.create_index("task_id", session=session)
self._schedules.create_index("next_fire_time", session=session)
self._schedules.create_index("acquired_by", session=session)
self._jobs.create_index("task_id", session=session)
self._jobs.create_index("schedule_id", session=session)
self._jobs.create_index("created_at", session=session)
self._jobs_results.create_index("finished_at", session=session)
self._jobs.create_index("acquired_by", session=session)
self._jobs_results.create_index("expires_at", session=session)

async def start(
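
The changes above rework which fields are indexed in the MongoDB data store. A quick way to inspect what was actually created, sketched with pymongo (the URI, database name, and collection names are assumptions; adjust them to your deployment):

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # placeholder URI
    db = client["apscheduler"]  # assumed default database for MongoDBDataStore
    for coll in ("schedules", "jobs", "jobs_results"):
        # index_information() maps each index name to its key specification
        print(coll, list(db[coll].index_information()))
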
6 changes: 3 additions & 3 deletions src/apscheduler/datastores/sqlalchemy.py
@@ -301,7 +301,7 @@ def get_table_definitions(self) -> MetaData:
Column("max_jitter", interval_type),
*next_fire_time_tzoffset_columns,
Column("last_fire_time", timestamp_type),
Column("acquired_by", Unicode(500)),
Column("acquired_by", Unicode(500), index=True),
Column("acquired_until", timestamp_type),
)
Table(
@@ -317,7 +317,7 @@ def get_table_definitions(self) -> MetaData:
Column("start_deadline", timestamp_type),
Column("result_expiration_time", interval_type),
Column("created_at", timestamp_type, nullable=False),
Column("acquired_by", Unicode(500)),
Column("acquired_by", Unicode(500), index=True),
Column("acquired_until", timestamp_type),
)
Table(
Expand All @@ -326,7 +326,7 @@ def get_table_definitions(self) -> MetaData:
Column("job_id", Uuid, primary_key=True),
Column("outcome", Enum(JobOutcome, metadata=metadata), nullable=False),
Column("started_at", timestamp_type, index=True),
Column("finished_at", timestamp_type, nullable=False, index=True),
Column("finished_at", timestamp_type, nullable=False),
Column("expires_at", timestamp_type, nullable=False, index=True),
Column("exception", LargeBinary),
Column("return_value", LargeBinary),
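
In the SQLAlchemy store, the ``acquired_by`` columns gain an index while ``finished_at`` on job results loses its own. ``index=True`` on a ``Column`` makes SQLAlchemy emit a ``CREATE INDEX`` when the tables are created; a minimal standalone sketch, not the project's actual schema:

    from sqlalchemy import Column, MetaData, Table, Unicode, Uuid, create_engine

    metadata = MetaData()
    jobs = Table(
        "jobs",
        metadata,
        Column("id", Uuid, primary_key=True),
        Column("acquired_by", Unicode(500), index=True),
    )
    engine = create_engine("sqlite://", echo=True)  # echo=True logs the emitted DDL
    metadata.create_all(engine)  # emits CREATE INDEX ix_jobs_acquired_by ON jobs (acquired_by)
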
76 changes: 27 additions & 49 deletions src/apscheduler/eventbrokers/asyncpg.py
@@ -1,10 +1,9 @@
from __future__ import annotations

from collections.abc import Awaitable, Mapping
from contextlib import AsyncExitStack
from functools import partial
from collections.abc import AsyncGenerator, Mapping
from contextlib import AsyncExitStack, asynccontextmanager
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, cast
from typing import TYPE_CHECKING, Any, cast

import asyncpg
import attrs
@@ -16,6 +15,7 @@
)
from anyio.streams.memory import MemoryObjectSendStream
from asyncpg import Connection, InterfaceError
from attr.validators import instance_of

from .._events import Event
from .._exceptions import SerializationError
@@ -33,33 +33,22 @@ class AsyncpgEventBroker(BaseExternalEventBroker):
.. _asyncpg: https://pypi.org/project/asyncpg/
:param connection_factory: a callable that creates an asyncpg connection
:param dsn: a libpq connection string (e.g.
``postgres://user:pass@host:port/dbname``)
:param options: extra keyword arguments passed to :func:`asyncpg.connect`
:param channel: the ``NOTIFY`` channel to use
:param max_idle_time: maximum time to let the connection go idle, before sending a
``SELECT 1`` query to prevent a connection timeout
"""

connection_factory: Callable[[], Awaitable[Connection]]
dsn: str
options: Mapping[str, Any] = attrs.field(
factory=dict, validator=instance_of(Mapping)
)
channel: str = attrs.field(kw_only=True, default="apscheduler")
max_idle_time: float = attrs.field(kw_only=True, default=10)
_send: MemoryObjectSendStream[str] = attrs.field(init=False)

@classmethod
def from_dsn(
cls, dsn: str, options: Mapping[str, Any] | None = None, **kwargs: Any
) -> AsyncpgEventBroker:
"""
Create a new asyncpg event broker from an existing asyncpg connection pool.

:param dsn: data source name, passed as first positional argument to
:func:`asyncpg.connect`
:param options: keyword arguments passed to :func:`asyncpg.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
"""
factory = partial(asyncpg.connect, dsn, **(options or {}))
return cls(factory, **kwargs)
_send: MemoryObjectSendStream[str] = attrs.field(init=False)

@classmethod
def from_async_sqla_engine(
@@ -76,8 +65,7 @@ def from_async_sqla_engine(
:param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver
:type engine: ~sqlalchemy.ext.asyncio.AsyncEngine
:param options: extra keyword arguments passed to :func:`asyncpg.connect` (will
override any automatically generated arguments based on the engine)
:param options: extra keyword arguments passed to :func:`asyncpg.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
@@ -88,25 +76,24 @@ def from_async_sqla_engine(
f"{engine.dialect.driver})"
)

connect_args = dict(engine.url.query)
for optname in ("host", "port", "database", "username", "password"):
value = getattr(engine.url, optname)
if value is not None:
if optname == "username":
optname = "user"

connect_args[optname] = value

if options:
connect_args |= options

factory = partial(asyncpg.connect, **connect_args)
return cls(factory, **kwargs)
dsn = engine.url.render_as_string(hide_password=False).replace("+asyncpg", "")
return cls(dsn, options or {}, **kwargs)

@property
def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]:
return OSError, InterfaceError

@asynccontextmanager
async def _connect(self) -> AsyncGenerator[asyncpg.Connection, None]:
async for attempt in self._retry():
with attempt:
conn = await asyncpg.connect(self.dsn, **self.options)
try:
yield conn
finally:
with move_on_after(5, shield=True):
await conn.close(timeout=3)

async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
self._send = cast(
@@ -125,11 +112,6 @@ def listen_callback(
if event is not None:
self._task_group.start_soon(self.publish_local, event)

async def close_connection() -> None:
if not conn.is_closed():
with move_on_after(3, shield=True):
await conn.close()

async def unsubscribe() -> None:
if not conn.is_closed():
with move_on_after(3, shield=True):
@@ -139,11 +121,7 @@ async def unsubscribe() -> None:
send, receive = create_memory_object_stream[str](100)
while True:
async with AsyncExitStack() as exit_stack:
async for attempt in self._retry():
with attempt:
conn = await self.connection_factory()

exit_stack.push_async_callback(close_connection)
conn = await exit_stack.enter_async_context(self._connect())
self._logger.info("Connection established")
try:
await conn.add_listener(self.channel, listen_callback)
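
With ``from_dsn()`` removed, the DSN and any extra ``asyncpg.connect()`` keyword arguments are passed straight to the initializer, and the broker reconnects through the new ``_connect()`` context manager. A hedged usage sketch (the DSN and the ``timeout`` option are placeholders):

    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

    broker = AsyncpgEventBroker(
        "postgres://user:pass@localhost:5432/testdb",
        {"timeout": 10},           # forwarded to asyncpg.connect()
        channel="apscheduler",     # NOTIFY channel; this is the default
    )
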
25 changes: 10 additions & 15 deletions src/apscheduler/eventbrokers/psycopg.py
@@ -4,7 +4,6 @@
from contextlib import AsyncExitStack, asynccontextmanager
from logging import Logger
from typing import TYPE_CHECKING, Any, NoReturn
from urllib.parse import urlunparse

import attrs
from anyio import (
@@ -40,6 +39,8 @@ class PsycopgEventBroker(BaseExternalEventBroker):
:param conninfo: a libpq connection string (e.g.
``postgres://user:pass@host:port/dbname``)
:param options: extra keyword arguments passed to
:meth:`psycopg.AsyncConnection.connect`
:param channel: the ``NOTIFY`` channel to use
:param max_idle_time: maximum time (in seconds) to let the connection go idle,
before sending a ``SELECT 1`` query to prevent a connection timeout
@@ -71,10 +72,10 @@ def from_async_sqla_engine(
The engine will only be used to create the appropriate options for
:meth:`psycopg.AsyncConnection.connect`.
:param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver
:param engine: an asynchronous SQLAlchemy engine using psycopg as the driver
:type engine: ~sqlalchemy.ext.asyncio.AsyncEngine
:param options: extra keyword arguments passed to :func:`asyncpg.connect` (will
override any automatically generated arguments based on the engine)
:param options: extra keyword arguments passed to
:meth:`psycopg.AsyncConnection.connect`
:param kwargs: keyword arguments to pass to the initializer of this class
:return: the newly created event broker
Expand All @@ -85,17 +86,10 @@ def from_async_sqla_engine(
f"{engine.dialect.driver})"
)

conninfo = urlunparse(
[
"postgres",
engine.url.username,
engine.url.password,
engine.url.host,
engine.url.database,
]
conninfo = engine.url.render_as_string(hide_password=False).replace(
"+psycopg", ""
)
opts = dict(options, autocommit=True)
return cls(conninfo, opts, **kwargs)
return cls(conninfo, options or {}, **kwargs)

@property
def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]:
@@ -109,7 +103,8 @@ async def _connect(self) -> AsyncGenerator[AsyncConnection, None]:
try:
yield conn
finally:
await conn.close()
with move_on_after(5, shield=True):
await conn.close()

async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
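
The new ``psycopg`` broker mirrors the asyncpg one: a libpq connection string plus an optional mapping of extra arguments for ``psycopg.AsyncConnection.connect()``. A hedged construction sketch (the conninfo string and ``connect_timeout`` value are placeholders):

    from apscheduler.eventbrokers.psycopg import PsycopgEventBroker

    broker = PsycopgEventBroker(
        "postgres://user:pass@localhost:5432/testdb",
        {"connect_timeout": 10},   # forwarded to psycopg.AsyncConnection.connect()
    )
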
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -80,7 +80,7 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker:
pytest.importorskip("asyncpg", reason="asyncpg is not installed")
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

broker = AsyncpgEventBroker.from_dsn(
broker = AsyncpgEventBroker(
"postgres://postgres:secret@localhost:5432/testdb", serializer=serializer
)
return broker
48 changes: 48 additions & 0 deletions tests/test_eventbrokers.py
@@ -119,3 +119,51 @@ async def test_cancel_stop(raw_event_broker: EventBroker, logger: Logger) -> None:
async with AsyncExitStack() as exit_stack:
await raw_event_broker.start(exit_stack, logger)
scope.cancel()


def test_asyncpg_broker_from_async_engine() -> None:
pytest.importorskip("asyncpg", reason="asyncpg is not installed")
from sqlalchemy import URL
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

url = URL(
"postgresql+asyncpg",
"myuser",
"c /%@",
"localhost",
7654,
"dbname",
{"opt1": "foo", "opt2": "bar"},
)
engine = create_async_engine(url)
broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
assert isinstance(broker, AsyncpgEventBroker)
assert broker.dsn == (
"postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar"
)


def test_psycopg_broker_from_async_engine() -> None:
pytest.importorskip("psycopg", reason="psycopg is not installed")
from sqlalchemy import URL
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler.eventbrokers.psycopg import PsycopgEventBroker

url = URL(
"postgresql+psycopg",
"myuser",
"c /%@",
"localhost",
7654,
"dbname",
{"opt1": "foo", "opt2": "bar"},
)
engine = create_async_engine(url)
broker = PsycopgEventBroker.from_async_sqla_engine(engine)
assert isinstance(broker, PsycopgEventBroker)
assert broker.conninfo == (
"postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar"
)
