Skip to content

Commit

Permalink
improve client-server communication
Browse files Browse the repository at this point in the history
  • Loading branch information
baskiton committed Oct 8, 2024
1 parent 624f948 commit e56a3d2
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 80 deletions.
39 changes: 25 additions & 14 deletions tools/client_server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from collections import deque

from tools.client_server.server import CMD, RDY_CMD, RDY_CMD_NAME, SRV_HDR, SRV_HDR_SIGN, SRV_HDR_VER
from tools.client_server import common as cli_srv_common


class TcpSender(threading.Thread):
Expand Down Expand Up @@ -66,14 +66,13 @@ def send(self, data):
with self.ssl_ctx.wrap_socket(sk.create_connection(self.addr)) as ss:
self.log.debug('send `%s`', fp.name)

ss.send(SRV_HDR.pack(SRV_HDR_SIGN, SRV_HDR_VER, len(d_raw)))
ss.send(d_raw)
ss.send(cli_srv_common.pack_hdr(cli_srv_common.HDR_CMD_SEND, d_raw))

if self._send_routine(ss, fp):
if self._send_routine(ss, fp, data['fsize']):
self.log.debug('send `%s` done', fp.name)
return fp

def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path):
def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path, fsz: int):
poller = select.poll()
poller.register(ss, select.POLLIN)
poller.register(self._stop_rd, select.POLLIN)
Expand All @@ -87,17 +86,28 @@ def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path):
return

if ss.fileno() in x:
cmd, = CMD.unpack(ss.recv(CMD.size))
if cmd == RDY_CMD_NAME:
off, = RDY_CMD.unpack(ss.recv(RDY_CMD.size))
to = 0
f = fp.open('rb')
f.seek(off, os.SEEK_SET)
if self.compress:
zo = zlib.compressobj(wbits=-9)
z = cli_srv_common.read_cmd(ss)
if not z:
if fsz:
raise ConnectionError('Connection lost')

else:
return 1
cmd, args = z
if cmd == cli_srv_common.REPLY_CMD_RDY:
off, = args
to = 0
f = fp.open('rb')
f.seek(off, os.SEEK_SET)
fsz -= off
if self.compress:
zo = zlib.compressobj(wbits=-9)

elif cmd == cli_srv_common.REPLY_CMD_END:
return 1

else:
self.log.error('%s', cli_srv_common.reply_name.get(cmd, cmd))
return

if to == 0:
d = f.read(self.buffer_sz)
Expand All @@ -108,6 +118,7 @@ def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path):
f.close()
return 1

fsz -= len(d)
if not self.compress:
ss.send(d)
else:
Expand Down
57 changes: 57 additions & 0 deletions tools/client_server/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import collections
import struct


HDR = struct.Struct('!4sBBI')
Hdr = collections.namedtuple('Hdr', 'sign ver cmd sz')

HDR_SIGN = b'SREC'
HDR_VER = 1

HDR_CMD_SEND = 1


def pack_hdr(cmd: int, data: bytes):
return HDR.pack(*Hdr(HDR_SIGN, HDR_VER, cmd, len(data))) + data


def verify_hdr(hdr: Hdr):
if hdr.sign != HDR_SIGN:
return 'SIGN', ()

if hdr.ver != HDR_VER:
return 'VER', (REPLY_CMD_VER,)


REPLY_CMD_S = struct.Struct('!B')
REPLY_CMD_ERR = 1 # some err
REPLY_CMD_VER = 2 # invalid version
REPLY_CMD_RDY = 3
REPLY_CMD_END = 255

reply_name = {
REPLY_CMD_ERR: 'ERR',
REPLY_CMD_VER: 'VER',
REPLY_CMD_RDY: 'RDY',
}
_reply_map = {
REPLY_CMD_ERR: '',
REPLY_CMD_VER: '',
REPLY_CMD_RDY: 'Q', # offset
}


def get_reply_struct(cmd):
return struct.Struct('!' + _reply_map[cmd])


def pack_reply(cmd, *a):
return REPLY_CMD_S.pack(cmd) + get_reply_struct(cmd).pack(*a)


def read_cmd(con):
x = con.recv(REPLY_CMD_S.size)
if x:
cmd, = REPLY_CMD_S.unpack(x)
s = get_reply_struct(cmd)
return cmd, s.unpack(con.recv(s.size))
142 changes: 76 additions & 66 deletions tools/client_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,7 @@
from sats_receiver.utils import Decode, MapShapes, numbi_disp, close, Waterfall, WfMode, RawFileType

from tools.client_server import gr_decoder


SRV_HDR = struct.Struct('!4sBI') # SREC, ver, sz
SRV_HDR_SIGN = b'SREC'
SRV_HDR_VER = 1

CMD = struct.Struct('!3s')
RDY_CMD = struct.Struct('!Q') # offset
RDY_CMD_NAME = b'RDY'
from tools.client_server import common as cli_srv_common


class Worker(mp.Process):
Expand Down Expand Up @@ -250,82 +242,62 @@ def apt_to_png_map(apt_filename, config):
return ret_fn

def __init__(self, request, client_address, server):
self.log = logging.getLogger('Handler')
try:
self.prefix = '%s:%s' % client_address
except:
self.prefix = '%s' % client_address
self.log = logging.getLogger('Handler: ' + self.prefix)
self.ctx = None
super().__init__(request, client_address, server)

