Skip to content

Commit

Permalink
Updated libtorrent session creation to be async (#8112)
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink authored Sep 2, 2024
2 parents b053617 + ddc4144 commit 7d7c943
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 79 deletions.
10 changes: 5 additions & 5 deletions src/tribler/core/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,20 @@ def on_performance_alert(self, alert: lt.performance_alert) -> None:

# When the send buffer watermark is too low, double the buffer size to a
# maximum of 50MiB. This is the same mechanism as Deluge uses.
lt_session = self.download_manager.get_session(self.config.get_hops())
lt_session = self.download_manager.get_session(self.config.get_hops()).result()
settings = self.download_manager.get_session_settings(lt_session)
if alert.message().endswith("send buffer watermark too low (upload rate will suffer)"):
if settings["send_buffer_watermark"] <= 26214400:
self._logger.info("Setting send_buffer_watermark to %s", 2 * settings["send_buffer_watermark"])
settings["send_buffer_watermark"] *= 2
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session().result(), settings)
# When the write cache is too small, double the buffer size to a maximum
# of 64MiB. Again, this is the same mechanism as Deluge uses.
elif (alert.message().endswith("max outstanding disk writes reached")
and settings["max_queued_disk_bytes"] <= 33554432):
self._logger.info("Setting max_queued_disk_bytes to %s", 2 * settings["max_queued_disk_bytes"])
settings["max_queued_disk_bytes"] *= 2
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session().result(), settings)

def on_torrent_removed_alert(self, alert: lt.torrent_removed_alert) -> None:
"""
Expand Down Expand Up @@ -771,7 +771,7 @@ def get_tracker_status(self) -> dict[str, tuple[int, str]]:
if info.source & info.pex:
pex_peers += 1

ltsession = self.download_manager.get_session(self.config.get_hops())
ltsession = self.download_manager.get_session(self.config.get_hops()).result()
public = self.tdef and not self.tdef.is_private()

result = self.tracker_status.copy()
Expand Down Expand Up @@ -864,7 +864,7 @@ def set_def(self, tdef: TorrentDef) -> None:
self.tdef = tdef

@check_handle(None)
def add_trackers(self, trackers: list[str]) -> None:
def add_trackers(self, trackers: list[bytes]) -> None:
"""
Add the given trackers to the handle.
"""
Expand Down
82 changes: 41 additions & 41 deletions src/tribler/core/libtorrent/download_manager/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import os
import time
from asyncio import CancelledError, gather, iscoroutine, shield, sleep, wait_for
from asyncio import CancelledError, Future, gather, iscoroutine, shield, sleep, wait_for
from binascii import hexlify, unhexlify
from collections import defaultdict
from copy import deepcopy
Expand Down Expand Up @@ -94,16 +94,16 @@ def __init__(self, config: TriblerConfigManager, notifier: Notifier,

self.state_dir = Path(config.get_version_state_dir())
self.ltsettings: dict[lt.session, dict] = {} # Stores a copy of the settings dict for each libtorrent session
self.ltsessions: dict[int, lt.session] = {}
self.ltsessions: dict[int, Future[lt.session]] = {}
self.dht_health_manager: DHTHealthManager | None = None
self.listen_ports: dict[int, dict[str, int]] = defaultdict(dict)

self.socks_listen_ports = config.get("libtorrent/socks_listen_ports")

self.notifier = notifier

self.set_upload_rate_limit(0)
self.set_download_rate_limit(0)
self.register_task("Set default upload rate limit", self.set_upload_rate_limit, 0)
self.register_task("Set default download rate limit", self.set_download_rate_limit, 0)

self.downloads: Dict[bytes, Download] = {}

Expand Down Expand Up @@ -171,20 +171,16 @@ async def _check_dht_ready(self, min_dht_peers: int = 60) -> None:
See https://github.com/Tribler/tribler/issues/5319
"""
while not (self.get_session() and self.get_session().status().dht_nodes > min_dht_peers):
while (await self.get_session()).status().dht_nodes < min_dht_peers:
await asyncio.sleep(1)

