Skip to content

Commit

Permalink
PYTHON-4494 - AsyncMongoClient._cleanup_cursor needs to be synchronous (
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahStapp committed Jun 17, 2024
1 parent bba5f81 commit 1f910b5
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 100 deletions.
6 changes: 6 additions & 0 deletions pymongo/asynchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ async def _end_session(self, lock: bool) -> None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")
Expand Down
38 changes: 25 additions & 13 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def __init__(
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()

if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
Expand All @@ -95,8 +95,7 @@ def __init__(
raise TypeError("max_await_time_ms must be an integer or None")

def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()

def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
Expand Down Expand Up @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
return self._session
return None

async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
Expand All @@ -210,8 +208,22 @@ async def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

async def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -222,14 +234,14 @@ async def _die(self, synchronous: bool = False) -> None:
self._session = None
self._sock_mgr = None

async def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
await self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None

async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()

async def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
Expand All @@ -243,7 +255,7 @@ async def _send_message(self, operation: _GetMore) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
await self.close()
Expand Down Expand Up @@ -305,7 +317,7 @@ async def _refresh(self) -> int:
)
)
else: # Cursor id is zero nothing else to return
await self._die(True)
await self._die_lock()

return len(self._data)

Expand Down
46 changes: 32 additions & 14 deletions pymongo/asynchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ def retrieved(self) -> int:
return self._retrieved

def __del__(self) -> None:
if _IS_SYNC:
self._die() # type: ignore[unused-coroutine]
self._die_no_lock()

def clone(self) -> AsyncCursor[_DocumentType]:
"""Get a clone of this cursor.
Expand Down Expand Up @@ -996,14 +995,7 @@ def _deepcopy(
y[key] = value
return y

async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
self._killed = True
if self._id and not already_killed:
cursor_id = self._id
Expand All @@ -1013,8 +1005,34 @@ async def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

cursor_id, address = self._prepare_to_die(already_killed)
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

async def _die_lock(self) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

cursor_id, address = self._prepare_to_die(already_killed)
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -1027,7 +1045,7 @@ async def _die(self, synchronous: bool = False) -> None:

async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()

async def distinct(self, key: str) -> list:
"""Get a list of distinct values for `key` among all documents
Expand Down Expand Up @@ -1080,7 +1098,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
await self.close()
# If this is a tailable cursor the error is likely
Expand Down
63 changes: 39 additions & 24 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1857,48 +1857,63 @@ async def _retryable_write(
async with self._tmp_session(session) as s:
return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)

async def _cleanup_cursor(
def _cleanup_cursor_no_lock(
self,
locks_allowed: bool,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() or __del__.
"""Cleanup a cursor from __del__ without locking.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection attached at the time the cursor
was garbage collected.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
"""
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if session and not explicit_session:
session._end_implicit_session()

async def _cleanup_cursor_lock(
self,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() using a lock.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection or implicit session attached at the time the cursor
was closed or garbage collected.
:param locks_allowed: True if we are allowed to acquire locks.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
:param session: The cursor's session.
:param explicit_session: True if the session was passed explicitly.
"""
if locks_allowed:
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(
cursor_id, address, session=session, conn_mgr=conn_mgr
)
if conn_mgr:
await conn_mgr.close()
else:
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
await conn_mgr.close()
if session and not explicit_session:
await session._end_session(lock=locks_allowed)
session._end_implicit_session()

async def _close_cursor_now(
self,
Expand Down Expand Up @@ -1978,7 +1993,7 @@ async def _process_kill_cursors(self) -> None:

for address, cursor_id, conn_mgr in pinned_cursors:
try:
await self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False)
await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it
Expand Down
6 changes: 6 additions & 0 deletions pymongo/synchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ def _end_session(self, lock: bool) -> None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")
Expand Down
38 changes: 25 additions & 13 deletions pymongo/synchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def __init__(
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()

if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
Expand All @@ -95,8 +95,7 @@ def __init__(
raise TypeError("max_await_time_ms must be an integer or None")

def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()

def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
Expand Down Expand Up @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
return self._session
return None

def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
Expand All @@ -210,8 +208,22 @@ def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -222,14 +234,14 @@ def _die(self, synchronous: bool = False) -> None:
self._session = None
self._sock_mgr = None

def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None

def close(self) -> None:
"""Explicitly close / kill this cursor."""
self._die(True)
self._die_lock()

def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
Expand All @@ -243,7 +255,7 @@ def _send_message(self, operation: _GetMore) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
self.close()
Expand Down Expand Up @@ -305,7 +317,7 @@ def _refresh(self) -> int:
)
)
else: # Cursor id is zero nothing else to return
self._die(True)
self._die_lock()

return len(self._data)

Expand Down
Loading

0 comments on commit 1f910b5

Please sign in to comment.