def recv_file(self, fp: pathlib.Path, fsz: int, compress: bool):
numfsz = numbi_disp(fsz)
sz_left = fsz
t = time.monotonic()
t_next = t + 1

mode = 'ab'
if compress:
# resume download not awailable
zo = zlib.decompressobj(wbits=-9)
mode = 'wb'
with fp.open(mode) as f:
off = f.tell()
sz_left -= off
self.wfile.write(CMD.pack(RDY_CMD_NAME))
self.wfile.write(RDY_CMD.pack(off))

while sz_left:
data = self.request.recv(self.server.buf_sz)
if not data:
if compress:
sz_left -= f.write(zo.flush(zlib.Z_FINISH))
if f.tell() == fsz:
break
raise ConnectionError('Connection lost')

if not compress:
sz_left -= f.write(data)
else:
sz_left -= f.write(zo.decompress(data))
sz_left -= f.write(zo.flush(zlib.Z_FULL_FLUSH))

t = time.monotonic()
if t > t_next:
t_next = t + 10
self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz)

self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz)
def send_reply(self, *a):
if a:
self.wfile.write(cli_srv_common.pack_reply(*a))

def handle(self):
try:
self._handle()
except KeyError as e:
self.log.error('%s -> Missing key: %s', self.client_address, e)
self.log.error('Missing key: %s', e)
except:
self.log.error('_handle from %s', self.client_address, exc_info=True)
self.log.error('_handle', exc_info=True)
finally:
try:
self.wfile.write(CMD.pack(b'END'))
self.send_reply(cli_srv_common.REPLY_CMD_END)
except:
pass

def _handle(self):
hdr_b = self.rfile.read(SRV_HDR.size)
if not hdr_b:
hdr = self.rfile.read(cli_srv_common.HDR.size)
if not hdr:
return

try:
sign, ver, sz = hdr = SRV_HDR.unpack_from(hdr_b)
except struct.error as e:
self.log.debug('%s -> %s', self.client_address, e)
hdr = cli_srv_common.Hdr._make(cli_srv_common.HDR.unpack(hdr))
except (struct.error, TypeError) as e:
self.log.error('%s: %s', e, hdr)
return

if not (sign == SRV_HDR_SIGN and ver == SRV_HDR_VER):
self.log.debug('%s -> Invalid HDR: %s (%s)', self.client_address, hdr, hdr_b)
e = cli_srv_common.verify_hdr(hdr)
if e:
cause, e = e
self.log.error('Invalid %s: %s', e, hdr)
self.send_reply(*e)
return

data = self.rfile.read(sz)
if hdr.cmd == cli_srv_common.HDR_CMD_SEND:
e = self.cmd_send(hdr)
if e:
self.send_reply(*e)
return

def cmd_send(self, hdr: cli_srv_common.Hdr):
data = self.rfile.read(hdr.sz)
try:
params = json.loads(data.decode())
except json.JSONDecodeError as e:
self.log.error('%s -> %s %s Invalid JSON: %s', self.client_address, hdr, data, e)
return
self.log.error('Invalid JSON: %s', e)
return cli_srv_common.REPLY_CMD_ERR,

s = io.StringIO()
s.write(str(self.client_address) + '\n')
Expand All @@ -336,16 +308,16 @@ def _handle(self):
clients = json.load(self.server.client_list.open())
client_info = clients.get(params['secret'])
if not client_info:
self.log.debug('%s -> Unknown secret: %s', self.client_address, params['secret'])
return
self.log.debug('Unknown secret: %s', params['secret'])
return cli_srv_common.REPLY_CMD_ERR,

try:
dtype = Decode[params['decoder_type']]
except ValueError:
self.log.warning('%s -> Invalid dtype. Skip', self.client_address)
return
self.log.warning('Invalid dtype `%s`. Skip', params['decoder_type'])
return cli_srv_common.REPLY_CMD_ERR,

self.log.info('%s -> %s for %s', self.client_address, dtype, params['sat_name'])
self.log.info('%s for %s', dtype, params['sat_name'])

out_dir = self.server.recv_path / params['sat_name']
out_dir.mkdir(parents=True, exist_ok=True)
Expand All @@ -355,6 +327,44 @@ def _handle(self):

self.server.worker.put(params, fp, dtype)

def recv_file(self, fp: pathlib.Path, fsz: int, compress: bool):
numfsz = numbi_disp(fsz)
sz_left = fsz
t = time.monotonic()
t_next = t + 1

mode = 'ab'
if compress:
# resume download not awailable
zo = zlib.decompressobj(wbits=-9)
mode = 'wb'
with fp.open(mode) as f:
off = f.tell()
sz_left -= off
self.send_reply(cli_srv_common.REPLY_CMD_RDY, off)

while sz_left:
data = self.request.recv(self.server.buf_sz)
if not data:
if compress:
sz_left -= f.write(zo.flush(zlib.Z_FINISH))
if f.tell() == fsz:
break
raise ConnectionError('Connection lost')

if not compress:
sz_left -= f.write(data)
else:
sz_left -= f.write(zo.decompress(data))
sz_left -= f.write(zo.flush(zlib.Z_FULL_FLUSH))

t = time.monotonic()
if t > t_next:
t_next = t + 10
self.log.debug('%s/%s', numbi_disp(fsz - sz_left), numfsz)

self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz)


class _TcpServer(socketserver.TCPServer):
allow_reuse_address = 1
Expand Down

0 comments on commit e56a3d2

Please sign in to comment.