def initialize(self) -> None:
async def initialize(self) -> None:
"""
Initialize the directory structure, launch the periodic tasks and start libtorrent background processes.
"""
# Create the checkpoints directory
self.checkpoint_directory.mkdir(exist_ok=True, parents=True)

# Start upnp
if self.config.get("libtorrent/upnp"):
self.get_session().start_upnp()

# Register tasks
self.register_task("process_alerts", self._task_process_alerts, interval=1, ignore=(Exception, ))
if self.dht_readiness_timeout > 0 and self.config.get("libtorrent/dht"):
Expand All @@ -194,6 +190,10 @@ def initialize(self) -> None:

self.set_download_states_callback(self.sesscb_states_callback)

# Start upnp
if self.config.get("libtorrent/upnp"):
(await self.get_session()).start_upnp()

def start(self) -> None:
"""
Start loading the checkpoints from disk.
Expand Down Expand Up @@ -248,13 +248,14 @@ async def shutdown(self, timeout: int = 30) -> None:
if self.has_session():
logger.info("Saving state...")
self.notify_shutdown_state("Writing session state to disk.")
session = await self.get_session()
with open(self.state_dir / LTSTATE_FILENAME, "wb") as ltstate_file: # noqa: ASYNC230
ltstate_file.write(lt.bencode(self.get_session().save_state()))
ltstate_file.write(lt.bencode(session.save_state()))

if self.has_session() and self.config.get("libtorrent/upnp"):
logger.info("Stopping upnp...")
self.notify_shutdown_state("Stopping UPnP.")
self.get_session().stop_upnp()
(await self.get_session()).stop_upnp()

# Remove metadata temporary directory
if self.metadata_tmpdir:
Expand Down Expand Up @@ -360,12 +361,12 @@ def has_session(self, hops: int = 0) -> bool:
"""
return hops in self.ltsessions

def get_session(self, hops: int = 0) -> lt.session:
def get_session(self, hops: int = 0) -> Future[lt.session]:
"""
Get the session for the given number of anonymization hops.
"""
if hops not in self.ltsessions:
self.ltsessions[hops] = self.create_session(hops)
self.ltsessions[hops] = self.register_executor_task(f"Create session {hops}", self.create_session, hops)

return self.ltsessions[hops]

