Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-4494 - AsyncMongoClient._cleanup_cursor needs to be synchronous #1680

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pymongo/asynchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ async def _end_session(self, lock: bool) -> None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")
Expand Down
38 changes: 25 additions & 13 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def __init__(
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()

if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
Expand All @@ -95,8 +95,7 @@ def __init__(
raise TypeError("max_await_time_ms must be an integer or None")

def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()

def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
Expand Down Expand Up @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
return self._session
return None

async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
Expand All @@ -210,8 +208,22 @@ async def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

async def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -222,14 +234,14 @@ async def _die(self, synchronous: bool = False) -> None:
self._session = None
self._sock_mgr = None

async def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
await self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None

async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()

async def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
Expand All @@ -243,7 +255,7 @@ async def _send_message(self, operation: _GetMore) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
await self.close()
Expand Down Expand Up @@ -305,7 +317,7 @@ async def _refresh(self) -> int:
)
)
else: # Cursor id is zero nothing else to return
await self._die(True)
await self._die_lock()

return len(self._data)

Expand Down
46 changes: 32 additions & 14 deletions pymongo/asynchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ def retrieved(self) -> int:
return self._retrieved

def __del__(self) -> None:
if _IS_SYNC:
self._die() # type: ignore[unused-coroutine]
self._die_no_lock()

def clone(self) -> AsyncCursor[_DocumentType]:
"""Get a clone of this cursor.
Expand Down Expand Up @@ -996,14 +995,7 @@ def _deepcopy(
y[key] = value
return y

async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
self._killed = True
if self._id and not already_killed:
cursor_id = self._id
Expand All @@ -1013,8 +1005,34 @@ async def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

cursor_id, address = self._prepare_to_die(already_killed)
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

async def _die_lock(self) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return

cursor_id, address = self._prepare_to_die(already_killed)
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -1027,7 +1045,7 @@ async def _die(self, synchronous: bool = False) -> None:

async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()

async def distinct(self, key: str) -> list:
"""Get a list of distinct values for `key` among all documents
Expand Down Expand Up @@ -1080,7 +1098,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
await self.close()
# If this is a tailable cursor the error is likely
Expand Down
63 changes: 39 additions & 24 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1856,48 +1856,63 @@ async def _retryable_write(
async with self._tmp_session(session) as s:
return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)

async def _cleanup_cursor(
def _cleanup_cursor_no_lock(
self,
locks_allowed: bool,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() or __del__.
"""Cleanup a cursor from __del__ without locking.

This method handles cleanup for Cursors/CommandCursors including any
pinned connection attached at the time the cursor
was garbage collected.

:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
"""
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if session and not explicit_session:
session._end_implicit_session()

async def _cleanup_cursor_lock(
self,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() using a lock.

This method handles cleanup for Cursors/CommandCursors including any
pinned connection or implicit session attached at the time the cursor
was closed or garbage collected.

:param locks_allowed: True if we are allowed to acquire locks.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
:param session: The cursor's session.
:param explicit_session: True if the session was passed explicitly.
"""
if locks_allowed:
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(
cursor_id, address, session=session, conn_mgr=conn_mgr
)
if conn_mgr:
await conn_mgr.close()
else:
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
await conn_mgr.close()
if session and not explicit_session:
await session._end_session(lock=locks_allowed)
session._end_implicit_session()

async def _close_cursor_now(
self,
Expand Down Expand Up @@ -1977,7 +1992,7 @@ async def _process_kill_cursors(self) -> None:

for address, cursor_id, conn_mgr in pinned_cursors:
try:
await self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False)
await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it
Expand Down
6 changes: 6 additions & 0 deletions pymongo/synchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ def _end_session(self, lock: bool) -> None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None

def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")
Expand Down
38 changes: 25 additions & 13 deletions pymongo/synchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def __init__(
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()

if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
Expand All @@ -95,8 +95,7 @@ def __init__(
raise TypeError("max_await_time_ms must be an integer or None")

def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()

def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
Expand Down Expand Up @@ -198,8 +197,7 @@ def session(self) -> Optional[ClientSession]:
return self._session
return None

def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
Expand All @@ -210,8 +208,22 @@ def _die(self, synchronous: bool = False) -> None:
# Skip killCursors.
cursor_id = 0
address = None
self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None

def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
Expand All @@ -222,14 +234,14 @@ def _die(self, synchronous: bool = False) -> None:
self._session = None
self._sock_mgr = None

def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None

def close(self) -> None:
"""Explicitly close / kill this cursor."""
self._die(True)
self._die_lock()

def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
Expand All @@ -243,7 +255,7 @@ def _send_message(self, operation: _GetMore) -> None:
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
self.close()
Expand Down Expand Up @@ -305,7 +317,7 @@ def _refresh(self) -> int:
)
)
else: # Cursor id is zero nothing else to return
self._die(True)
self._die_lock()

return len(self._data)

Expand Down
Loading
Loading