diff --git a/tools/client_server/client.py b/tools/client_server/client.py index 45ca274..ae69fe0 100644 --- a/tools/client_server/client.py +++ b/tools/client_server/client.py @@ -13,7 +13,7 @@ from collections import deque -from tools.client_server.server import CMD, RDY_CMD, RDY_CMD_NAME, SRV_HDR, SRV_HDR_SIGN, SRV_HDR_VER +from tools.client_server import common as cli_srv_common class TcpSender(threading.Thread): @@ -66,14 +66,13 @@ def send(self, data): with self.ssl_ctx.wrap_socket(sk.create_connection(self.addr)) as ss: self.log.debug('send `%s`', fp.name) - ss.send(SRV_HDR.pack(SRV_HDR_SIGN, SRV_HDR_VER, len(d_raw))) - ss.send(d_raw) + ss.send(cli_srv_common.pack_hdr(cli_srv_common.HDR_CMD_SEND, d_raw)) - if self._send_routine(ss, fp): + if self._send_routine(ss, fp, data['fsize']): self.log.debug('send `%s` done', fp.name) return fp - def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path): + def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path, fsz: int): poller = select.poll() poller.register(ss, select.POLLIN) poller.register(self._stop_rd, select.POLLIN) @@ -87,17 +86,28 @@ def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path): return if ss.fileno() in x: - cmd, = CMD.unpack(ss.recv(CMD.size)) - if cmd == RDY_CMD_NAME: - off, = RDY_CMD.unpack(ss.recv(RDY_CMD.size)) - to = 0 - f = fp.open('rb') - f.seek(off, os.SEEK_SET) - if self.compress: - zo = zlib.compressobj(wbits=-9) + z = cli_srv_common.read_cmd(ss) + if not z: + if fsz: + raise ConnectionError('Connection lost') else: - return 1 + cmd, args = z + if cmd == cli_srv_common.REPLY_CMD_RDY: + off, = args + to = 0 + f = fp.open('rb') + f.seek(off, os.SEEK_SET) + fsz -= off + if self.compress: + zo = zlib.compressobj(wbits=-9) + + elif cmd == cli_srv_common.REPLY_CMD_END: + return 1 + + else: + self.log.error('%s', cli_srv_common.reply_name.get(cmd, cmd)) + return if to == 0: d = f.read(self.buffer_sz) @@ -108,6 +118,7 @@ def _send_routine(self, ss: ssl.SSLSocket, fp: pathlib.Path): f.close() return 1 + fsz -= len(d) if not self.compress: ss.send(d) else: diff --git a/tools/client_server/common.py b/tools/client_server/common.py new file mode 100644 index 0000000..afb3c3a --- /dev/null +++ b/tools/client_server/common.py @@ -0,0 +1,57 @@ +import collections +import struct + + +HDR = struct.Struct('!4sBBI') +Hdr = collections.namedtuple('Hdr', 'sign ver cmd sz') + +HDR_SIGN = b'SREC' +HDR_VER = 1 + +HDR_CMD_SEND = 1 + + +def pack_hdr(cmd: int, data: bytes): + return HDR.pack(*Hdr(HDR_SIGN, HDR_VER, cmd, len(data))) + data + + +def verify_hdr(hdr: Hdr): + if hdr.sign != HDR_SIGN: + return 'SIGN', () + + if hdr.ver != HDR_VER: + return 'VER', (REPLY_CMD_VER,) + + +REPLY_CMD_S = struct.Struct('!B') +REPLY_CMD_ERR = 1 # some err +REPLY_CMD_VER = 2 # invalid version +REPLY_CMD_RDY = 3 +REPLY_CMD_END = 255 + +reply_name = { + REPLY_CMD_ERR: 'ERR', + REPLY_CMD_VER: 'VER', + REPLY_CMD_RDY: 'RDY', +} +_reply_map = { + REPLY_CMD_ERR: '', + REPLY_CMD_VER: '', + REPLY_CMD_RDY: 'Q', # offset +} + + +def get_reply_struct(cmd): + return struct.Struct('!' + _reply_map[cmd]) + + +def pack_reply(cmd, *a): + return REPLY_CMD_S.pack(cmd) + get_reply_struct(cmd).pack(*a) + + +def read_cmd(con): + x = con.recv(REPLY_CMD_S.size) + if x: + cmd, = REPLY_CMD_S.unpack(x) + s = get_reply_struct(cmd) + return cmd, s.unpack(con.recv(s.size)) diff --git a/tools/client_server/server.py b/tools/client_server/server.py index 89f6377..14b0733 100644 --- a/tools/client_server/server.py +++ b/tools/client_server/server.py @@ -28,15 +28,7 @@ from sats_receiver.utils import Decode, MapShapes, numbi_disp, close, Waterfall, WfMode, RawFileType from tools.client_server import gr_decoder - - -SRV_HDR = struct.Struct('!4sBI') # SREC, ver, sz -SRV_HDR_SIGN = b'SREC' -SRV_HDR_VER = 1 - -CMD = struct.Struct('!3s') -RDY_CMD = struct.Struct('!Q') # offset -RDY_CMD_NAME = b'RDY' +from tools.client_server import common as cli_srv_common class Worker(mp.Process): @@ -250,82 +242,62 @@ def apt_to_png_map(apt_filename, config): return ret_fn def __init__(self, request, client_address, server): - self.log = logging.getLogger('Handler') + try: + self.prefix = '%s:%s' % client_address + except: + self.prefix = '%s' % client_address + self.log = logging.getLogger('Handler: ' + self.prefix) + self.ctx = None super().__init__(request, client_address, server) - def recv_file(self, fp: pathlib.Path, fsz: int, compress: bool): - numfsz = numbi_disp(fsz) - sz_left = fsz - t = time.monotonic() - t_next = t + 1 - - mode = 'ab' - if compress: - # resume download not awailable - zo = zlib.decompressobj(wbits=-9) - mode = 'wb' - with fp.open(mode) as f: - off = f.tell() - sz_left -= off - self.wfile.write(CMD.pack(RDY_CMD_NAME)) - self.wfile.write(RDY_CMD.pack(off)) - - while sz_left: - data = self.request.recv(self.server.buf_sz) - if not data: - if compress: - sz_left -= f.write(zo.flush(zlib.Z_FINISH)) - if f.tell() == fsz: - break - raise ConnectionError('Connection lost') - - if not compress: - sz_left -= f.write(data) - else: - sz_left -= f.write(zo.decompress(data)) - sz_left -= f.write(zo.flush(zlib.Z_FULL_FLUSH)) - - t = time.monotonic() - if t > t_next: - t_next = t + 10 - self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz) - - self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz) + def send_reply(self, *a): + if a: + self.wfile.write(cli_srv_common.pack_reply(*a)) def handle(self): try: self._handle() except KeyError as e: - self.log.error('%s -> Missing key: %s', self.client_address, e) + self.log.error('Missing key: %s', e) except: - self.log.error('_handle from %s', self.client_address, exc_info=True) + self.log.error('_handle', exc_info=True) finally: try: - self.wfile.write(CMD.pack(b'END')) + self.send_reply(cli_srv_common.REPLY_CMD_END) except: pass def _handle(self): - hdr_b = self.rfile.read(SRV_HDR.size) - if not hdr_b: + hdr = self.rfile.read(cli_srv_common.HDR.size) + if not hdr: return try: - sign, ver, sz = hdr = SRV_HDR.unpack_from(hdr_b) - except struct.error as e: - self.log.debug('%s -> %s', self.client_address, e) + hdr = cli_srv_common.Hdr._make(cli_srv_common.HDR.unpack(hdr)) + except (struct.error, TypeError) as e: + self.log.error('%s: %s', e, hdr) return - if not (sign == SRV_HDR_SIGN and ver == SRV_HDR_VER): - self.log.debug('%s -> Invalid HDR: %s (%s)', self.client_address, hdr, hdr_b) + e = cli_srv_common.verify_hdr(hdr) + if e: + cause, e = e + self.log.error('Invalid %s: %s', e, hdr) + self.send_reply(*e) return - data = self.rfile.read(sz) + if hdr.cmd == cli_srv_common.HDR_CMD_SEND: + e = self.cmd_send(hdr) + if e: + self.send_reply(*e) + return + + def cmd_send(self, hdr: cli_srv_common.Hdr): + data = self.rfile.read(hdr.sz) try: params = json.loads(data.decode()) except json.JSONDecodeError as e: - self.log.error('%s -> %s %s Invalid JSON: %s', self.client_address, hdr, data, e) - return + self.log.error('Invalid JSON: %s', e) + return cli_srv_common.REPLY_CMD_ERR, s = io.StringIO() s.write(str(self.client_address) + '\n') @@ -336,16 +308,16 @@ def _handle(self): clients = json.load(self.server.client_list.open()) client_info = clients.get(params['secret']) if not client_info: - self.log.debug('%s -> Unknown secret: %s', self.client_address, params['secret']) - return + self.log.debug('Unknown secret: %s', params['secret']) + return cli_srv_common.REPLY_CMD_ERR, try: dtype = Decode[params['decoder_type']] except ValueError: - self.log.warning('%s -> Invalid dtype. Skip', self.client_address) - return + self.log.warning('Invalid dtype `%s`. Skip', params['decoder_type']) + return cli_srv_common.REPLY_CMD_ERR, - self.log.info('%s -> %s for %s', self.client_address, dtype, params['sat_name']) + self.log.info('%s for %s', dtype, params['sat_name']) out_dir = self.server.recv_path / params['sat_name'] out_dir.mkdir(parents=True, exist_ok=True) @@ -355,6 +327,44 @@ def _handle(self): self.server.worker.put(params, fp, dtype) + def recv_file(self, fp: pathlib.Path, fsz: int, compress: bool): + numfsz = numbi_disp(fsz) + sz_left = fsz + t = time.monotonic() + t_next = t + 1 + + mode = 'ab' + if compress: + # resume download not awailable + zo = zlib.decompressobj(wbits=-9) + mode = 'wb' + with fp.open(mode) as f: + off = f.tell() + sz_left -= off + self.send_reply(cli_srv_common.REPLY_CMD_RDY, off) + + while sz_left: + data = self.request.recv(self.server.buf_sz) + if not data: + if compress: + sz_left -= f.write(zo.flush(zlib.Z_FINISH)) + if f.tell() == fsz: + break + raise ConnectionError('Connection lost') + + if not compress: + sz_left -= f.write(data) + else: + sz_left -= f.write(zo.decompress(data)) + sz_left -= f.write(zo.flush(zlib.Z_FULL_FLUSH)) + + t = time.monotonic() + if t > t_next: + t_next = t + 10 + self.log.debug('%s/%s', numbi_disp(fsz - sz_left), numfsz) + + self.log.debug('%s %s/%s', fp.name, numbi_disp(fsz - sz_left), numfsz) + class _TcpServer(socketserver.TCPServer): allow_reuse_address = 1