Skip to content

Commit

Permalink
Fixed race condition where MongoDBDataStore let multiple schedulers acquire the same schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Sep 21, 2024
1 parent df887b6 commit 4758e8f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 43 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ APScheduler, see the :doc:`migration section <migration>`.
``DeserializationError`` as appropriate
- Fixed ``repr()`` outputs of schedulers, data stores and event brokers to be much more
useful and reasonable
- Fixed race condition in ``MongoDBDataStore`` that allowed multiple schedulers to
acquire the same schedules at once

**4.0.0a5**

Expand Down
131 changes: 88 additions & 43 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,53 +381,98 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
async def acquire_schedules(
self, scheduler_id: str, lease_duration: timedelta, limit: int
) -> list[Schedule]:
async for attempt in self._retry():
with attempt, self._client.start_session() as session:
schedules: list[Schedule] = []
now = datetime.now(timezone.utc)
async with await AsyncCursor.create(
lambda: self._schedules.find(
{
"next_fire_time": {"$lte": now.timestamp()},
"$and": [
{
"$or": [
{"paused": {"$exists": False}},
{"paused": False},
]
},
{
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now.timestamp()}},
]
},
],
},
session=session,
)
.sort("next_fire_time")
.limit(limit)
) as cursor:
async for document in cursor:
document["id"] = document.pop("_id")
unmarshal_timestamps(document)
schedule = Schedule.unmarshal(self.serializer, document)
schedules.append(schedule)
schedules: list[Schedule] = []

# Fetch up to {limit} schedules
while len(schedules) < limit:
async for attempt in self._retry():
with attempt, self._client.start_session() as session:
now = datetime.now(timezone.utc)
async with await AsyncCursor.create(
lambda: self._schedules.find(
{
"next_fire_time": {"$lte": now.timestamp()},
"$and": [
{
"$or": [
{"paused": {"$exists": False}},
{"paused": False},
]
},
{
"$or": [
{"acquired_until": {"$exists": False}},
{
"acquired_until": {
"$lt": now.timestamp()
}
},
]
},
],
},
session=session,
)
.sort("next_fire_time")
.limit(limit - len(schedules))
) as cursor:
documents = [doc async for doc in cursor]

# Bail out if there are no more schedules to be acquired
if not documents:
return schedules

if schedules:
now = datetime.now(timezone.utc)
acquired_until = now + lease_duration
filters = {"_id": {"$in": [schedule.id for schedule in schedules]}}
update = {
"$set": {
"acquired_by": scheduler_id,
**marshal_timestamp(acquired_until, "acquired_until"),
}
}
self._schedules.update_many(filters, update, session=session)
schedule_ids = [doc["_id"] for doc in documents]
result = await to_thread.run_sync(
lambda: self._schedules.update_many(
{
"_id": {"$in": schedule_ids},
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now.timestamp()}},
],
},
{
"$set": {
"acquired_by": scheduler_id,
**marshal_timestamp(
acquired_until, "acquired_until"
),
}
},
)
)

return schedules
# If the number of modified schedules was smaller than expected,
# manually check which ones were successfully acquired
if result.modified_count != len(schedule_ids):
async with await AsyncCursor.create(
lambda: self._schedules.find(
{
"_id": {"$in": schedule_ids},
"acquired_by": scheduler_id,
},
sort=[("created_at", ASCENDING)],
projection=["_id"],
session=session,
)
) as cursor:
acquired_schedule_ids = {doc["_id"] async for doc in cursor}
documents = [
doc
for doc in documents
if doc["_id"] in acquired_schedule_ids
]

for doc in documents:
# Deserialize the schedule
doc["id"] = doc.pop("_id")
unmarshal_timestamps(doc)
schedules.append(Schedule.unmarshal(self.serializer, doc))

return schedules

async def release_schedules(
self, scheduler_id: str, results: Sequence[ScheduleResult]
Expand Down

0 comments on commit 4758e8f

Please sign in to comment.