Skip to content

Commit

Permalink
Fix Not reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
olijeffers0n committed Apr 3, 2022
1 parent b609822 commit 0dfb897
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 45 deletions.
4 changes: 2 additions & 2 deletions rustplus/api/base_rust_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def _generate_protobuf(self) -> AppRequest:

return app_request

async def connect(self, retries: int = float("inf")) -> None:
async def connect(self, retries: int = float("inf"), delay: int = 20) -> None:
"""
Attempts to open a connection to the rust game server specified in the constructor
:return: None
"""
try:
if self.remote.ws is None:
self.remote.connect(retries=retries)
self.remote.connect(retries=retries, delay=delay)
await self.send_wakeup_request()
await self.heartbeat.start_beat()
except ConnectionRefusedError:
Expand Down
2 changes: 1 addition & 1 deletion rustplus/api/remote/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def _heart_beat(self) -> None:

async def beat(self) -> None:

if self.rust_api.remote.ws is not None and self.rust_api.remote.ws.open:
if self.rust_api.remote.ws is not None and self.rust_api.remote.ws.connection_status:
await self.rust_api.send_wakeup_request()

def reset_rhythm(self) -> None:
Expand Down
44 changes: 24 additions & 20 deletions rustplus/api/remote/rust_remote_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@

from .event_handler import EventHandler
from .rustplus_proto import *
from .rustws import RustWebsocket
from .rustws import RustWebsocket, PENDING_CONNECTION
from .token_bucket import RateLimiter
from ...commands import CommandHandler
from ...exceptions import (
ClientNotConnectedError,
ResponseNotRecievedError,
ResponseNotReceivedError,
RequestError,
)
from ...utils import RegisteredListener


class RustRemote:
def __init__(
self,
ip,
port,
command_options,
ratelimit_limit,
ratelimit_refill,
websocket_length=600,
use_proxy: bool = False,
self,
ip,
port,
command_options,
ratelimit_limit,
ratelimit_refill,
websocket_length=600,
use_proxy: bool = False,
) -> None:

self.ip = ip
Expand All @@ -37,11 +37,10 @@ def __init__(
ratelimit_limit, ratelimit_limit, 1, ratelimit_refill
)
self.ws = None
self.is_pending = False
self.websocket_length = websocket_length
self.responses = {}
self.ignored_responses = []
self.pending_requests = {}
self.pending_for_response = {}
self.sent_requests = []

