Skip to content

Commit

Permalink
Move some things from session to region
Browse files Browse the repository at this point in the history
  • Loading branch information
SaladDais committed Dec 14, 2023
1 parent c5ed1cf commit 92c9c82
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 51 deletions.
5 changes: 5 additions & 0 deletions hippolyzer/lib/base/message/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def _send_prepared_message(self, message: Message, transport=None):
raise
return self.send_datagram(serialized, message.direction, transport=transport)

def disconnect(self):
self.packet_id_base = 0
self.unacked_reliable.clear()
self.is_alive = False

def send_datagram(self, data: bytes, direction: Direction, transport=None):
self.last_packet_at = dt.datetime.now()
src_addr, dst_addr = self.host, self.near_host
Expand Down
112 changes: 61 additions & 51 deletions hippolyzer/lib/client/hippo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ def __init__(self, circuit_addr, seed_cap: str, session: HippoClientSession, han
self.transfer_manager = TransferManager(proxify(self), session.agent_id, session.id)
self.asset_uploader = AssetUploader(proxify(self))
self.objects = ClientObjectManager(proxify(self))
self._llsd_serializer = LLSDMessageSerializer()
self._eq_task: Optional[asyncio.Task] = None

self.message_handler.subscribe("StartPingCheck", self._handle_ping_check)

def update_caps(self, caps: Mapping[str, str]) -> None:
self.caps.update(caps)
Expand All @@ -118,13 +122,64 @@ def update_caps(self, caps: Mapping[str, str]) -> None:
def cap_urls(self) -> multidict.MultiDict:
return self.caps.copy()

async def connect(self):
# Disconnect if we're already connected
await self.disconnect()

self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
self.circuit.is_alive = True

async def disconnect(self):
if self._eq_task is not None:
self._eq_task.cancel()
self._eq_task = None
self.circuit.disconnect()

async def _poll_event_queue(self):
ack: Optional[int] = None
while True:
payload = {"ack": ack, "done": False}
async with self.caps_client.post("EventQueueGet", llsd=payload) as resp:
if resp.status != 200:
await asyncio.sleep(0.1)
continue
polled = await resp.read_llsd()
for event in polled["events"]:
if self._llsd_serializer.can_handle(event["message"]):
msg = self._llsd_serializer.deserialize(event)
else:
# If this isn't a templated message (like some EQ-only events are),
# then we wrap it in a synthetic `Message` so that the API for handling
# both EQ-only and templated message events can be the same. Ick.
msg = Message(event["message"])
if isinstance(event["body"], dict):
msg.add_block(Block("EventData", **event["body"]))
else:
# Shouldn't be any events that have anything other than a dict
# as a body, but just to be sure...
msg.add_block(Block("EventData", Data=event["body"]))
msg.synthetic = True
self.session().message_handler.handle(msg)
self.message_handler.handle(msg)
ack = polled["id"]
await asyncio.sleep(0.001)

async def _handle_ping_check(self, message: Message):
self.circuit.send(
Message(
"CompletePingCheck",
Block("PingID", PingID=message["PingID"]["PingID"]),
)
)


class HippoClientSession(BaseClientSession):
"""Represents a client's view of a remote session"""
REGION_CLS = HippoClientRegion

region_by_handle: Callable[[int], Optional[HippoClientRegion]]
region_by_circuit_addr: Callable[[ADDR_TUPLE], Optional[HippoClientRegion]]
regions: List[HippoClientRegion]
session_manager: HippoClient

def __init__(self, id, secure_session_id, agent_id, circuit_code, session_manager: Optional[HippoClient] = None,
Expand Down Expand Up @@ -155,8 +210,6 @@ async def open_circuit(self, circuit_addr: ADDR_TUPLE):
valid_circuit = True

if valid_circuit:
# TODO: This is a little bit crap, we need to know if a UseCircuitCode was ever ACKed
# before we can start sending other packets, otherwise we might have a race.
await region.circuit.send_reliable(
Message(
"UseCircuitCode",
Expand Down Expand Up @@ -322,8 +375,6 @@ def __init__(self, options: Optional[Set[str]] = None):
self.session: Optional[HippoClientSession] = None
self.settings = ClientSettings()
self._resend_task: Optional[asyncio.Task] = None
self._eq_task: Optional[asyncio.Task] = None
self._llsd_serializer = LLSDMessageSerializer()

async def aclose(self):
try:
Expand Down Expand Up @@ -459,18 +510,14 @@ async def login(
async with seed_resp_fut as seed_resp:
seed_resp.raise_for_status()
region.update_caps(await seed_resp.read_llsd())
self._eq_task = asyncio.get_event_loop().create_task(self._poll_event_queue())
self.session.main_region.message_handler.subscribe("StartPingCheck", self._handle_ping_check)
await region.connect()

async def logout(self):
if not self.session:
return
if self._resend_task:
self._resend_task.cancel()
self._resend_task = None
if self._eq_task:
self._eq_task.cancel()
self._eq_task = None

session = self.session
self.session = None
Expand All @@ -482,6 +529,8 @@ async def logout(self):
Block("AgentData", AgentID=session.agent_id, SessionID=session.id),
)
)
for region in session.regions:
await region.disconnect()
session.transport.close()

def send_chat(self, message: Union[bytes, str], channel: int = 0, chat_type=ChatType.NORMAL) -> asyncio.Future:
Expand All @@ -493,49 +542,10 @@ def send_chat(self, message: Union[bytes, str], channel: int = 0, chat_type=Chat

async def _attempt_resends(self):
while True:
await asyncio.sleep(0.5)
if self.session is None:
continue
break
for region in self.session.regions:
if not region.circuit or not region.circuit.is_alive:
if not region.circuit.is_alive:
continue
region.circuit.resend_unacked()

async def _poll_event_queue(self):
ack: Optional[int] = None
while True:
if self.session is None or self.session.main_region is None:
return
payload = {"ack": ack, "done": False}
async with self.session.main_region.caps_client.post("EventQueueGet", llsd=payload) as resp:
if resp.status != 200:
await asyncio.sleep(0.1)
continue
polled = await resp.read_llsd()
for event in polled["events"]:
if self._llsd_serializer.can_handle(event["message"]):
msg = self._llsd_serializer.deserialize(event)
else:
# If this isn't a templated message (like some EQ-only events are),
# then we wrap it in a synthetic `Message` so that the API for handling
# both EQ-only and templated message events can be the same. Ick.
msg = Message(event["message"])
if isinstance(event["body"], dict):
msg.add_block(Block("EventData", **event["body"]))
else:
# Shouldn't be any events that have anything other than a dict
# as a body, but just to be sure...
msg.add_block(Block("EventData", Data=event["body"]))
msg.synthetic = True
self.session.message_handler.handle(msg)
self.session.main_region.message_handler.handle(msg)
ack = polled["id"]
await asyncio.sleep(0.001)

async def _handle_ping_check(self, message: Message):
self.session.main_region.circuit.send(
Message(
"CompletePingCheck",
Block("PingID", PingID=message["PingID"]["PingID"]),
)
)
await asyncio.sleep(0.5)

0 comments on commit 92c9c82

Please sign in to comment.