From 4758e8ff104bd2e767d462e7aa60cc4b5c51ce72 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Sat, 21 Sep 2024 11:25:44 +0300
Subject: [PATCH] Fixed race condition where MongoDBDataStore let multiple
 schedulers acquire the same schedules

---
 docs/versionhistory.rst               |   2 +
 src/apscheduler/datastores/mongodb.py | 131 +++++++++++++++++---------
 2 files changed, 90 insertions(+), 43 deletions(-)

diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 5eb43aa6..e8eda3c1 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -58,6 +58,8 @@ APScheduler, see the :doc:`migration section <migration>`.
   ``DeserializationError`` as appropriate
 - Fixed ``repr()`` outputs of schedulers, data stores and event brokers to be much
   more useful and reasonable
+- Fixed race condition in ``MongoDBDataStore`` that allowed multiple schedulers to
+  acquire the same schedules at once
 
 **4.0.0a5**
 
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 95b54825..4d03c317 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -381,53 +381,98 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
     async def acquire_schedules(
         self, scheduler_id: str, lease_duration: timedelta, limit: int
     ) -> list[Schedule]:
-        async for attempt in self._retry():
-            with attempt, self._client.start_session() as session:
-                schedules: list[Schedule] = []
-                now = datetime.now(timezone.utc)
-                async with await AsyncCursor.create(
-                    lambda: self._schedules.find(
-                        {
-                            "next_fire_time": {"$lte": now.timestamp()},
-                            "$and": [
-                                {
-                                    "$or": [
-                                        {"paused": {"$exists": False}},
-                                        {"paused": False},
-                                    ]
-                                },
-                                {
-                                    "$or": [
-                                        {"acquired_until": {"$exists": False}},
-                                        {"acquired_until": {"$lt": now.timestamp()}},
-                                    ]
-                                },
-                            ],
-                        },
-                        session=session,
-                    )
-                    .sort("next_fire_time")
-                    .limit(limit)
-                ) as cursor:
-                    async for document in cursor:
-                        document["id"] = document.pop("_id")
-                        unmarshal_timestamps(document)
-                        schedule = Schedule.unmarshal(self.serializer, document)
-                        schedules.append(schedule)
+        schedules: list[Schedule] = []
+
+        # Fetch up to {limit} schedules
+        while len(schedules) < limit:
+            async for attempt in self._retry():
+                with attempt, self._client.start_session() as session:
+                    now = datetime.now(timezone.utc)
+                    async with await AsyncCursor.create(
+                        lambda: self._schedules.find(
+                            {
+                                "next_fire_time": {"$lte": now.timestamp()},
+                                "$and": [
+                                    {
+                                        "$or": [
+                                            {"paused": {"$exists": False}},
+                                            {"paused": False},
+                                        ]
+                                    },
+                                    {
+                                        "$or": [
+                                            {"acquired_until": {"$exists": False}},
+                                            {
+                                                "acquired_until": {
+                                                    "$lt": now.timestamp()
+                                                }
+                                            },
+                                        ]
+                                    },
+                                ],
+                            },
+                            session=session,
+                        )
+                        .sort("next_fire_time")
+                        .limit(limit - len(schedules))
+                    ) as cursor:
+                        documents = [doc async for doc in cursor]
+
+                    # Bail out if there are no more schedules to be acquired
+                    if not documents:
+                        return schedules
 
-                if schedules:
                     now = datetime.now(timezone.utc)
                     acquired_until = now + lease_duration
-                    filters = {"_id": {"$in": [schedule.id for schedule in schedules]}}
-                    update = {
-                        "$set": {
-                            "acquired_by": scheduler_id,
-                            **marshal_timestamp(acquired_until, "acquired_until"),
-                        }
-                    }
-                    self._schedules.update_many(filters, update, session=session)
+                    schedule_ids = [doc["_id"] for doc in documents]
+                    result = await to_thread.run_sync(
+                        lambda: self._schedules.update_many(
+                            {
+                                "_id": {"$in": schedule_ids},
+                                "$or": [
+                                    {"acquired_until": {"$exists": False}},
+                                    {"acquired_until": {"$lt": now.timestamp()}},
+                                ],
+                            },
+                            {
+                                "$set": {
+                                    "acquired_by": scheduler_id,
+                                    **marshal_timestamp(
+                                        acquired_until, "acquired_until"
+                                    ),
+                                }
+                            },
+                        )
+                    )
 
-        return schedules
+                    # If the number of modified schedules was smaller than expected,
+                    # manually check which ones were successfully acquired
+                    if result.modified_count != len(schedule_ids):
+                        async with await AsyncCursor.create(
+                            lambda: self._schedules.find(
+                                {
+                                    "_id": {"$in": schedule_ids},
+                                    "acquired_by": scheduler_id,
+                                },
+                                sort=[("created_at", ASCENDING)],
+                                projection=["_id"],
+                                session=session,
+                            )
+                        ) as cursor:
+                            acquired_schedule_ids = {doc["_id"] async for doc in cursor}
+                        documents = [
+                            doc
+                            for doc in documents
+                            if doc["_id"] in acquired_schedule_ids
+                        ]
+
+                    for doc in documents:
+                        # Deserialize the schedule
+                        doc["id"] = doc.pop("_id")
+                        unmarshal_timestamps(doc)
+                        schedules.append(Schedule.unmarshal(self.serializer, doc))
+
+        return schedules
 
     async def release_schedules(
         self, scheduler_id: str, results: Sequence[ScheduleResult]