if command_options is None:
Expand All @@ -53,12 +52,12 @@ def __init__(

self.event_handler = EventHandler()

def connect(self, retries) -> None:
def connect(self, retries, delay) -> None:

self.ws = RustWebsocket(
ip=self.ip, port=self.port, remote=self, use_proxy=self.use_proxy
)
self.ws.connect(retries=retries)
self.ws.connect(retries=retries, delay=delay)

def close(self) -> None:

Expand All @@ -67,9 +66,14 @@ def close(self) -> None:
del self.ws
self.ws = None

def is_pending(self) -> bool:
if self.ws is not None:
return self.ws.connection_status == PENDING_CONNECTION
return False

async def send_message(self, request: AppRequest) -> None:

self.ws.send_message(request)
await self.ws.send_message(request)

async def get_response(self, seq: int, app_request: AppRequest, error_check: bool = True) -> AppMessage:
"""
Expand All @@ -78,7 +82,7 @@ async def get_response(self, seq: int, app_request: AppRequest, error_check: boo

attempts = 0

while seq in self.pending_requests:
while seq in self.pending_for_response and seq not in self.responses:

if seq in self.sent_requests:

Expand All @@ -90,7 +94,7 @@ async def get_response(self, seq: int, app_request: AppRequest, error_check: boo
else:

await self.send_message(app_request)
await asyncio.sleep(1)
await asyncio.sleep(0.1)
attempts = 0

if attempts <= 10:
Expand All @@ -103,7 +107,7 @@ async def get_response(self, seq: int, app_request: AppRequest, error_check: boo
attempts = 0

if seq not in self.responses:
raise ResponseNotRecievedError("Not Received")
raise ResponseNotReceivedError("Not Received")

response = self.responses.pop(seq)

Expand All @@ -122,7 +126,7 @@ async def get_response(self, seq: int, app_request: AppRequest, error_check: boo
self.ratelimiter.bucket.refresh()

# Reattempt the sending with a full bucket
cost = self.ws._get_proto_cost(app_request)
cost = self.ws.get_proto_cost(app_request)

while True:

Expand All @@ -145,7 +149,7 @@ def _sock(self, retries) -> RustWebsocket:
if self.ws is None:
raise ClientNotConnectedError("No Current Websocket Connection")

while self.is_pending:
while self.is_pending():
time.sleep(1)

if time.time() - self.ws.connected_time >= self.websocket_length:
Expand Down
52 changes: 30 additions & 22 deletions rustplus/api/remote/rustws.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import time
from datetime import datetime
Expand All @@ -7,7 +8,11 @@

from .rustplus_proto import AppMessage, AppRequest
from ..structures import RustChatMessage
from ...exceptions import ClientNotConnectedError
from ...exceptions import ClientNotConnectedError, RequestError

CONNECTED = 1
PENDING_CONNECTION = 2
CLOSED = 3


class RustWebsocket(websocket.WebSocket):
Expand All @@ -16,17 +21,17 @@ def __init__(self, ip, port, remote, use_proxy):
self.ip = ip
self.port = port
self.thread: Thread = None
self.open = False
self.connection_status = CLOSED
self.use_proxy = use_proxy
self.remote = remote
self.logger = logging.getLogger("rustplus.py")
self.connected_time = time.time()

super().__init__(enable_multithread=True)

def connect(self, retries=float("inf"), ignore=False) -> None:
def connect(self, retries=float("inf"), ignore_open_value: bool = False, delay: int = 20) -> None:

if ((not self.open) or ignore) and not self.remote.is_pending:
if (not self.connection_status == CONNECTED or ignore_open_value) and not self.remote.is_pending():

attempts = 0

Expand All @@ -35,7 +40,7 @@ def connect(self, retries=float("inf"), ignore=False) -> None:
if attempts >= retries:
raise ConnectionAbortedError("Reached Retry Limit")

self.remote.is_pending = True
self.connection_status = PENDING_CONNECTION

try:
address = (
Expand All @@ -49,39 +54,42 @@ def connect(self, retries=float("inf"), ignore=False) -> None:
except Exception:
self.logger.warning(
f"{datetime.now().strftime('%d/%m/%Y %H:%M:%S')} "
"[RustPlus.py] Cannot Connect to server. Retrying in 20 seconds"
f"[RustPlus.py] Cannot Connect to server. Retrying in {str(delay)} seconds"
)
attempts += 1
time.sleep(20)
time.sleep(delay)

self.remote.is_pending = False
self.connection_status = CONNECTED

self.open = True
if not ignore_open_value:

self.thread = Thread(
target=self.run, name="[RustPlus.py] WebsocketThread", daemon=True
)
self.thread = Thread(target=self.run, name="[RustPlus.py] WebsocketThread", daemon=True)
self.thread.start()

def close(self) -> None:

super().close()
self.open = False
self.connection_status = CLOSED

def send_message(self, message: AppRequest) -> None:
async def send_message(self, message: AppRequest) -> None:
"""
Send the Protobuf to the server
"""

if self.connection_status == CLOSED:
raise ClientNotConnectedError("Not Connected")

try:
self.remote.pending_requests[message.seq] = message
self.send_binary(message.SerializeToString())
self.remote.pending_for_response[message.seq] = message
except Exception:
if not self.open:
raise ClientNotConnectedError("Not Connected")
while self.remote.is_pending():
await asyncio.sleep(0.5)
return await self.remote.send_message(message)

def run(self) -> None:

while self.open:
while self.connection_status == CONNECTED:
try:
data = self.recv()

Expand All @@ -91,16 +99,16 @@ def run(self) -> None:
app_message.ParseFromString(data)

except Exception:
if self.open:
if self.connection_status == CONNECTED:
self.logger.warning(
f"{datetime.now().strftime('%d/%m/%Y %H:%M:%S')} [RustPlus.py] Connection interrupted, Retrying"
)
self.connect(ignore=True)
self.connect(ignore_open_value=True)
continue
return

try:
del self.remote.pending_requests[app_message.response.seq]
del self.remote.pending_for_response[app_message.response.seq]
except KeyError:
pass

Expand Down Expand Up @@ -166,7 +174,7 @@ async def _retry_failed_request(self, app_request: AppRequest):
await self.send_message(app_request)

@staticmethod
def _get_proto_cost(app_request) -> int:
def get_proto_cost(app_request) -> int:
"""
Gets the cost of an AppRequest
"""
Expand Down

0 comments on commit 0dfb897

Please sign in to comment.