diff --git a/benchmarks/mempool.py b/benchmarks/mempool.py index 335779000b24..6f397007f982 100644 --- a/benchmarks/mempool.py +++ b/benchmarks/mempool.py @@ -21,6 +21,7 @@ from chia.types.spend_bundle import SpendBundle from chia.util.batches import to_batches from chia.util.ints import uint32, uint64 +from chia.util.task_referencer import create_referenced_task NUM_ITERS = 200 NUM_PEERS = 5 @@ -189,7 +190,7 @@ async def add_spend_bundles(spend_bundles: list[SpendBundle]) -> None: start = monotonic() for peer in range(NUM_PEERS): total_bundles += len(large_spend_bundles[peer]) - tasks.append(asyncio.create_task(add_spend_bundles(large_spend_bundles[peer]))) + tasks.append(create_referenced_task(add_spend_bundles(large_spend_bundles[peer]))) await asyncio.gather(*tasks) stop = monotonic() print(f" time: {stop - start:0.4f}s") @@ -208,7 +209,7 @@ async def add_spend_bundles(spend_bundles: list[SpendBundle]) -> None: start = monotonic() for peer in range(NUM_PEERS): total_bundles += len(spend_bundles[peer]) - tasks.append(asyncio.create_task(add_spend_bundles(spend_bundles[peer]))) + tasks.append(create_referenced_task(add_spend_bundles(spend_bundles[peer]))) await asyncio.gather(*tasks) stop = monotonic() print(f" time: {stop - start:0.4f}s") @@ -221,7 +222,7 @@ async def add_spend_bundles(spend_bundles: list[SpendBundle]) -> None: start = monotonic() for peer in range(NUM_PEERS): total_bundles += len(replacement_spend_bundles[peer]) - tasks.append(asyncio.create_task(add_spend_bundles(replacement_spend_bundles[peer]))) + tasks.append(create_referenced_task(add_spend_bundles(replacement_spend_bundles[peer]))) await asyncio.gather(*tasks) stop = monotonic() print(f" time: {stop - start:0.4f}s") diff --git a/chia/_tests/core/data_layer/test_data_rpc.py b/chia/_tests/core/data_layer/test_data_rpc.py index ed7cf620039c..46e5dd5a701b 100644 --- a/chia/_tests/core/data_layer/test_data_rpc.py +++ b/chia/_tests/core/data_layer/test_data_rpc.py @@ -67,6 +67,7 @@ from 
chia.util.hash import std_hash from chia.util.ints import uint8, uint16, uint32, uint64 from chia.util.keychain import bytes_to_mnemonic +from chia.util.task_referencer import create_referenced_task from chia.util.timing import adjusted_timeout, backoff_times from chia.wallet.trading.offer import Offer as TradingOffer from chia.wallet.transaction_record import TransactionRecord @@ -2191,7 +2192,7 @@ async def test_issue_15955_deadlock( while time.monotonic() < end: with anyio.fail_after(adjusted_timeout(timeout)): await asyncio.gather( - *(asyncio.create_task(data_layer.get_value(store_id=store_id, key=key)) for _ in range(10)) + *(create_referenced_task(data_layer.get_value(store_id=store_id, key=key)) for _ in range(10)) ) diff --git a/chia/_tests/core/farmer/test_farmer_api.py b/chia/_tests/core/farmer/test_farmer_api.py index 57741c4d0825..b95bbb42ad88 100644 --- a/chia/_tests/core/farmer/test_farmer_api.py +++ b/chia/_tests/core/farmer/test_farmer_api.py @@ -1,6 +1,6 @@ from __future__ import annotations -from asyncio import Task, create_task, gather, sleep +from asyncio import Task, gather, sleep from collections.abc import Coroutine from typing import Any, Optional, TypeVar @@ -20,13 +20,14 @@ from chia.server.outbound_message import Message, NodeType from chia.util.hash import std_hash from chia.util.ints import uint8, uint32, uint64 +from chia.util.task_referencer import create_referenced_task T = TypeVar("T") async def begin_task(coro: Coroutine[Any, Any, T]) -> Task[T]: """Awaitable function that adds a coroutine to the event loop and sets it running.""" - task = create_task(coro) + task = create_referenced_task(coro) await sleep(0) return task diff --git a/chia/_tests/core/full_node/stores/test_block_store.py b/chia/_tests/core/full_node/stores/test_block_store.py index aefa8607782b..de2d48e0fb24 100644 --- a/chia/_tests/core/full_node/stores/test_block_store.py +++ b/chia/_tests/core/full_node/stores/test_block_store.py @@ -30,6 +30,7 @@ from 
chia.util.db_wrapper import get_host_parameter_limit from chia.util.full_block_utils import GeneratorBlockInfo from chia.util.ints import uint8, uint32, uint64 +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -242,12 +243,12 @@ async def test_deadlock(tmp_dir: Path, db_version: int, bt: BlockTools, use_cach rand_i = random.randint(0, 9) if random.random() < 0.5: tasks.append( - asyncio.create_task( + create_referenced_task( store.add_full_block(blocks[rand_i].header_hash, blocks[rand_i], block_records[rand_i]) ) ) if random.random() < 0.5: - tasks.append(asyncio.create_task(store.get_full_block(blocks[rand_i].header_hash))) + tasks.append(create_referenced_task(store.get_full_block(blocks[rand_i].header_hash))) await asyncio.gather(*tasks) diff --git a/chia/_tests/core/full_node/test_full_node.py b/chia/_tests/core/full_node/test_full_node.py index aad87ec8067d..4ac544a19d7c 100644 --- a/chia/_tests/core/full_node/test_full_node.py +++ b/chia/_tests/core/full_node/test_full_node.py @@ -84,6 +84,7 @@ from chia.util.ints import uint8, uint16, uint32, uint64, uint128 from chia.util.limited_semaphore import LimitedSemaphore from chia.util.recursive_replace import recursive_replace +from chia.util.task_referencer import create_referenced_task from chia.util.vdf_prover import get_vdf_info_and_proof from chia.wallet.util.tx_config import DEFAULT_TX_CONFIG from chia.wallet.wallet_spend_bundle import WalletSpendBundle @@ -807,13 +808,13 @@ async def test_new_peak(self, wallet_nodes, self_hostname): uint32(0), block.reward_chain_block.get_unfinished().get_hash(), ) - task_1 = asyncio.create_task(full_node_1.new_peak(new_peak, dummy_peer)) + task_1 = create_referenced_task(full_node_1.new_peak(new_peak, dummy_peer)) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1)) task_1.cancel() await full_node_1.full_node.add_block(block, peer) # Ignores, already have - task_2 = 
asyncio.create_task(full_node_1.new_peak(new_peak, dummy_peer)) + task_2 = create_referenced_task(full_node_1.new_peak(new_peak, dummy_peer)) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0)) task_2.cancel() @@ -829,8 +830,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-2].reward_chain_block.get_unfinished().get_hash(), ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006 + create_referenced_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 0)) # Does not ignore equal weight @@ -841,8 +841,7 @@ async def suppress_value_error(coro: Coroutine) -> None: uint32(0), blocks_reorg[-1].reward_chain_block.get_unfinished().get_hash(), ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) # noqa: RUF006 + create_referenced_task(suppress_value_error(full_node_1.new_peak(new_peak, dummy_peer))) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1)) @pytest.mark.anyio @@ -1568,7 +1567,7 @@ async def test_double_blocks_same_pospace(self, wallet_nodes, self_hostname): block_2 = recursive_replace(block_2, "foliage.foliage_transaction_block_signature", new_fbh_sig) block_2 = recursive_replace(block_2, "transactions_generator", None) - rb_task = asyncio.create_task(full_node_2.full_node.add_block(block_2, dummy_peer)) + rb_task = create_referenced_task(full_node_2.full_node.add_block(block_2, dummy_peer)) await time_out_assert(10, time_out_messages(incoming_queue, "request_block", 1)) rb_task.cancel() diff --git a/chia/_tests/core/full_node/test_tx_processing_queue.py b/chia/_tests/core/full_node/test_tx_processing_queue.py index 228d5aed75e4..73a0c096e987 100644 --- a/chia/_tests/core/full_node/test_tx_processing_queue.py 
+++ b/chia/_tests/core/full_node/test_tx_processing_queue.py @@ -11,6 +11,7 @@ from chia.full_node.tx_processing_queue import TransactionQueue, TransactionQueueFull from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.transaction_queue_entry import TransactionQueueEntry +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -76,7 +77,7 @@ async def test_one_peer_and_await(seeded_random: random.Random) -> None: assert list_txs[i - 20] == resulting_txs[i] # now we validate that the pop command is blocking - task = asyncio.create_task(transaction_queue.pop()) + task = create_referenced_task(transaction_queue.pop()) with pytest.raises(asyncio.InvalidStateError): # task is not done, so we expect an error when getting result task.result() # add a tx to test task completion diff --git a/chia/_tests/core/server/flood.py b/chia/_tests/core/server/flood.py index bd06281a36ae..1782579f4177 100644 --- a/chia/_tests/core/server/flood.py +++ b/chia/_tests/core/server/flood.py @@ -9,6 +9,7 @@ import time from chia._tests.util.misc import create_logger +from chia.util.task_referencer import create_referenced_task # TODO: CAMPid 0945094189459712842390t591 IP = "127.0.0.1" @@ -62,7 +63,7 @@ async def dun() -> None: task.cancel() - file_task = asyncio.create_task(dun()) + file_task = create_referenced_task(dun()) with out_path.open(mode="w") as file: logger = create_logger(file=file) @@ -70,7 +71,7 @@ async def dun() -> None: async def f() -> None: await asyncio.gather(*[tcp_echo_client(task_counter=f"{i}", logger=logger) for i in range(0, NUM_CLIENTS)]) - task = asyncio.create_task(f()) + task = create_referenced_task(f()) try: await task except asyncio.CancelledError: diff --git a/chia/_tests/core/server/serve.py b/chia/_tests/core/server/serve.py index e637146e6cc7..0c8eb047fbf7 100644 --- a/chia/_tests/core/server/serve.py +++ b/chia/_tests/core/server/serve.py @@ -14,6 +14,7 @@ from chia._tests.util.misc import 
create_logger from chia.server.chia_policy import ChiaPolicy from chia.server.start_service import async_run +from chia.util.task_referencer import create_referenced_task if sys.platform == "win32": import _winapi @@ -86,7 +87,7 @@ async def dun() -> None: thread_end_event.set() - file_task = asyncio.create_task(dun()) + file_task = create_referenced_task(dun()) loop = asyncio.get_event_loop() server = await loop.create_server(functools.partial(EchoServer, logger=logger), ip, port) diff --git a/chia/_tests/core/server/test_loop.py b/chia/_tests/core/server/test_loop.py index a5d485779024..40bc00285752 100644 --- a/chia/_tests/core/server/test_loop.py +++ b/chia/_tests/core/server/test_loop.py @@ -18,6 +18,7 @@ from chia._tests.core.server import serve from chia._tests.util.misc import create_logger from chia.server import chia_policy +from chia.util.task_referencer import create_referenced_task from chia.util.timing import adjusted_timeout here = pathlib.Path(__file__).parent @@ -123,7 +124,7 @@ def _run(self) -> None: asyncio.set_event_loop_policy(original_event_loop_policy) async def main(self) -> None: - self.server_task = asyncio.create_task( + self.server_task = create_referenced_task( serve.async_main( out_path=self.out_path, ip=self.ip, diff --git a/chia/_tests/db/test_db_wrapper.py b/chia/_tests/db/test_db_wrapper.py index 88df3dea71e9..5597469d278d 100644 --- a/chia/_tests/db/test_db_wrapper.py +++ b/chia/_tests/db/test_db_wrapper.py @@ -14,6 +14,7 @@ from chia._tests.util.db_connection import DBConnection, PathDBConnection from chia._tests.util.misc import Marks, boolean_datacases, datacases from chia.util.db_wrapper import DBWrapper2, ForeignKeyError, InternalError, NestedForeignKeyDelayedRequestError +from chia.util.task_referencer import create_referenced_task if TYPE_CHECKING: ConnectionContextManager = contextlib.AbstractAsyncContextManager[aiosqlite.core.Connection] @@ -119,7 +120,7 @@ async def test_concurrent_writers(acquire_outside: bool, 
get_reader_method: GetR tasks = [] for index in range(concurrent_task_count): - task = asyncio.create_task(increment_counter(db_wrapper)) + task = create_referenced_task(increment_counter(db_wrapper)) tasks.append(task) await asyncio.wait_for(asyncio.gather(*tasks), timeout=None) @@ -263,7 +264,7 @@ async def write() -> None: async with get_reader() as reader: assert await query_value(connection=reader) == 0 - task = asyncio.create_task(write()) + task = create_referenced_task(write()) await writer_committed.wait() assert await query_value(connection=reader) == 0 if transactioned else 1 @@ -342,7 +343,7 @@ async def test_concurrent_readers(acquire_outside: bool, get_reader_method: GetR tasks = [] values: list[int] = [] for index in range(concurrent_task_count): - task = asyncio.create_task(sum_counter(db_wrapper, values)) + task = create_referenced_task(sum_counter(db_wrapper, values)) tasks.append(task) await asyncio.wait_for(asyncio.gather(*tasks), timeout=None) @@ -371,11 +372,11 @@ async def test_mixed_readers_writers(acquire_outside: bool, get_reader_method: G tasks = [] values: list[int] = [] for index in range(concurrent_task_count): - task = asyncio.create_task(increment_counter(db_wrapper)) + task = create_referenced_task(increment_counter(db_wrapper)) tasks.append(task) - task = asyncio.create_task(decrement_counter(db_wrapper)) + task = create_referenced_task(decrement_counter(db_wrapper)) tasks.append(task) - task = asyncio.create_task(sum_counter(db_wrapper, values)) + task = create_referenced_task(sum_counter(db_wrapper, values)) tasks.append(task) await asyncio.wait_for(asyncio.gather(*tasks), timeout=None) diff --git a/chia/_tests/util/test_limited_semaphore.py b/chia/_tests/util/test_limited_semaphore.py index 713ddf24b54c..d7a0516559c0 100644 --- a/chia/_tests/util/test_limited_semaphore.py +++ b/chia/_tests/util/test_limited_semaphore.py @@ -6,6 +6,7 @@ import pytest from chia.util.limited_semaphore import LimitedSemaphore, 
LimitedSemaphoreFullError +from chia.util.task_referencer import create_referenced_task @pytest.mark.anyio @@ -27,8 +28,8 @@ async def acquire(entered_event: Optional[asyncio.Event] = None) -> None: waiting_events = [asyncio.Event() for _ in range(waiting_limit)] failed_events = [asyncio.Event() for _ in range(beyond_limit)] - entered_tasks = [asyncio.create_task(acquire(entered_event=event)) for event in entered_events] - waiting_tasks = [asyncio.create_task(acquire(entered_event=event)) for event in waiting_events] + entered_tasks = [create_referenced_task(acquire(entered_event=event)) for event in entered_events] + waiting_tasks = [create_referenced_task(acquire(entered_event=event)) for event in waiting_events] await asyncio.gather(*(event.wait() for event in entered_events)) assert all(event.is_set() for event in entered_events) @@ -36,7 +37,7 @@ async def acquire(entered_event: Optional[asyncio.Event] = None) -> None: assert semaphore._available_count == 0 - failure_tasks = [asyncio.create_task(acquire()) for _ in range(beyond_limit)] + failure_tasks = [create_referenced_task(acquire()) for _ in range(beyond_limit)] failure_results = await asyncio.gather(*failure_tasks, return_exceptions=True) assert [str(error) for error in failure_results] == [str(LimitedSemaphoreFullError())] * beyond_limit diff --git a/chia/_tests/util/test_priority_mutex.py b/chia/_tests/util/test_priority_mutex.py index b2169e9b92c3..c8fab8e5fb70 100644 --- a/chia/_tests/util/test_priority_mutex.py +++ b/chia/_tests/util/test_priority_mutex.py @@ -16,6 +16,7 @@ from chia._tests.util.misc import Marks, datacases from chia._tests.util.time_out_assert import time_out_assert_custom_interval from chia.util.priority_mutex import NestedLockUnsupportedError, PriorityMutex +from chia.util.task_referencer import create_referenced_task from chia.util.timing import adjusted_timeout log = logging.getLogger(__name__) @@ -65,10 +66,10 @@ async def do_low(i: int) -> None: log.warning(f"Spend 
{time.time() - t1} waiting for low {i}") await kind_of_slow_func() - h = asyncio.create_task(do_high()) + h = create_referenced_task(do_high()) l_tasks = [] for i in range(50): - l_tasks.append(asyncio.create_task(do_low(i))) + l_tasks.append(create_referenced_task(do_low(i))) winner = None @@ -334,13 +335,13 @@ async def queued_after() -> None: async with mutex.acquire(priority=MutexPriority.high): pass - block_task = asyncio.create_task(block()) + block_task = create_referenced_task(block()) await blocker_acquired_event.wait() - cancel_task = asyncio.create_task(to_be_cancelled(mutex=mutex)) + cancel_task = create_referenced_task(to_be_cancelled(mutex=mutex)) await wait_queued(mutex=mutex, task=cancel_task) - queued_after_task = asyncio.create_task(queued_after()) + queued_after_task = create_referenced_task(queued_after()) await wait_queued(mutex=mutex, task=queued_after_task) cancel_task.cancel() @@ -441,7 +442,7 @@ async def create_acquire_tasks_in_controlled_order( release_event = asyncio.Event() for request in requests: - task = asyncio.create_task(request.acquire(mutex=mutex, wait_for=release_event)) + task = create_referenced_task(request.acquire(mutex=mutex, wait_for=release_event)) tasks.append(task) await wait_queued(mutex=mutex, task=task) @@ -461,14 +462,14 @@ async def other_task_function() -> None: await other_task_allow_release_event.wait() async with mutex.acquire(priority=MutexPriority.high): - other_task = asyncio.create_task(other_task_function()) + other_task = create_referenced_task(other_task_function()) await wait_queued(mutex=mutex, task=other_task) async def another_task_function() -> None: async with mutex.acquire(priority=MutexPriority.high): pass - another_task = asyncio.create_task(another_task_function()) + another_task = create_referenced_task(another_task_function()) await wait_queued(mutex=mutex, task=another_task) other_task_allow_release_event.set() diff --git a/chia/daemon/client.py b/chia/daemon/client.py index 
5597a68041b4..756ee5e938e5 100644 --- a/chia/daemon/client.py +++ b/chia/daemon/client.py @@ -12,6 +12,7 @@ from chia.util.ints import uint32 from chia.util.json_util import dict_to_json_str +from chia.util.task_referencer import create_referenced_task from chia.util.ws_message import WsRpcMessage, create_payload_dict @@ -67,8 +68,7 @@ async def listener_task() -> None: finally: await self.close() - # TODO: stop dropping tasks on the floor - asyncio.create_task(listener_task()) # noqa: RUF006 + create_referenced_task(listener_task(), known_unreferenced=True) await asyncio.sleep(1) async def listener(self) -> None: @@ -92,8 +92,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage: string = dict_to_json_str(request) if self.websocket is None or self.websocket.closed: raise Exception("Websocket is not connected") - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.websocket.send_str(string)) # noqa: RUF006 + create_referenced_task(self.websocket.send_str(string), known_unreferenced=True) try: await asyncio.wait_for(self._request_dict[request_id].wait(), timeout=30) self._request_dict.pop(request_id) diff --git a/chia/daemon/keychain_proxy.py b/chia/daemon/keychain_proxy.py index b7330c2cc8f5..8e35b602d9af 100644 --- a/chia/daemon/keychain_proxy.py +++ b/chia/daemon/keychain_proxy.py @@ -31,6 +31,7 @@ KeychainProxyConnectionTimeout, ) from chia.util.keychain import Keychain, KeyData, bytes_to_mnemonic, mnemonic_to_seed +from chia.util.task_referencer import create_referenced_task from chia.util.ws_message import WsRpcMessage @@ -99,7 +100,7 @@ async def _get(self, request: WsRpcMessage) -> WsRpcMessage: raise KeychainProxyConnectionTimeout() async def start(self, wait_for_start: bool = False) -> None: - self.keychain_connection_task = asyncio.create_task(self.connect_to_keychain()) + self.keychain_connection_task = create_referenced_task(self.connect_to_keychain()) await self.connection_established.wait() # wait until connection is 
established. async def connect_to_keychain(self) -> None: diff --git a/chia/daemon/server.py b/chia/daemon/server.py index 1817434acab5..07a123a4de60 100644 --- a/chia/daemon/server.py +++ b/chia/daemon/server.py @@ -47,6 +47,7 @@ from chia.util.safe_cancel_task import cancel_task_safe from chia.util.service_groups import validate_service from chia.util.setproctitle import setproctitle +from chia.util.task_referencer import create_referenced_task from chia.util.ws_message import WsRpcMessage, create_payload, format_response from chia.wallet.derive_keys import ( master_pk_to_wallet_pk_unhardened, @@ -212,7 +213,7 @@ async def run(self) -> AsyncIterator[None]: ssl.OPENSSL_VERSION, ) - self.state_changed_task = asyncio.create_task(self._process_state_changed_queue()) + self.state_changed_task = create_referenced_task(self._process_state_changed_queue()) self.webserver = await WebServer.create( hostname=self.self_hostname, port=self.daemon_port, @@ -249,7 +250,7 @@ async def stop(self) -> dict[str, Any]: cancel_task_safe(self.state_changed_task, self.log) service_names = list(self.services.keys()) stop_service_jobs = [ - asyncio.create_task(kill_service(self.root_path, self.services, s_n)) for s_n in service_names + create_referenced_task(kill_service(self.root_path, self.services, s_n)) for s_n in service_names ] if stop_service_jobs: await asyncio.wait(stop_service_jobs) @@ -1056,7 +1057,7 @@ def _run_next_serial_plotting(self, loop: asyncio.AbstractEventLoop, queue: str break if next_plot_id is not None: - loop.create_task(self._start_plotting(next_plot_id, loop, queue)) + create_referenced_task(self._start_plotting(next_plot_id, loop, queue)) def _post_process_plotting_job(self, job: dict[str, Any]): id: str = job["id"] @@ -1195,7 +1196,7 @@ async def start_plotting(self, websocket: WebSocketResponse, request: dict[str, # TODO: loop gets passed down a lot, review for potential removal loop = asyncio.get_running_loop() # TODO: stop dropping tasks on the floor - 
loop.create_task(self._start_plotting(id, loop, queue)) # noqa: RUF006 + create_referenced_task(self._start_plotting(id, loop, queue)) else: log.info("Plotting will start automatically when previous plotting finish") @@ -1357,7 +1358,7 @@ async def register_service(self, websocket: WebSocketResponse, request: dict[str } else: if self.ping_job is None: - self.ping_job = asyncio.create_task(self.ping_task()) + self.ping_job = create_referenced_task(self.ping_task()) self.log.info(f"registered for service {service}") log.info(f"{response}") return response diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 9ba8bbff1cf4..fca8c2f8ba04 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -63,6 +63,7 @@ from chia.util.async_pool import Job, QueuedAsyncPool from chia.util.ints import uint32, uint64 from chia.util.path import path_from_root +from chia.util.task_referencer import create_referenced_task from chia.wallet.trade_record import TradeRecord from chia.wallet.trading.offer import Offer as TradingOffer from chia.wallet.transaction_record import TransactionRecord @@ -200,7 +201,7 @@ async def manage(self) -> AsyncIterator[None]: self._wallet_rpc = await self.wallet_rpc_init await self._data_store.migrate_db() - self.periodically_manage_data_task = asyncio.create_task(self.periodically_manage_data()) + self.periodically_manage_data_task = create_referenced_task(self.periodically_manage_data()) try: yield finally: diff --git a/chia/farmer/farmer.py b/chia/farmer/farmer.py index cd2f69c68bea..e4a9ae086d6f 100644 --- a/chia/farmer/farmer.py +++ b/chia/farmer/farmer.py @@ -50,6 +50,7 @@ from chia.util.keychain import Keychain from chia.util.logging import TimedDuplicateFilter from chia.util.profiler import profile_task +from chia.util.task_referencer import create_referenced_task from chia.wallet.derive_keys import ( find_authentication_sk, find_owner_sk, @@ -187,8 +188,8 @@ async def start_task() -> None: # 
succeeds or until we need to shut down. while not self._shut_down: if await self.setup_keys(): - self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task()) - self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task()) + self.update_pool_state_task = create_referenced_task(self._periodically_update_pool_state_task()) + self.cache_clear_task = create_referenced_task(self._periodically_clear_cache_and_refresh_task()) log.debug("start_task: initialized") self.started = True return @@ -198,11 +199,9 @@ async def start_task() -> None: if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: - # TODO: stop dropping tasks on the floor - asyncio.create_task(profile_task(self._root_path, "farmer", self.log)) # noqa: RUF006 + create_referenced_task(profile_task(self._root_path, "farmer", self.log), known_unreferenced=True) - # TODO: stop dropping tasks on the floor - asyncio.create_task(start_task()) # noqa: RUF006 + create_referenced_task(start_task(), known_unreferenced=True) try: yield finally: @@ -313,7 +312,7 @@ async def handshake_task() -> None: if peer.connection_type is NodeType.HARVESTER: self.plot_sync_receivers[peer.peer_node_id] = Receiver(peer, self.plot_sync_callback) - self.harvester_handshake_task = asyncio.create_task(handshake_task()) + self.harvester_handshake_task = create_referenced_task(handshake_task()) def set_server(self, server: ChiaServer) -> None: self.server = server diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 0234ff5d1583..1c88f8cf9984 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -91,6 +91,7 @@ from chia.util.path import path_from_root from chia.util.profiler import enable_profiler, mem_profile_task, profile_task from chia.util.safe_cancel_task import cancel_task_safe +from chia.util.task_referencer import create_referenced_task # This is the result of 
calling peak_post_processing, which is then fed into peak_post_processing_2 @@ -279,13 +280,12 @@ async def manage(self) -> AsyncIterator[None]: # Transactions go into this queue from the server, and get sent to respond_transaction self._transaction_queue = TransactionQueue(1000, self.log) - self._transaction_queue_task: asyncio.Task[None] = asyncio.create_task(self._handle_transactions()) + self._transaction_queue_task: asyncio.Task[None] = create_referenced_task(self._handle_transactions()) - self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof()) + self._init_weight_proof = create_referenced_task(self.initialize_weight_proof()) if self.config.get("enable_profiler", False): - # TODO: stop dropping tasks on the floor - asyncio.create_task(profile_task(self.root_path, "node", self.log)) # noqa: RUF006 + create_referenced_task(profile_task(self.root_path, "node", self.log), known_unreferenced=True) self.profile_block_validation = self.config.get("profile_block_validation", False) if self.profile_block_validation: # pragma: no cover @@ -295,8 +295,7 @@ async def manage(self) -> AsyncIterator[None]: profile_dir.mkdir(parents=True, exist_ok=True) if self.config.get("enable_memory_profiler", False): - # TODO: stop dropping tasks on the floor - asyncio.create_task(mem_profile_task(self.root_path, "node", self.log)) # noqa: RUF006 + create_referenced_task(mem_profile_task(self.root_path, "node", self.log), known_unreferenced=True) time_taken = time.monotonic() - start_time peak: Optional[BlockRecord] = self.blockchain.get_peak() @@ -331,7 +330,7 @@ async def manage(self) -> AsyncIterator[None]: if "sanitize_weight_proof_only" in self.config: sanitize_weight_proof_only = self.config["sanitize_weight_proof_only"] assert self.config["target_uncompact_proofs"] != 0 - self.uncompact_task = asyncio.create_task( + self.uncompact_task = create_referenced_task( self.broadcast_uncompact_blocks( self.config["send_uncompact_interval"], 
self.config["target_uncompact_proofs"], @@ -339,12 +338,11 @@ async def manage(self) -> AsyncIterator[None]: ) ) if self.wallet_sync_task is None or self.wallet_sync_task.done(): - self.wallet_sync_task = asyncio.create_task(self._wallets_sync_task_handler()) + self.wallet_sync_task = create_referenced_task(self._wallets_sync_task_handler()) self.initialized = True if self.full_node_peers is not None: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.full_node_peers.start()) # noqa: RUF006 + create_referenced_task(self.full_node_peers.start(), known_unreferenced=True) try: yield finally: @@ -360,8 +358,7 @@ async def manage(self) -> AsyncIterator[None]: self.mempool_manager.shut_down() if self.full_node_peers is not None: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.full_node_peers.close()) # noqa: RUF006 + create_referenced_task(self.full_node_peers.close(), known_unreferenced=True) if self.uncompact_task is not None: self.uncompact_task.cancel() if self._transaction_queue_task is not None: @@ -517,7 +514,7 @@ async def _handle_transactions(self) -> None: self._tx_task_list.remove(oldtask) item: TransactionQueueEntry = await self.transaction_queue.pop() - self._tx_task_list.append(asyncio.create_task(self._handle_one_transaction(item))) + self._tx_task_list.append(create_referenced_task(self._handle_one_transaction(item))) async def initialize_weight_proof(self) -> None: self.weight_proof_handler = WeightProofHandler( @@ -746,7 +743,7 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec # Updates heights in the UI. Sleeps 1.5s before, so other peers have time to update their peaks as well. # Limit to 3 refreshes. 
if not seen_header_hash and len(self._ui_tasks) < 3: - self._ui_tasks.add(asyncio.create_task(self._refresh_ui_connections(1.5))) + self._ui_tasks.add(create_referenced_task(self._refresh_ui_connections(1.5))) # Prune completed connect tasks self._ui_tasks = set(filter(lambda t: not t.done(), self._ui_tasks)) except Exception as e: @@ -818,7 +815,7 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec # point being in the past), or we are very far behind. Performs a long sync. # Multiple tasks may be created here. If we don't save all handles, a task could enter a sync object # and be cleaned up by the GC, corrupting the sync object and possibly not allowing anything else in. - self._sync_task_list.append(asyncio.create_task(self._sync())) + self._sync_task_list.append(create_referenced_task(self._sync())) async def send_peak_to_timelords( self, peak_block: Optional[FullBlock] = None, peer: Optional[WSChiaConnection] = None @@ -906,8 +903,7 @@ async def on_connect(self, connection: WSChiaConnection) -> None: self._state_changed("add_connection") self._state_changed("sync_mode") if self.full_node_peers is not None: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.full_node_peers.on_connect(connection)) # noqa: RUF006 + create_referenced_task(self.full_node_peers.on_connect(connection)) if self.initialized is False: return None @@ -1396,9 +1392,9 @@ async def ingest_blocks( Optional[tuple[WSChiaConnection, ValidationState, list[Awaitable[PreValidationResult]], list[FullBlock]]] ] = asyncio.Queue(maxsize=10) - fetch_task = asyncio.create_task(fetch_blocks(block_queue)) - validate_task = asyncio.create_task(validate_blocks(block_queue, validation_queue)) - ingest_task = asyncio.create_task(ingest_blocks(validation_queue)) + fetch_task = create_referenced_task(fetch_blocks(block_queue)) + validate_task = create_referenced_task(validate_blocks(block_queue, validation_queue)) + ingest_task = 
create_referenced_task(ingest_blocks(validation_queue)) try: await asyncio.gather(fetch_task, validate_task, ingest_task) except Exception: @@ -2254,7 +2250,7 @@ async def add_block( record = self.blockchain.block_record(block.header_hash) if self.weight_proof_handler is not None and record.sub_epoch_summary_included is not None: self._segment_task_list.append( - asyncio.create_task(self.weight_proof_handler.create_prev_sub_epoch_segments()) + create_referenced_task(self.weight_proof_handler.create_prev_sub_epoch_segments()) ) for task in self._segment_task_list[:]: if task.done(): diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index da2ce74db22d..b0c711b864c9 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -73,6 +73,7 @@ from chia.util.hash import std_hash from chia.util.ints import uint8, uint32, uint64, uint128 from chia.util.limited_semaphore import LimitedSemaphoreFullError +from chia.util.task_referencer import create_referenced_task if TYPE_CHECKING: from chia.full_node.full_node import FullNode @@ -229,7 +230,7 @@ async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, t full_node.full_node_store.tx_fetch_tasks.pop(task_id) task_id: bytes32 = bytes32.secret() - fetch_task = asyncio.create_task( + fetch_task = create_referenced_task( tx_request_and_timeout(self.full_node, transaction.transaction_id, task_id) ) self.full_node.full_node_store.tx_fetch_tasks[task_id] = fetch_task @@ -472,8 +473,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, None) - # TODO: stop dropping tasks on the floor - asyncio.create_task(eventually_clear()) # noqa: RUF006 + create_referenced_task(eventually_clear(), known_unreferenced=True) return msg @@ -541,8 +541,7 @@ async def eventually_clear() -> None: await asyncio.sleep(5) 
self.full_node.full_node_store.remove_requesting_unfinished_block(block_hash, foliage_hash) - # TODO: stop dropping tasks on the floor - asyncio.create_task(eventually_clear()) # noqa: RUF006 + create_referenced_task(eventually_clear(), known_unreferenced=True) return msg diff --git a/chia/full_node/weight_proof.py b/chia/full_node/weight_proof.py index 38e568a3f377..ef8edf1caf8e 100644 --- a/chia/full_node/weight_proof.py +++ b/chia/full_node/weight_proof.py @@ -45,6 +45,7 @@ from chia.util.hash import std_hash from chia.util.ints import uint8, uint32, uint64, uint128 from chia.util.setproctitle import getproctitle, setproctitle +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -614,7 +615,7 @@ async def validate_weight_proof(self, weight_proof: WeightProof) -> tuple[bool, # The shutdown file manager must be inside of the executor manager so that # we request the workers close prior to waiting for them to close. with _create_shutdown_file() as shutdown_file: - task = asyncio.create_task( + task = create_referenced_task( validate_weight_proof_inner( self.constants, executor, diff --git a/chia/introducer/introducer.py b/chia/introducer/introducer.py index 750f4b26e190..462125892b48 100644 --- a/chia/introducer/introducer.py +++ b/chia/introducer/introducer.py @@ -13,6 +13,7 @@ from chia.server.server import ChiaServer from chia.server.ws_connection import WSChiaConnection from chia.util.ints import uint64 +from chia.util.task_referencer import create_referenced_task class Introducer: @@ -39,7 +40,7 @@ def __init__(self, max_peers_to_send: int, recent_peer_threshold: int): @contextlib.asynccontextmanager async def manage(self) -> AsyncIterator[None]: - self._vetting_task = asyncio.create_task(self._vetting_loop()) + self._vetting_task = create_referenced_task(self._vetting_loop()) try: yield finally: diff --git a/chia/plot_sync/sender.py b/chia/plot_sync/sender.py index 9c3be76e1dc9..c54ad27b5e5d 100644 --- 
a/chia/plot_sync/sender.py +++ b/chia/plot_sync/sender.py @@ -29,6 +29,7 @@ from chia.server.ws_connection import WSChiaConnection from chia.util.batches import to_batches from chia.util.ints import int16, uint32, uint64 +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -120,7 +121,7 @@ async def start(self) -> None: if self._task is not None and self._stop_requested: await self.await_closed() if self._task is None: - self._task = asyncio.create_task(self._run()) + self._task = create_referenced_task(self._run()) if not self._plot_manager.initial_refresh() or self._sync_id != 0: self._reset() else: diff --git a/chia/plotters/plotters_util.py b/chia/plotters/plotters_util.py index 21e87f0088bb..96dca52c76fe 100644 --- a/chia/plotters/plotters_util.py +++ b/chia/plotters/plotters_util.py @@ -14,6 +14,7 @@ from chia.util.chia_version import chia_short_version from chia.util.config import lock_and_load_config +from chia.util.task_referencer import create_referenced_task @contextlib.contextmanager @@ -83,13 +84,13 @@ def process_stderr_line(line_bytes: bytes) -> None: try: await asyncio.wait( [ - asyncio.create_task( + create_referenced_task( _read_stream( process.stdout, process_stdout_line, ) ), - asyncio.create_task( + create_referenced_task( _read_stream( process.stderr, process_stderr_line, diff --git a/chia/rpc/rpc_client.py b/chia/rpc/rpc_client.py index 9cefe722f177..ec7e380feb15 100644 --- a/chia/rpc/rpc_client.py +++ b/chia/rpc/rpc_client.py @@ -17,6 +17,7 @@ from chia.types.blockchain_format.sized_bytes import bytes32 from chia.util.byte_types import hexstr_to_bytes from chia.util.ints import uint16 +from chia.util.task_referencer import create_referenced_task _T_RpcClient = TypeVar("_T_RpcClient", bound="RpcClient") @@ -156,7 +157,7 @@ async def reset_log_level(self) -> dict: return await self.fetch("reset_log_level", {}) def close(self) -> None: - self.closing_task = asyncio.create_task(self.session.close()) 
+ self.closing_task = create_referenced_task(self.session.close()) async def await_closed(self) -> None: if self.closing_task is not None: diff --git a/chia/rpc/rpc_server.py b/chia/rpc/rpc_server.py index a604cad144d3..2ee49cc20a15 100644 --- a/chia/rpc/rpc_server.py +++ b/chia/rpc/rpc_server.py @@ -38,6 +38,7 @@ from chia.util.ints import uint16 from chia.util.json_util import dict_to_json_str from chia.util.network import WebServer, resolve +from chia.util.task_referencer import create_referenced_task from chia.util.ws_message import ( WsRpcMessage, create_payload, @@ -248,8 +249,7 @@ async def _state_changed(self, change: str, change_data: Optional[dict[str, Any] def state_changed(self, change: str, change_data: Optional[dict[str, Any]] = None) -> None: if self.websocket is None or self.websocket.closed: return None - # TODO: stop dropping tasks on the floor - asyncio.create_task(self._state_changed(change, change_data)) # noqa: RUF006 + create_referenced_task(self._state_changed(change, change_data), known_unreferenced=True) @property def listen_port(self) -> uint16: @@ -460,7 +460,7 @@ async def inner() -> None: self.client_session = None await asyncio.sleep(2) - self.daemon_connection_task = asyncio.create_task(inner()) + self.daemon_connection_task = create_referenced_task(inner()) _routes: ClassVar[dict[str, Callable[..., Awaitable[object]]]] = { "/get_network_info": get_network_info, diff --git a/chia/seeder/crawler.py b/chia/seeder/crawler.py index 6636167e6b87..24d7b6608a0a 100644 --- a/chia/seeder/crawler.py +++ b/chia/seeder/crawler.py @@ -29,6 +29,7 @@ from chia.util.ints import uint32, uint64 from chia.util.network import resolve from chia.util.path import path_from_root +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -82,7 +83,7 @@ async def manage(self) -> AsyncIterator[None]: if self.start_crawler_loop: # Bootstrap the initial peers await self.load_bootstrap_peers() - self.crawl_task = 
asyncio.create_task(self.crawl()) + self.crawl_task = create_referenced_task(self.crawl()) try: yield finally: @@ -219,7 +220,7 @@ async def crawl(self) -> None: total_nodes += 1 if peer.ip_address not in tried_nodes: tried_nodes.add(peer.ip_address) - task = asyncio.create_task(self.connect_task(peer)) + task = create_referenced_task(self.connect_task(peer)) tasks.add(task) if len(tasks) >= 250: await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) diff --git a/chia/seeder/dns_server.py b/chia/seeder/dns_server.py index a00af4172cd5..7a1faf822ce9 100644 --- a/chia/seeder/dns_server.py +++ b/chia/seeder/dns_server.py @@ -24,6 +24,7 @@ from chia.util.config import load_config, load_config_cli from chia.util.default_root import resolve_root_path from chia.util.path import path_from_root +from chia.util.task_referencer import create_referenced_task SERVICE_NAME = "seeder" log = logging.getLogger(__name__) @@ -60,7 +61,7 @@ class UDPDNSServerProtocol(asyncio.DatagramProtocol): queue_task: Optional[asyncio.Task[None]] = field(init=False, default=None) def start(self) -> None: - self.queue_task = asyncio.create_task(self.respond()) # This starts the dns respond loop. + self.queue_task = create_referenced_task(self.respond()) # This starts the dns respond loop. async def stop(self) -> None: if self.queue_task is not None: @@ -81,8 +82,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: dns_request: Optional[DNSRecord] = parse_dns_request(data) if dns_request is None: # Invalid Request, we can just drop it and move on. 
return - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.handler(dns_request, addr)) # noqa: RUF006 + create_referenced_task(self.handler(dns_request, addr), known_unreferenced=True) async def respond(self) -> None: log.info("UDP DNS responder started.") @@ -180,7 +180,7 @@ def buffer_updated(self, nbytes: int) -> None: if dns_request is None: # Invalid Request, so we disconnect and don't send anything back. self.transport.close() return - self.futures.append(asyncio.create_task(self.handle_and_respond(dns_request))) + self.futures.append(create_referenced_task(self.handle_and_respond(dns_request))) self.buffer = bytearray(2 if self.expected_length == 0 else self.expected_length) # Reset the buffer if empty. @@ -195,8 +195,7 @@ def eof_received(self) -> Optional[bool]: f"Received incomplete TCP DNS request of length {self.expected_length} from {self.peer_info}, " f"closing connection after dns replies are sent." ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.wait_for_futures()) # noqa: RUF006 + create_referenced_task(self.wait_for_futures(), known_unreferenced=True) return True # Keep connection open, until the futures are done. log.info(f"Received early EOF from {self.peer_info}, closing connection.") return False @@ -342,7 +341,7 @@ async def run(self) -> AsyncIterator[None]: # Set up the crawl store and the peer update task. 
self.crawl_store = await CrawlStore.create(await aiosqlite.connect(self.db_path, timeout=120)) - self.reliable_task = asyncio.create_task(self.periodically_get_reliable_peers()) + self.reliable_task = create_referenced_task(self.periodically_get_reliable_peers()) # One protocol instance will be created for each udp transport, so that we can accept ipv4 and ipv6 self.udp_transport_ipv6, self.udp_protocol_ipv6 = await loop.create_datagram_endpoint( diff --git a/chia/server/node_discovery.py b/chia/server/node_discovery.py index 57afa4f204ae..478c43a46681 100644 --- a/chia/server/node_discovery.py +++ b/chia/server/node_discovery.py @@ -27,6 +27,7 @@ from chia.util.ip_address import IPAddress from chia.util.network import resolve from chia.util.safe_cancel_task import cancel_task_safe +from chia.util.task_referencer import create_referenced_task MAX_PEERS_RECEIVED_PER_REQUEST = 1000 MAX_TOTAL_PEERS_RECEIVED = 3000 @@ -98,9 +99,9 @@ async def initialize_address_manager(self) -> None: async def start_tasks(self) -> None: random = Random() - self.connect_peers_task = asyncio.create_task(self._connect_to_peers(random)) - self.serialize_task = asyncio.create_task(self._periodically_serialize(random)) - self.cleanup_task = asyncio.create_task(self._periodically_cleanup()) + self.connect_peers_task = create_referenced_task(self._connect_to_peers(random)) + self.serialize_task = create_referenced_task(self._periodically_serialize(random)) + self.cleanup_task = create_referenced_task(self._periodically_cleanup()) async def _close_common(self) -> None: self.is_closed = True @@ -397,7 +398,7 @@ async def _connect_to_peers(self, random: Random) -> None: self.log.debug("Max concurrent outbound connections reached. 
waiting") await asyncio.wait(self.pending_tasks, return_when=asyncio.FIRST_COMPLETED) self.pending_tasks.add( - asyncio.create_task(self.start_client_async(addr, disconnect_after_handshake)) + create_referenced_task(self.start_client_async(addr, disconnect_after_handshake)) ) await asyncio.sleep(connect_peer_interval) @@ -522,8 +523,8 @@ def __init__( async def start(self) -> None: await self.initialize_address_manager() - self.self_advertise_task = asyncio.create_task(self._periodically_self_advertise_and_clean_data()) - self.address_relay_task = asyncio.create_task(self._address_relay()) + self.self_advertise_task = create_referenced_task(self._periodically_self_advertise_and_clean_data()) + self.address_relay_task = create_referenced_task(self._address_relay()) await self.start_tasks() async def close(self) -> None: diff --git a/chia/server/server.py b/chia/server/server.py index 1ff72c2214a5..116b7fb747ee 100644 --- a/chia/server/server.py +++ b/chia/server/server.py @@ -40,6 +40,7 @@ from chia.util.network import WebServer, is_in_network, is_localhost, is_trusted_peer from chia.util.ssl_check import verify_ssl_certs_and_keys from chia.util.streamable import Streamable +from chia.util.task_referencer import create_referenced_task max_message_size = 50 * 1024 * 1024 # 50MB @@ -285,7 +286,7 @@ async def start( if self.webserver is not None: raise RuntimeError("ChiaServer already started") if self.gc_task is None: - self.gc_task = asyncio.create_task(self.garbage_collect_connections_task()) + self.gc_task = create_referenced_task(self.garbage_collect_connections_task()) if self._port is not None: self.on_connect = on_connect @@ -502,8 +503,7 @@ async def start_client( self.log.info(f"Connected with {connection_type_str} {target_node}") else: self.log.debug(f"Successful feeler connection with {connection_type_str} {target_node}") - # TODO: stop dropping tasks on the floor - asyncio.create_task(connection.close()) # noqa: RUF006 + 
create_referenced_task(connection.close(), known_unreferenced=True) return True except client_exceptions.ClientConnectorError as e: if is_feeler: @@ -652,7 +652,7 @@ async def close_all_connections(self) -> None: self.log.error(f"Exception while closing connection {e}") def close_all(self) -> None: - self.connection_close_task = asyncio.create_task(self.close_all_connections()) + self.connection_close_task = create_referenced_task(self.close_all_connections()) if self.webserver is not None: self.webserver.close() diff --git a/chia/server/signal_handlers.py b/chia/server/signal_handlers.py index 26630f619679..33f8cc7fdff4 100644 --- a/chia/server/signal_handlers.py +++ b/chia/server/signal_handlers.py @@ -12,6 +12,8 @@ from typing_extensions import Protocol +from chia.util.task_referencer import create_referenced_task + class Handler(Protocol): def __call__( @@ -59,7 +61,7 @@ def loop_safe_sync_signal_handler_for_async( ) -> None: self.remove_done_handlers() - task = asyncio.create_task( + task = create_referenced_task( handler(signal_=signal_, stack_frame=stack_frame, loop=loop), ) self.tasks.append(task) diff --git a/chia/server/start_service.py b/chia/server/start_service.py index b0827eba06c6..33816bc0386c 100644 --- a/chia/server/start_service.py +++ b/chia/server/start_service.py @@ -29,6 +29,7 @@ from chia.util.log_exceptions import log_exceptions from chia.util.network import resolve from chia.util.setproctitle import setproctitle +from chia.util.task_referencer import create_referenced_task # this is used to detect whether we are running in the main process or not, in # signal handlers. We need to ignore signals in the sub processes. 
@@ -218,7 +219,7 @@ async def manage(self, *, start: bool = True) -> AsyncIterator[None]: except ValueError: pass - self._connect_peers_task = asyncio.create_task(self._connect_peers_task_handler()) + self._connect_peers_task = create_referenced_task(self._connect_peers_task_handler()) self._log.info( f"Started {self._service_name} service on network_id: {self._network_id} " diff --git a/chia/server/ws_connection.py b/chia/server/ws_connection.py index 5528c1eba178..9c767bf697ef 100644 --- a/chia/server/ws_connection.py +++ b/chia/server/ws_connection.py @@ -38,6 +38,7 @@ # Each message is prepended with LENGTH_BYTES bytes specifying the length from chia.util.network import is_localhost from chia.util.streamable import Streamable +from chia.util.task_referencer import create_referenced_task # Max size 2^(8*4) which is around 4GiB LENGTH_BYTES: int = 4 @@ -296,9 +297,9 @@ async def perform_handshake( # "1" means capability is enabled self.peer_capabilities = known_active_capabilities(inbound_handshake.capabilities) - self.outbound_task = asyncio.create_task(self.outbound_handler()) - self.inbound_task = asyncio.create_task(self.inbound_handler()) - self.incoming_message_task = asyncio.create_task(self.incoming_message_handler()) + self.outbound_task = create_referenced_task(self.outbound_handler()) + self.inbound_task = create_referenced_task(self.inbound_handler()) + self.incoming_message_task = create_referenced_task(self.incoming_message_handler()) async def close( self, @@ -503,7 +504,7 @@ async def incoming_message_handler(self) -> None: while True: message = await self.incoming_queue.get() task_id: bytes32 = bytes32.secret() - api_task = asyncio.create_task(self._api_call(message, task_id)) + api_task = create_referenced_task(self._api_call(message, task_id)) self.api_tasks[task_id] = api_task async def inbound_handler(self) -> None: @@ -649,8 +650,7 @@ async def _send_message(self, message: Message) -> None: # TODO: fix this special case. 
This function has rate limits which are too low. if ProtocolMessageTypes(message.type) != ProtocolMessageTypes.respond_peers: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self._wait_and_retry(message)) # noqa: RUF006 + create_referenced_task(self._wait_and_retry(message), known_unreferenced=True) return None else: @@ -678,8 +678,7 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close()) # noqa: RUF006 + create_referenced_task(self.close(), known_unreferenced=True) await asyncio.sleep(3) elif message.type == WSMsgType.CLOSE: self.log.debug( @@ -687,13 +686,11 @@ async def _read_one_message(self) -> Optional[Message]: f"{self.peer_server_port}/" f"{self.peer_info.port}" ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close()) # noqa: RUF006 + create_referenced_task(self.close(), known_unreferenced=True) await asyncio.sleep(3) elif message.type == WSMsgType.CLOSED: if not self.closed: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close()) # noqa: RUF006 + create_referenced_task(self.close(), known_unreferenced=True) await asyncio.sleep(3) return None elif message.type == WSMsgType.BINARY: @@ -713,8 +710,7 @@ async def _read_one_message(self) -> Optional[Message]: details = ", ".join([f"{self.peer_info.host}", f"message: {message_type}", limiter_msg]) self.log.error(f"Peer has been rate limited and will be disconnected: {details}") # Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006 + create_referenced_task(self.close(RATE_LIMITER_BAN_SECONDS), known_unreferenced=True) await asyncio.sleep(3) return None else: @@ -727,17 +723,14 @@ async def _read_one_message(self) -> Optional[Message]: elif message.type == 
WSMsgType.ERROR: self.log.error(f"WebSocket Error: {message}") if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006 + create_referenced_task(self.close(RATE_LIMITER_BAN_SECONDS), known_unreferenced=True) else: - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close()) # noqa: RUF006 + create_referenced_task(self.close(), known_unreferenced=True) await asyncio.sleep(3) else: self.log.error(f"Unexpected WebSocket message type: {message}") - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close()) # noqa: RUF006 + create_referenced_task(self.close(), known_unreferenced=True) await asyncio.sleep(3) return None diff --git a/chia/simulator/setup_services.py b/chia/simulator/setup_services.py index 629074a40faf..3fa87bc7b08a 100644 --- a/chia/simulator/setup_services.py +++ b/chia/simulator/setup_services.py @@ -48,6 +48,7 @@ from chia.util.ints import uint16 from chia.util.keychain import bytes_to_mnemonic from chia.util.lock import Lockfile +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -413,7 +414,7 @@ async def setup_introducer(bt: BlockTools, port: int) -> AsyncGenerator[Introduc async def setup_vdf_client(bt: BlockTools, self_hostname: str, port: int) -> AsyncIterator[None]: find_vdf_client() # raises FileNotFoundError if not found process_mgr = VDFClientProcessMgr() - vdf_task_1 = asyncio.create_task( + vdf_task_1 = create_referenced_task( spawn_process(self_hostname, port, 1, process_mgr, prefer_ipv6=bt.config.get("prefer_ipv6", False)), name="vdf_client_1", ) @@ -447,7 +448,7 @@ async def setup_vdf_clients(bt: BlockTools, self_hostname: str, port: int) -> As prefer_ipv6 = bt.config.get("prefer_ipv6", False) for i in range(1, 4): tasks.append( - asyncio.create_task( + create_referenced_task( spawn_process( host=self_hostname, port=port,
counter=i, process_mgr=process_mgr, prefer_ipv6=prefer_ipv6 ), diff --git a/chia/timelord/timelord.py b/chia/timelord/timelord.py index 4fd5a6c126cc..dbd72216fc13 100644 --- a/chia/timelord/timelord.py +++ b/chia/timelord/timelord.py @@ -42,6 +42,7 @@ from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.util.ints import uint8, uint16, uint32, uint64, uint128 from chia.util.streamable import Streamable, streamable +from chia.util.task_referencer import create_referenced_task log = logging.getLogger(__name__) @@ -158,7 +159,7 @@ async def manage(self) -> AsyncIterator[None]: self.last_state: LastState = LastState(self.constants) slow_bluebox = self.config.get("slow_bluebox", False) if not self.bluebox_mode: - self.main_loop = asyncio.create_task(self._manage_chains()) + self.main_loop = create_referenced_task(self._manage_chains()) else: if os.name == "nt" or slow_bluebox: # `vdf_client` doesn't build on windows, use `prove()` from chiavdf. @@ -167,11 +168,11 @@ async def manage(self) -> AsyncIterator[None]: self.bluebox_pool = ThreadPoolExecutor( max_workers=workers, ) - self.main_loop = asyncio.create_task( + self.main_loop = create_referenced_task( self._start_manage_discriminant_queue_sanitizer_slow(self.bluebox_pool, workers) ) else: - self.main_loop = asyncio.create_task(self._manage_discriminant_queue_sanitizer()) + self.main_loop = create_referenced_task(self._manage_discriminant_queue_sanitizer()) log.info(f"Started timelord, listening on port {self.get_vdf_server_port()}") try: yield @@ -418,7 +419,7 @@ async def _map_chains_with_vdf_clients(self) -> None: assert challenge is not None assert initial_form is not None self.process_communication_tasks.append( - asyncio.create_task( + create_referenced_task( self._do_process_communication( picked_chain, challenge, initial_form, ip, reader, writer, proof_label=self.num_resets ) @@ -1112,7 +1113,7 @@ async def _manage_discriminant_queue_sanitizer(self) -> None: info = 
self.pending_bluebox_info[0] ip, reader, writer = self.free_clients[0] self.process_communication_tasks.append( - asyncio.create_task( + create_referenced_task( self._do_process_communication( Chain.BLUEBOX, info[1].new_proof_of_time.challenge, @@ -1136,7 +1137,7 @@ async def _manage_discriminant_queue_sanitizer(self) -> None: async def _start_manage_discriminant_queue_sanitizer_slow(self, pool: ThreadPoolExecutor, counter: int) -> None: tasks = [] for _ in range(counter): - tasks.append(asyncio.create_task(self._manage_discriminant_queue_sanitizer_slow(pool))) + tasks.append(create_referenced_task(self._manage_discriminant_queue_sanitizer_slow(pool))) for task in tasks: await task diff --git a/chia/util/async_pool.py b/chia/util/async_pool.py index 1621a7b284ac..957f636397a5 100644 --- a/chia/util/async_pool.py +++ b/chia/util/async_pool.py @@ -12,6 +12,7 @@ import anyio from chia.util.log_exceptions import log_exceptions +from chia.util.task_referencer import create_referenced_task class InvalidTargetWorkerCountError(Exception): @@ -153,7 +154,7 @@ async def managed( if self._target_worker_count < 1: raise InvalidTargetWorkerCountError(self._target_worker_count) - task = asyncio.create_task(self._run(_check_single_use=False)) + task = create_referenced_task(self._run(_check_single_use=False)) try: # TODO: should this terminate if the run task ends? 
await self._started.wait() @@ -187,7 +188,7 @@ async def _run(self, *, _check_single_use: bool = True) -> None: async def _run_single(self) -> None: while len(self._workers) < self._target_worker_count: new_worker_id = next(self._worker_id_counter) - new_worker = asyncio.create_task(self.worker_async_callable(new_worker_id)) + new_worker = create_referenced_task(self.worker_async_callable(new_worker_id)) self.log.debug(f"{self.name}: adding worker {new_worker_id}") self._workers[new_worker] = new_worker_id diff --git a/chia/util/beta_metrics.py b/chia/util/beta_metrics.py index 56a9d07d79c0..01e1fc21671e 100644 --- a/chia/util/beta_metrics.py +++ b/chia/util/beta_metrics.py @@ -12,6 +12,7 @@ from chia.util.config import load_config from chia.util.cpu import available_logical_cores +from chia.util.task_referencer import create_referenced_task log = logging.getLogger("beta") @@ -84,7 +85,7 @@ def start_logging(self) -> None: if self.task is not None: raise RuntimeError("Already started") self.stop_task = False - self.task = asyncio.create_task(self.run()) + self.task = create_referenced_task(self.run()) async def stop_logging(self) -> None: log.debug("stop_logging") diff --git a/chia/util/network.py b/chia/util/network.py index e35074c46875..70021f1f98c2 100644 --- a/chia/util/network.py +++ b/chia/util/network.py @@ -17,6 +17,7 @@ from chia.types.blockchain_format.sized_bytes import bytes32 from chia.util.ints import uint16 from chia.util.ip_address import IPAddress +from chia.util.task_referencer import create_referenced_task @final @@ -97,7 +98,7 @@ async def _close(self) -> None: await self.runner.cleanup() def close(self) -> None: - self._close_task = asyncio.create_task(self._close()) + self._close_task = create_referenced_task(self._close()) async def await_closed(self) -> None: if self._close_task is None: diff --git a/chia/util/task_referencer.py b/chia/util/task_referencer.py new file mode 100644 index 000000000000..ce95078bbc54 --- /dev/null +++ 
b/chia/util/task_referencer.py @@ -0,0 +1,59 @@ +# TODO: this should not exist, it is a bad task group that doesn't require +# any responsibility of the requestor. + +from __future__ import annotations + +import asyncio +import dataclasses +import logging +import typing + +T = typing.TypeVar("T") + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class _TaskInfo: + task: asyncio.Task[object] + # retained for potential debugging use + known_unreferenced: bool + + def __str__(self) -> str: + return self.task.get_name() + + +@dataclasses.dataclass +class _TaskReferencer: + """Holds strong references to tasks until they are done. This compensates for + asyncio holding only weak references. This should be replaced by patterns using + task groups such as from anyio. + """ + + tasks: dict[asyncio.Task[object], _TaskInfo] = dataclasses.field(default_factory=dict) + + def create_task( + self, + coroutine: typing.Coroutine[object, object, T], + *, + name: typing.Optional[str] = None, + known_unreferenced: bool = False, + ) -> asyncio.Task[T]: + task = asyncio.create_task(coro=coroutine, name=name) # noqa: TID251 + task.add_done_callback(self._task_done) + + self.tasks[task] = _TaskInfo(task=task, known_unreferenced=known_unreferenced) + + return task + + def _task_done(self, task: asyncio.Task[object]) -> None: + # TODO: consider collecting results and logging errors + try: + del self.tasks[task] + except KeyError: + logger.warning("Task not found in task referencer: %s", task) + + +_global_task_referencer = _TaskReferencer() + +create_referenced_task = _global_task_referencer.create_task diff --git a/chia/wallet/util/wallet_sync_utils.py b/chia/wallet/util/wallet_sync_utils.py index 82095a207bcc..c3c20c6dce4a 100644 --- a/chia/wallet/util/wallet_sync_utils.py +++ b/chia/wallet/util/wallet_sync_utils.py @@ -36,6 +36,7 @@ from chia.types.coin_spend import CoinSpend, make_spend from chia.types.header_block import HeaderBlock from chia.util.ints 
import uint32 +from chia.util.task_referencer import create_referenced_task from chia.wallet.util.peer_request_cache import PeerRequestCache log = logging.getLogger(__name__) @@ -316,7 +317,9 @@ async def fetch_header_blocks_in_range( res_h_blocks = await res_h_blocks_task else: log.debug(f"Fetching: {start}-{end}") - res_h_blocks_task = asyncio.create_task(_fetch_header_blocks_inner(all_peers, request_start, request_end)) + res_h_blocks_task = create_referenced_task( + _fetch_header_blocks_inner(all_peers, request_start, request_end) + ) peer_request_cache.add_to_block_requests(request_start, request_end, res_h_blocks_task) res_h_blocks = await res_h_blocks_task if res_h_blocks is None: diff --git a/chia/wallet/wallet_node.py b/chia/wallet/wallet_node.py index 1c38d7800369..a02076df028a 100644 --- a/chia/wallet/wallet_node.py +++ b/chia/wallet/wallet_node.py @@ -55,6 +55,7 @@ from chia.util.path import path_from_root from chia.util.profiler import mem_profile_task, profile_task from chia.util.streamable import Streamable, streamable +from chia.util.task_referencer import create_referenced_task from chia.wallet.puzzles.clawback.metadata import AutoClaimSettings from chia.wallet.transaction_record import TransactionRecord from chia.wallet.util.new_peak_queue import NewPeakItem, NewPeakQueue, NewPeakQueueTypes @@ -426,12 +427,10 @@ async def _start_with_fingerprint( if sys.getprofile() is not None: self.log.warning("not enabling profiler, getprofile() is already set") else: - # TODO: stop dropping tasks on the floor - asyncio.create_task(profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006 + create_referenced_task(profile_task(self.root_path, "wallet", self.log), known_unreferenced=True) if self.config.get("enable_memory_profiler", False): - # TODO: stop dropping tasks on the floor - asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log)) # noqa: RUF006 + create_referenced_task(mem_profile_task(self.root_path, "wallet", self.log), 
known_unreferenced=True) path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint)) path.parent.mkdir(parents=True, exist_ok=True) @@ -457,8 +456,8 @@ async def _start_with_fingerprint( self.wallet_tx_resend_timeout_secs = self.config.get("tx_resend_timeout_secs", 60 * 60) self.wallet_state_manager.set_pending_callback(self._pending_tx_handler) self._shut_down = False - self._process_new_subscriptions_task = asyncio.create_task(self._process_new_subscriptions()) - self._retry_failed_states_task = asyncio.create_task(self._retry_failed_states()) + self._process_new_subscriptions_task = create_referenced_task(self._process_new_subscriptions()) + self._retry_failed_states_task = create_referenced_task(self._retry_failed_states()) self.sync_event = asyncio.Event() self.log_in(fingerprint) @@ -519,8 +518,7 @@ def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None: def _pending_tx_handler(self) -> None: if self._wallet_state_manager is None: return None - # TODO: stop dropping tasks on the floor - asyncio.create_task(self._resend_queue()) # noqa: RUF006 + create_referenced_task(self._resend_queue(), known_unreferenced=True) async def _resend_queue(self) -> None: if self._shut_down or self._server is None or self._wallet_state_manager is None: @@ -721,8 +719,7 @@ def initialize_wallet_peers(self) -> None: default_port, self.log, ) - # TODO: stop dropping tasks on the floor - asyncio.create_task(self.wallet_peers.start()) # noqa: RUF006 + create_referenced_task(self.wallet_peers.start()) async def on_disconnect(self, peer: WSChiaConnection) -> None: if self.is_trusted(peer): @@ -1003,7 +1000,7 @@ async def validate_and_add(inner_states: list[CoinState], inner_idx_start: int) self.log.info("Terminating receipt and validation due to shut down request") await asyncio.gather(*all_tasks) return False - all_tasks.append(asyncio.create_task(validate_and_add(batch.entries, idx))) + 
all_tasks.append(create_referenced_task(validate_and_add(batch.entries, idx))) idx += len(batch.entries) still_connected = self._server is not None and peer.peer_node_id in self.server.all_connections @@ -1242,7 +1239,7 @@ async def long_sync_from_untrusted(self, syncing: bool, new_peak_hb: HeaderBlock self.log.info("Secondary peer syncing") # In this case we will not rollback so it's OK to check some older updates as well, to ensure # that no recent transactions are being hidden. - self._secondary_peer_sync_task = asyncio.create_task( + self._secondary_peer_sync_task = create_referenced_task( self.long_sync(new_peak_hb.height, peer, 0, rollback=False) ) diff --git a/ruff.toml b/ruff.toml index 35cabb84634d..9213a15935c8 100644 --- a/ruff.toml +++ b/ruff.toml @@ -75,8 +75,7 @@ ignore = [ ban-relative-imports = "all" [lint.flake8-tidy-imports.banned-api] -# for use with another pr -# "asyncio.create_task".msg = "Use `from chia.util.pit import pit` and `pit.create_task()`" + "asyncio.create_task".msg = "Use `from chia.util.task_referencer import create_referenced_task` and `create_referenced_task()`" [lint.isort] required-imports = ["from __future__ import annotations"]