Skip to content

Commit

Permalink
More client refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
SaladDais committed Dec 14, 2023
1 parent 0456b4b commit a2ef3d9
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 65 deletions.
10 changes: 10 additions & 0 deletions hippolyzer/lib/base/message/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dataclasses
import datetime as dt
import logging
from collections import deque
from typing import *
from typing import Optional

Expand Down Expand Up @@ -40,6 +41,8 @@ def __init__(
self.packet_id_base = 0
self.unacked_reliable: Dict[Tuple[Direction, int], ReliableResendInfo] = {}
self.resend_every: float = 3.0
# Reliable messages that we've already seen and handled, for resend suppression
self.seen_reliable: deque[int] = deque(maxlen=1_000)

def _send_prepared_message(self, message: Message, transport=None):
try:
Expand Down Expand Up @@ -131,6 +134,13 @@ def send_acks(self, to_ack: Sequence[int], direction=Direction.OUT, packet_id=No
message.direction = direction
self.send(message)

def track_reliable(self, packet_id: int) -> bool:
"""Tracks a reliable packet, returning if it's a new message"""
if packet_id in self.seen_reliable:
return False
self.seen_reliable.append(packet_id)
return True

def __repr__(self):
return "<%s %r : %r>" % (self.__class__.__name__, self.near_host, self.host)

Expand Down
140 changes: 75 additions & 65 deletions hippolyzer/lib/client/hippo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,18 @@ def datagram_received(self, data, source_addr: ADDR_TUPLE):

region.circuit.collect_acks(message)

# TODO: Ignore resends we already have, our ACK may have been missed
should_handle = True
if message.reliable:
# This is a bit crap. We send an ACK immediately through a PacketAck.
# This is pretty wasteful, we should batch them up and send them on a timer.
# We should ACK even if it's a resend of something we've already handled, maybe
# they never got the ACK.
region.circuit.send_acks((message.packet_id,))
should_handle = region.circuit.track_reliable(message.packet_id)

try:
self.session.message_handler.handle(message)
if should_handle:
self.session.message_handler.handle(message)
except:
LOG.exception("Failed in region message handler")
region.message_handler.handle(message)
Expand Down Expand Up @@ -123,12 +127,79 @@ def cap_urls(self) -> multidict.MultiDict:
return self.caps.copy()

async def connect(self):
# Disconnect if we're already connected
# Disconnect first if we're already connected
await self.disconnect()

self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
await self.circuit.send_reliable(
Message(
"UseCircuitCode",
Block("CircuitCode", Code=self.session().circuit_code, SessionID=self.session().id, ID=self.session().agent_id),
)
)
# TODO: What happens if a circuit code is invalid, again?
self.circuit.is_alive = True

# Clear out any old caps urls except the seed URL, we're about to fetch new caps.
seed_url = self.caps["Seed"]
self.caps.clear()
self.caps["Seed"] = seed_url

# Kick this off and await it later
seed_resp_fut = self.caps_client.post("Seed", llsd=list(self.session().session_manager.SUPPORTED_CAPS))

# Register first so we can handle it even if the ack happens after the message is sent
region_handshake_fut = self.message_handler.wait_for(("RegionHandshake",))

# TODO: This is only for the "main" circuit, shouldn't be done for others
await self.circuit.send_reliable(
Message(
"CompleteAgentMovement",
Block(
"AgentData",
AgentID=self.session().agent_id,
SessionID=self.session().id,
CircuitCode=self.session().circuit_code
),
)
)
self.name = str((await region_handshake_fut)["RegionInfo"][0]["SimName"])
await self.circuit.send_reliable(
Message(
"RegionHandshakeReply",
Block("AgentData", AgentID=self.session().agent_id, SessionID=self.session().id),
Block(
"RegionInfo",
Flags=(
RegionHandshakeReplyFlags.SUPPORTS_SELF_APPEARANCE
| RegionHandshakeReplyFlags.VOCACHE_IS_EMPTY
)
)
)
)
# TODO: do we need to send this for every region or just the first?
await self.circuit.send_reliable(
Message(
"AgentThrottle",
Block(
"AgentData",
AgentID=self.session().agent_id,
SessionID=self.session().id,
CircuitCode=self.session().circuit_code,
),
Block(
"Throttle",
GenCounter=0,
# Reasonable defaults, I guess
Throttles_=[207360.0, 165376.0, 33075.19921875, 33075.19921875, 682700.75, 682700.75, 269312.0],
)
)
)
async with seed_resp_fut as seed_resp:
seed_resp.raise_for_status()
self.update_caps(await seed_resp.read_llsd())

self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())

async def disconnect(self):
if self._eq_task is not None:
self._eq_task.cancel()
Expand Down Expand Up @@ -208,16 +279,6 @@ async def open_circuit(self, circuit_addr: ADDR_TUPLE):
# Whatever, already open
logging.debug("Tried to re-open circuit for %r" % (circuit_addr,))
valid_circuit = True

if valid_circuit:
await region.circuit.send_reliable(
Message(
"UseCircuitCode",
Block("CircuitCode", Code=self.circuit_code, SessionID=self.id, ID=self.agent_id),
)
)
# TODO: What happens if a circuit code is invalid, again?
region.circuit.is_alive = True
return valid_circuit
return False

Expand Down Expand Up @@ -465,57 +526,6 @@ async def login(
assert await self.session.open_circuit(self.session.regions[-1].circuit_addr)
region = self.session.regions[-1]
self.session.main_region = region

# Kick this off and await it later
seed_resp_fut = region.caps_client.post("Seed", llsd=list(self.SUPPORTED_CAPS))

# Register first so we can handle it even if the ack happens after the message is sent
region_handshake_fut = region.message_handler.wait_for(("RegionHandshake",))
await region.circuit.send_reliable(
Message(
"CompleteAgentMovement",
Block(
"AgentData",
AgentID=self.session.agent_id,
SessionID=self.session.id,
CircuitCode=self.session.circuit_code
),
)
)
region.name = str((await region_handshake_fut)["RegionInfo"][0]["SimName"])
await region.circuit.send_reliable(
Message(
"RegionHandshakeReply",
Block("AgentData", AgentID=self.session.agent_id, SessionID=self.session.id),
Block(
"RegionInfo",
Flags=(
RegionHandshakeReplyFlags.SUPPORTS_SELF_APPEARANCE
| RegionHandshakeReplyFlags.VOCACHE_IS_EMPTY
)
)
)
)
await region.circuit.send_reliable(
Message(
"AgentThrottle",
Block(
"AgentData",
AgentID=self.session.agent_id,
SessionID=self.session.id,
CircuitCode=self.session.circuit_code,
),
Block(
"Throttle",
GenCounter=0,
# Reasonable defaults, I guess
Throttles_=[207360.0, 165376.0, 33075.19921875, 33075.19921875, 682700.75, 682700.75, 269312.0],
)
)
)
async with seed_resp_fut as seed_resp:
seed_resp.raise_for_status()
region.update_caps(await seed_resp.read_llsd())
await region.connect()

async def logout(self):
Expand Down

0 comments on commit a2ef3d9

Please sign in to comment.