Expand All @@ -392,7 +393,7 @@ def set_max_connections(self, conns: int, hops: int | None = None) -> None:
"""
self._map_call_on_ltsessions(hops, "set_max_connections", conns)

def set_upload_rate_limit(self, rate: int) -> None:
async def set_upload_rate_limit(self, rate: int) -> None:
"""
Set the upload rate limit for the given session.
"""
Expand All @@ -403,18 +404,19 @@ def set_upload_rate_limit(self, rate: int) -> None:
# Pass outgoing_port and num_outgoing_ports to dict due to bug in libtorrent 0.16.18
settings_dict = {"upload_rate_limit": libtorrent_rate, "outgoing_port": 0, "num_outgoing_ports": 1}
for session in self.ltsessions.values():
self.set_session_settings(session, settings_dict)
self.set_session_settings(await session, settings_dict)

def get_upload_rate_limit(self, hops: int = 0) -> int:
async def get_upload_rate_limit(self, hops: int = 0) -> int:
"""
Get the upload rate limit for the session with the given hop count.
"""
# Rate conversion due to the fact that we had a different system with Swift
# and the old python BitTorrent core: unlimited == 0, stop == -1, else rate in kbytes
libtorrent_rate = self.get_session(hops).upload_rate_limit()
session = await self.get_session(hops)
libtorrent_rate = session.upload_rate_limit()
return self.reverse_convert_rate(rate=libtorrent_rate)

def set_download_rate_limit(self, rate: int) -> None:
async def set_download_rate_limit(self, rate: int) -> None:
"""
Set the download rate limit for the given session.
"""
Expand All @@ -423,13 +425,14 @@ def set_download_rate_limit(self, rate: int) -> None:
# Pass outgoing_port and num_outgoing_ports to dict due to bug in libtorrent 0.16.18
settings_dict = {"download_rate_limit": libtorrent_rate}
for session in self.ltsessions.values():
self.set_session_settings(session, settings_dict)
self.set_session_settings(await session, settings_dict)

def get_download_rate_limit(self, hops: int = 0) -> int:
async def get_download_rate_limit(self, hops: int = 0) -> int:
"""
Get the download rate limit for the session with the given hop count.
"""
libtorrent_rate = self.get_session(hops=hops).download_rate_limit()
session = await self.get_session(hops)
libtorrent_rate = session.download_rate_limit()
return self.reverse_convert_rate(rate=libtorrent_rate)

def process_alert(self, alert: lt.alert, hops: int = 0) -> None: # noqa: C901, PLR0912
Expand Down Expand Up @@ -598,23 +601,21 @@ def _task_cleanup_metainfo_cache(self) -> None:
if last_time < oldest_time:
del self.metainfo_cache[info_hash]

def _request_torrent_updates(self) -> None:
async def _request_torrent_updates(self) -> None:
for ltsession in self.ltsessions.values():
if ltsession:
ltsession.post_torrent_updates(0xffffffff)
(await ltsession).post_torrent_updates(0xffffffff)

def _task_process_alerts(self) -> None:
async def _task_process_alerts(self) -> None:
for hops, ltsession in list(self.ltsessions.items()):
if ltsession:
for alert in ltsession.pop_alerts():
self.process_alert(alert, hops=hops)
for alert in (await ltsession).pop_alerts():
self.process_alert(alert, hops=hops)

def _map_call_on_ltsessions(self, hops: int | None, funcname: str, *args: Any, **kwargs) -> None: # noqa: ANN401
if hops is None:
for session in self.ltsessions.values():
getattr(session, funcname)(*args, **kwargs)
session.add_done_callback(lambda s: getattr(s.result(), funcname)(*args, **kwargs))
else:
getattr(self.get_session(hops), funcname)(*args, **kwargs)
self.get_session(hops).add_done_callback(lambda s: getattr(s.result(), funcname)(*args, **kwargs))

async def start_download_from_uri(self, uri: str, config: DownloadConfig | None = None) -> Download:
"""
Expand Down Expand Up @@ -732,7 +733,7 @@ async def start_handle(self, download: Download, atp: dict) -> None:
if resume_data:
logger.debug("Download resume data: %s", str(atp["resume_data"]))

ltsession = self.get_session(download.config.get_hops())
ltsession = await self.get_session(download.config.get_hops())
infohash = download.get_def().get_infohash()

if infohash in self.metainfo_requests and self.metainfo_requests[infohash].download != download:
Expand Down Expand Up @@ -813,20 +814,20 @@ def update_max_rates_from_config(self) -> None:
This is the extra step necessary to apply a new maximum download/upload rate setting.
:return:
"""
rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
settings = {"download_rate_limit": download_rate,
"upload_rate_limit": rate}
for lt_session in self.ltsessions.values():
rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
settings = {"download_rate_limit": download_rate,
"upload_rate_limit": rate}
self.set_session_settings(lt_session, settings)
lt_session.add_done_callback(lambda s: self.set_session_settings(s.result(), settings))

def post_session_stats(self) -> None:
"""
Gather statistics and cause a ``session_stats_alert``.
"""
logger.info("Post session stats")
for session in self.ltsessions.values():
session.post_session_stats()
session.add_done_callback(lambda s: s.result().post_session_stats())

async def remove_download(self, download: Download, remove_content: bool = False,
remove_checkpoint: bool = True) -> None:
Expand All @@ -844,8 +845,7 @@ async def remove_download(self, download: Download, remove_content: bool = False
if download.stream is not None:
download.stream.disable()
logger.debug("Removing handle %s", hexlify(infohash))
ltsession = self.get_session(download.config.get_hops())
ltsession.remove_torrent(handle, int(remove_content))
(await self.get_session(download.config.get_hops())).remove_torrent(handle, int(remove_content))
else:
logger.debug("Cannot remove handle %s because it does not exists", hexlify(infohash))
await download.shutdown()
Expand Down Expand Up @@ -892,7 +892,7 @@ async def update_hops(self, download: Download, new_hops: int) -> None:

await self.start_download(tdef=download.tdef, config=config)

def update_trackers(self, infohash: bytes, trackers: list[str]) -> None:
def update_trackers(self, infohash: bytes, trackers: list[bytes]) -> None:
"""
Update the trackers for a download.
Expand Down
6 changes: 3 additions & 3 deletions src/tribler/core/libtorrent/restapi/libtorrent_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def get_libtorrent_settings(self, request: Request) -> RESTResponse:
if hop not in self.download_manager.ltsessions:
return RESTResponse({"hop": hop, "settings": {}})

