diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 62d5ed29a0..75c6bad7cf 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -531,6 +531,12 @@ async def _end_session(self, lock: bool) -> None: self._client._return_server_session(self._server_session) self._server_session = None + def _end_implicit_session(self) -> None: + # Implicit sessions can't be part of transactions or pinned connections + if self._server_session is not None: + self._client._return_server_session(self._server_session) + self._server_session = None + def _check_ended(self) -> None: if self._server_session is None: raise InvalidOperation("Cannot use ended session") diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 0412264e20..000df160d4 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -81,8 +81,8 @@ def __init__( self._explicit_session = explicit_session self._killed = self._id == 0 self._comment = comment - if _IS_SYNC and self._killed: - self._end_session(True) # type: ignore[unused-coroutine] + if self._killed: + self._end_session() if "ns" in cursor_info: # noqa: SIM401 self._ns = cursor_info["ns"] @@ -95,8 +95,7 @@ def __init__( raise TypeError("max_await_time_ms must be an integer or None") def __del__(self) -> None: - if _IS_SYNC: - self._die(False) # type: ignore[unused-coroutine] + self._die_no_lock() def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]: """Limits the number of documents returned in one batch. Each batch @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]: return self._session return None - async def _die(self, synchronous: bool = False) -> None: - """Closes this cursor.""" + def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]: already_killed = self._killed self._killed = True if self._id and not already_killed: @@ -210,8 +208,22 @@ async def _die(self, synchronous: bool = False) -> None: # Skip killCursors. cursor_id = 0 address = None - await self._collection.database.client._cleanup_cursor( - synchronous, + return cursor_id, address + + def _die_no_lock(self) -> None: + """Closes this cursor without acquiring a lock.""" + cursor_id, address = self._prepare_to_die() + self._collection.database.client._cleanup_cursor_no_lock( + cursor_id, address, self._sock_mgr, self._session, self._explicit_session + ) + if not self._explicit_session: + self._session = None + self._sock_mgr = None + + async def _die_lock(self) -> None: + """Closes this cursor.""" + cursor_id, address = self._prepare_to_die() + await self._collection.database.client._cleanup_cursor_lock( cursor_id, address, self._sock_mgr, @@ -222,14 +234,14 @@ async def _die(self, synchronous: bool = False) -> None: self._session = None self._sock_mgr = None - async def _end_session(self, synchronous: bool) -> None: + def _end_session(self) -> None: if self._session and not self._explicit_session: - await self._session._end_session(lock=synchronous) + self._session._end_implicit_session() self._session = None async def close(self) -> None: """Explicitly close / kill this cursor.""" - await self._die(True) + await self._die_lock() async def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" @@ -243,7 +255,7 @@ async def _send_message(self, operation: _GetMore) -> None: # Don't send killCursors because the cursor is already closed. self._killed = True if exc.timeout: - await self._die(False) + self._die_no_lock() else: # Return the session and pinned connection, if necessary. await self.close() @@ -305,7 +317,7 @@ async def _refresh(self) -> int: ) ) else: # Cursor id is zero nothing else to return - await self._die(True) + await self._die_lock() return len(self._data) diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 4edd2103fd..5b4771bf8f 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -259,8 +259,7 @@ def retrieved(self) -> int: return self._retrieved def __del__(self) -> None: - if _IS_SYNC: - self._die() # type: ignore[unused-coroutine] + self._die_no_lock() def clone(self) -> AsyncCursor[_DocumentType]: """Get a clone of this cursor. @@ -996,14 +995,7 @@ def _deepcopy( y[key] = value return y - async def _die(self, synchronous: bool = False) -> None: - """Closes this cursor.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - + def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: self._killed = True if self._id and not already_killed: cursor_id = self._id @@ -1013,8 +1005,34 @@ async def _die(self, synchronous: bool = False) -> None: # Skip killCursors. cursor_id = 0 address = None - await self._collection.database.client._cleanup_cursor( - synchronous, + return cursor_id, address + + def _die_no_lock(self) -> None: + """Closes this cursor without acquiring a lock.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + self._collection.database.client._cleanup_cursor_no_lock( + cursor_id, address, self._sock_mgr, self._session, self._explicit_session + ) + if not self._explicit_session: + self._session = None + self._sock_mgr = None + + async def _die_lock(self) -> None: + """Closes this cursor.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + await self._collection.database.client._cleanup_cursor_lock( cursor_id, address, self._sock_mgr, @@ -1027,7 +1045,7 @@ async def _die(self, synchronous: bool = False) -> None: async def close(self) -> None: """Explicitly close / kill this cursor.""" - await self._die(True) + await self._die_lock() async def distinct(self, key: str) -> list: """Get a list of distinct values for `key` among all documents @@ -1080,7 +1098,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: # Don't send killCursors because the cursor is already closed. self._killed = True if exc.timeout: - await self._die(False) + self._die_no_lock() else: await self.close() # If this is a tailable cursor the error is likely diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index ae1a630096..70d1b8cb42 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1856,48 +1856,63 @@ async def _retryable_write( async with self._tmp_session(session) as s: return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id) - async def _cleanup_cursor( + def _cleanup_cursor_no_lock( self, - locks_allowed: bool, cursor_id: int, address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], explicit_session: bool, ) -> None: - """Cleanup a cursor from cursor.close() or __del__. + """Cleanup a cursor from __del__ without locking. + + This method handles cleanup for Cursors/CommandCursors including any + pinned connection attached at the time the cursor + was garbage collected. + + :param cursor_id: The cursor id which may be 0. + :param address: The _CursorAddress. + :param conn_mgr: The _ConnectionManager for the pinned connection or None. + """ + # The cursor will be closed later in a different session. + if cursor_id or conn_mgr: + self._close_cursor_soon(cursor_id, address, conn_mgr) + if session and not explicit_session: + session._end_implicit_session() + + async def _cleanup_cursor_lock( + self, + cursor_id: int, + address: Optional[_CursorAddress], + conn_mgr: _ConnectionManager, + session: Optional[ClientSession], + explicit_session: bool, + ) -> None: + """Cleanup a cursor from cursor.close() using a lock. This method handles cleanup for Cursors/CommandCursors including any pinned connection or implicit session attached at the time the cursor was closed or garbage collected. - :param locks_allowed: True if we are allowed to acquire locks. :param cursor_id: The cursor id which may be 0. :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. :param explicit_session: True if the session was passed explicitly. """ - if locks_allowed: - if cursor_id: - if conn_mgr and conn_mgr.more_to_come: - # If this is an exhaust cursor and we haven't completely - # exhausted the result set we *must* close the socket - # to stop the server from sending more data. - assert conn_mgr.conn is not None - conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR) - else: - await self._close_cursor_now( - cursor_id, address, session=session, conn_mgr=conn_mgr - ) - if conn_mgr: - await conn_mgr.close() - else: - # The cursor will be closed later in a different session. - if cursor_id or conn_mgr: - self._close_cursor_soon(cursor_id, address, conn_mgr) + if cursor_id: + if conn_mgr and conn_mgr.more_to_come: + # If this is an exhaust cursor and we haven't completely + # exhausted the result set we *must* close the socket + # to stop the server from sending more data. + assert conn_mgr.conn is not None + conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR) + else: + await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) + if conn_mgr: + await conn_mgr.close() if session and not explicit_session: - await session._end_session(lock=locks_allowed) + session._end_implicit_session() async def _close_cursor_now( self, @@ -1977,7 +1992,7 @@ async def _process_kill_cursors(self) -> None: for address, cursor_id, conn_mgr in pinned_cursors: try: - await self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False) + await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 71af41aa8a..6489dcd27b 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -531,6 +531,12 @@ def _end_session(self, lock: bool) -> None: self._client._return_server_session(self._server_session) self._server_session = None + def _end_implicit_session(self) -> None: + # Implicit sessions can't be part of transactions or pinned connections + if self._server_session is not None: + self._client._return_server_session(self._server_session) + self._server_session = None + def _check_ended(self) -> None: if self._server_session is None: raise InvalidOperation("Cannot use ended session") diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index a2a5d8b192..acd658d69a 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -81,8 +81,8 @@ def __init__( self._explicit_session = explicit_session self._killed = self._id == 0 self._comment = comment - if _IS_SYNC and self._killed: - self._end_session(True) # type: ignore[unused-coroutine] + if self._killed: + self._end_session() if "ns" in cursor_info: # noqa: SIM401 self._ns = cursor_info["ns"] @@ -95,8 +95,7 @@ def __init__( raise TypeError("max_await_time_ms must be an integer or None") def __del__(self) -> None: - if _IS_SYNC: - self._die(False) # type: ignore[unused-coroutine] + self._die_no_lock() def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]: """Limits the number of documents returned in one batch. Each batch @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]: return self._session return None - def _die(self, synchronous: bool = False) -> None: - """Closes this cursor.""" + def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]: already_killed = self._killed self._killed = True if self._id and not already_killed: @@ -210,8 +208,22 @@ def _die(self, synchronous: bool = False) -> None: # Skip killCursors. cursor_id = 0 address = None - self._collection.database.client._cleanup_cursor( - synchronous, + return cursor_id, address + + def _die_no_lock(self) -> None: + """Closes this cursor without acquiring a lock.""" + cursor_id, address = self._prepare_to_die() + self._collection.database.client._cleanup_cursor_no_lock( + cursor_id, address, self._sock_mgr, self._session, self._explicit_session + ) + if not self._explicit_session: + self._session = None + self._sock_mgr = None + + def _die_lock(self) -> None: + """Closes this cursor.""" + cursor_id, address = self._prepare_to_die() + self._collection.database.client._cleanup_cursor_lock( cursor_id, address, self._sock_mgr, @@ -222,14 +234,14 @@ def _die(self, synchronous: bool = False) -> None: self._session = None self._sock_mgr = None - def _end_session(self, synchronous: bool) -> None: + def _end_session(self) -> None: if self._session and not self._explicit_session: - self._session._end_session(lock=synchronous) + self._session._end_implicit_session() self._session = None def close(self) -> None: """Explicitly close / kill this cursor.""" - self._die(True) + self._die_lock() def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" @@ -243,7 +255,7 @@ def _send_message(self, operation: _GetMore) -> None: # Don't send killCursors because the cursor is already closed. self._killed = True if exc.timeout: - self._die(False) + self._die_no_lock() else: # Return the session and pinned connection, if necessary. self.close() @@ -305,7 +317,7 @@ def _refresh(self) -> int: ) ) else: # Cursor id is zero nothing else to return - self._die(True) + self._die_lock() return len(self._data) diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index b74266a74e..121cee8106 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -259,8 +259,7 @@ def retrieved(self) -> int: return self._retrieved def __del__(self) -> None: - if _IS_SYNC: - self._die() # type: ignore[unused-coroutine] + self._die_no_lock() def clone(self) -> Cursor[_DocumentType]: """Get a clone of this cursor. @@ -994,14 +993,7 @@ def _deepcopy( y[key] = value return y - def _die(self, synchronous: bool = False) -> None: - """Closes this cursor.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - + def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: self._killed = True if self._id and not already_killed: cursor_id = self._id @@ -1011,8 +1003,34 @@ def _die(self, synchronous: bool = False) -> None: # Skip killCursors. cursor_id = 0 address = None - self._collection.database.client._cleanup_cursor( - synchronous, + return cursor_id, address + + def _die_no_lock(self) -> None: + """Closes this cursor without acquiring a lock.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + self._collection.database.client._cleanup_cursor_no_lock( + cursor_id, address, self._sock_mgr, self._session, self._explicit_session + ) + if not self._explicit_session: + self._session = None + self._sock_mgr = None + + def _die_lock(self) -> None: + """Closes this cursor.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + self._collection.database.client._cleanup_cursor_lock( cursor_id, address, self._sock_mgr, @@ -1025,7 +1043,7 @@ def _die(self, synchronous: bool = False) -> None: def close(self) -> None: """Explicitly close / kill this cursor.""" - self._die(True) + self._die_lock() def distinct(self, key: str) -> list: """Get a list of distinct values for `key` among all documents @@ -1078,7 +1096,7 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: # Don't send killCursors because the cursor is already closed. self._killed = True if exc.timeout: - self._die(False) + self._die_no_lock() else: self.close() # If this is a tailable cursor the error is likely diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 5c6efe15d3..19eaf1347a 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1853,46 +1853,63 @@ def _retryable_write( with self._tmp_session(session) as s: return self._retry_with_session(retryable, func, s, bulk, operation, operation_id) - def _cleanup_cursor( + def _cleanup_cursor_no_lock( self, - locks_allowed: bool, cursor_id: int, address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], explicit_session: bool, ) -> None: - """Cleanup a cursor from cursor.close() or __del__. + """Cleanup a cursor from __del__ without locking. + + This method handles cleanup for Cursors/CommandCursors including any + pinned connection attached at the time the cursor + was garbage collected. + + :param cursor_id: The cursor id which may be 0. + :param address: The _CursorAddress. + :param conn_mgr: The _ConnectionManager for the pinned connection or None. + """ + # The cursor will be closed later in a different session. + if cursor_id or conn_mgr: + self._close_cursor_soon(cursor_id, address, conn_mgr) + if session and not explicit_session: + session._end_implicit_session() + + def _cleanup_cursor_lock( + self, + cursor_id: int, + address: Optional[_CursorAddress], + conn_mgr: _ConnectionManager, + session: Optional[ClientSession], + explicit_session: bool, + ) -> None: + """Cleanup a cursor from cursor.close() using a lock. This method handles cleanup for Cursors/CommandCursors including any pinned connection or implicit session attached at the time the cursor was closed or garbage collected. - :param locks_allowed: True if we are allowed to acquire locks. :param cursor_id: The cursor id which may be 0. :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. :param explicit_session: True if the session was passed explicitly. """ - if locks_allowed: - if cursor_id: - if conn_mgr and conn_mgr.more_to_come: - # If this is an exhaust cursor and we haven't completely - # exhausted the result set we *must* close the socket - # to stop the server from sending more data. - assert conn_mgr.conn is not None - conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR) - else: - self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) - if conn_mgr: - conn_mgr.close() - else: - # The cursor will be closed later in a different session. - if cursor_id or conn_mgr: - self._close_cursor_soon(cursor_id, address, conn_mgr) + if cursor_id: + if conn_mgr and conn_mgr.more_to_come: + # If this is an exhaust cursor and we haven't completely + # exhausted the result set we *must* close the socket + # to stop the server from sending more data. + assert conn_mgr.conn is not None + conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR) + else: + self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) + if conn_mgr: + conn_mgr.close() if session and not explicit_session: - session._end_session(lock=locks_allowed) + session._end_implicit_session() def _close_cursor_now( self, @@ -1972,7 +1989,7 @@ def _process_kill_cursors(self) -> None: for address, cursor_id, conn_mgr in pinned_cursors: try: - self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False) + self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it