lt_session = self.download_manager.ltsessions[hop]
lt_session = await self.download_manager.ltsessions[hop]
if hop == 0:
lt_settings = self.download_manager.get_session_settings(lt_session)
lt_settings["peer_fingerprint"] = hexlify(lt_settings["peer_fingerprint"].encode()).decode()
Expand Down Expand Up @@ -107,10 +107,10 @@ def on_session_stats_alert_received(alert: libtorrent.session_stats_alert) -> No
hop = int(args["hop"])

if hop not in self.download_manager.ltsessions or \
not hasattr(self.download_manager.ltsessions[hop], "post_session_stats"):
not hasattr(self.download_manager.ltsessions[hop].result(), "post_session_stats"):
return RESTResponse({"hop": hop, "session": {}})

self.download_manager.session_stats_callback = on_session_stats_alert_received
self.download_manager.ltsessions[hop].post_session_stats()
(await self.download_manager.ltsessions[hop]).post_session_stats()
stats = await session_stats
return RESTResponse({"hop": hop, "session": stats})
2 changes: 1 addition & 1 deletion src/tribler/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def start(self) -> None:
for server in self.socks_servers:
await server.start()
self.download_manager.socks_listen_ports = [s.port for s in self.socks_servers]
self.download_manager.initialize()
await self.download_manager.initialize()
self.download_manager.start()

# IPv8
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/test_integration/test_anon_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def add_mock_download_config(self, manager: DownloadManager, hops: int) ->
manager.metadata_tmpdir = Mock(name=config.get_dest_dir())
manager.checkpoint_directory = config.get_dest_dir()
manager.peer_mid = b"0000"
manager.initialize()
await manager.initialize()
manager.start()
await sleep(0)

Expand Down
4 changes: 2 additions & 2 deletions src/tribler/test_integration/test_hidden_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async def add_mock_download_config(self, manager: DownloadManager, hops: int) ->
manager.metadata_tmpdir = Mock(name=config.get_dest_dir())
manager.checkpoint_directory = config.get_dest_dir()
manager.peer_mid = b"0000"
manager.initialize()
await manager.initialize()
manager.start()
await sleep(0)

Expand All @@ -206,7 +206,7 @@ async def start_seeding(self) -> bytes:
"""
config = await self.add_mock_download_config(self.download_manager_seeder, 1)

with open(config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso", "wb") as f: # noqa: ASYNC101
with open(config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso", "wb") as f: # noqa: ASYNC230
f.write(bytes([0] * 524288))

metainfo = create_torrent_file([config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso"], {})["metainfo"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,15 +669,17 @@ def test_on_save_resume_data_alert_permission_denied(self) -> None:
self.assertTrue(download.config.config["TEST_CRASH"])
self.assertEqual("name", download.config.config["download_defaults"]["name"])

def test_get_tracker_status_unicode_decode_error(self) -> None:
async def test_get_tracker_status_unicode_decode_error(self) -> None:
"""
Test if a tracker status is returned when getting trackers leads to a UnicodeDecodeError.
See: https://github.com/Tribler/tribler/issues/7036
"""
download = Download(TorrentDefNoMetainfo(b"\x01" * 20, b"name"), None, checkpoint_disabled=True,
config=self.create_mock_download_config())
download.download_manager = Mock(get_session=Mock(return_value=Mock(is_dht_running=Mock(return_value=False))))
fut = Future()
fut.set_result(Mock(is_dht_running=Mock(return_value=False)))
download.download_manager = Mock(get_session=Mock(return_value=fut))
download.handle = Mock(is_valid=Mock(return_value=True),
get_peer_info=Mock(
return_value=[Mock(source=1, dht=1, pex=0)] * 42 + [Mock(source=1, pex=1, dht=0)] * 7
Expand Down
Loading

0 comments on commit 7d7c943

Please sign in to comment.