From b1ccf750fff024176e1cbff75f413fb410822bb8 Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Sat, 23 Nov 2024 12:54:06 +0300 Subject: [PATCH] Add JSON API #382 --- wfb_ng/conf/master.cfg | 9 ++++-- wfb_ng/protocols.py | 69 +++++++++++++++++++++++++++++++++++++----- wfb_ng/server.py | 11 +++++-- 3 files changed, 76 insertions(+), 13 deletions(-) diff --git a/wfb_ng/conf/master.cfg b/wfb_ng/conf/master.cfg index 5fac8400..eb07cdf1 100644 --- a/wfb_ng/conf/master.cfg +++ b/wfb_ng/conf/master.cfg @@ -106,8 +106,9 @@ base_port_node = 11000 # UDP ports allocated on nodes ## Required fiels in top level profiles ## 1. "streams" -- that have required keys: "name", "service_type" and "profiles". Any other k/v pairs can be used to override values inherided from low level profiles. -## 2. "stats_port" -- UDP port for CLI. Should be int or None to disable statistics completely. -## 3. "link_domain" -- moved from common.link_id +## 2. "stats_port" -- TCP port for CLI. Should be int or None to disable statistics completely. +## 3. "api_port" -- TCP port for public JSON API. Should be int or None to disable API. +## 4. "link_domain" -- moved from common.link_id ## All streams can be point-to-point(one- or two-way transfers) or point-to-multipoint(one-way transfers only), but not multipoint-to-point or multipoint-to-multipoint ## In case of one-way stream you can specify only "stream_rx" on RX side and "stream_tx" on TX side. @@ -125,6 +126,7 @@ streams = [{'name': 'video', 'stream_rx': None, 'stream_tx': 0x00, 'service_ty ] stats_port = 8002 # used by wfb-cli, set to None completely disable statistics API +api_port = 8102 # public JSON API link_domain = "default" # It will be hashed and mapped to three bytes of MAC # You can use different link ids for multi-vehicle setup without stream remapping. @@ -140,6 +142,7 @@ streams = [{'name': 'video', 'stream_rx': 0x00, 'stream_tx': None, 'service_ty ] stats_port = 8003 # used by wfb-cli +api_port = 8103 # public JSON API link_domain = "default" ## Example of custom profiles not related to drones @@ -149,6 +152,7 @@ streams = [{'name': 'audio', 'stream_rx': 0xb0, 'stream_tx': 0x30, 'service_type 'profiles': ['base', 'radio_base'], 'peer': 'listen://127.0.0.1:1235', 'keypair': 'drone.key'}] stats_port = 8004 +api_port = 8104 link_domain = "test_two_way_udp" [two_way_udp_example_side_b] @@ -156,6 +160,7 @@ streams = [{'name': 'audio', 'stream_rx': 0x30, 'stream_tx': 0xb0, 'service_type 'profiles': ['base', 'radio_base'], 'peer': 'connect://127.0.0.1:1234', 'keypair': 'gs.key'}] stats_port = 8005 +api_port = 8105 link_domain = "test_two_way_udp" ########################################################################################################################### diff --git a/wfb_ng/protocols.py b/wfb_ng/protocols.py index 51b83a3c..4c45db7d 100644 --- a/wfb_ng/protocols.py +++ b/wfb_ng/protocols.py @@ -21,8 +21,10 @@ import msgpack import os import time +import json from itertools import groupby +from copy import deepcopy from twisted.python import log, failure from twisted.internet import reactor, defer, threads, task from twisted.internet.protocol import ProcessProtocol, Factory @@ -42,9 +44,7 @@ class WFBFlags(object): fec_types = {1: 'VDM_RS'} -class StatisticsProtocol(Int32StringReceiver): - MAX_LENGTH = 1024 * 1024 - +class StatisticsMsgPackProtocol(Int32StringReceiver): def connectionMade(self): # Push all config values for CLI into session # to allow CLI run without config file @@ -69,6 +69,43 @@ def send_stats(self, data): self.sendString(msgpack.packb(data, use_bin_type=True)) +class StatisticsJSONProtocol(LineReceiver): + delimiter = b'\n' + + def connectionMade(self): + # Push all config values on the start + msg = json.dumps(dict(type='settings', + profile=self.factory.profile, + is_cluster=self.factory.is_cluster, + settings = deepcopy(settings))) + + self.sendLine(msg.encode('utf-8')) + self.factory.ui_sessions.append(self) + + def lineReceived(self, line): + pass + + def connectionLost(self, reason): + self.factory.ui_sessions.remove(self) + + def send_stats(self, data): + data = dict(data) + + if data['type'] == 'rx': + ka = ('ant', 'freq', 'mcs', 'bw') + va = ('pkt_recv', 'rssi_min', 'rssi_avg', 'rssi_max', 'snr_min', 'snr_avg', 'snr_max') + data['rx_ant_stats'] = list(dict(zip(ka + va, (ant_id,) + k + v)) + for (k, ant_id), v in data.pop('rx_ant_stats').items()) + elif data['type'] == 'tx': + ka = ('ant',) + va = ('pkt_sent', 'pkt_drop', 'lat_min', 'lat_avg', 'lat_max') + data['tx_ant_stats'] = list(dict(zip(ka + va, (k,) + v)) + for k, v in data.pop('latency').items()) + + msg = json.dumps(data) + self.sendLine(msg.encode('utf-8')) + + class RFTempMeter(object): def __init__(self, wlans, measurement_interval): # RF module temperature by rf_path @@ -112,16 +149,33 @@ def _got_temp(temp_d): return threads.deferToThread(_read_temperature).addCallback(_got_temp) -class StatsAndSelectorFactory(Factory): + +class MsgPackAPIFactory(Factory): noisy = False - protocol = StatisticsProtocol + protocol = StatisticsMsgPackProtocol + + def __init__(self, ui_sessions, is_cluster=False, cli_title=None): + self.ui_sessions = ui_sessions + self.is_cluster = is_cluster + self.cli_title = cli_title + +class JSONAPIFactory(Factory): + noisy = False + protocol = StatisticsJSONProtocol + + def __init__(self, ui_sessions, is_cluster=False, profile=None): + self.ui_sessions = ui_sessions + self.is_cluster = is_cluster + self.profile = profile + + +class AntStatsAndSelector(object): """ Aggregate RX stats and select TX antenna """ - def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False, rx_only_wlan_ids=None): - self.is_cluster = is_cluster + def __init__(self, logger, rx_only_wlan_ids=None, rf_temp_meter=None): self.rx_only_wlan_ids = rx_only_wlan_ids or set() self.ant_sel_cb_list = [] self.rssi_cb_l = [] @@ -141,7 +195,6 @@ def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False, if logger is not None: self.ui_sessions.append(logger) - self.cli_title = cli_title self.rf_temp_meter = rf_temp_meter self.lc = task.LoopingCall(self.aggregate_stats) diff --git a/wfb_ng/server.py b/wfb_ng/server.py index 1a3e1394..ca35de49 100644 --- a/wfb_ng/server.py +++ b/wfb_ng/server.py @@ -32,7 +32,7 @@ from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError, version_msg from .common import abort_on_crash, exit_status, df_sleep, search_attr -from .protocols import StatsAndSelectorFactory, RFTempMeter, SSHClientProtocol +from .protocols import AntStatsAndSelector, RFTempMeter, SSHClientProtocol, MsgPackAPIFactory, JSONAPIFactory from .services import parse_services, init_udp_direct_tx, init_udp_direct_rx, init_mavlink, init_tunnel, init_udp_proxy, hash_link_domain, bandwidth_map from .cluster import parse_cluster_services, gen_cluster_scripts from .conf import settings, cfg_files @@ -216,13 +216,18 @@ def _cleanup(x): 'cluster' if is_cluster else ', '.join(wlans), profile_cfg.link_domain) - ant_sel_f = StatsAndSelectorFactory(logger, cli_title, rf_temp_meter, is_cluster, rx_only_wlan_ids) + ant_sel_f = AntStatsAndSelector(logger, rx_only_wlan_ids, rf_temp_meter) cleanup_l.append(ant_sel_f) link_id = hash_link_domain(profile_cfg.link_domain) if profile_cfg.stats_port: - sockets.append(reactor.listenTCP(profile_cfg.stats_port, ant_sel_f)) + p_f = MsgPackAPIFactory(ant_sel_f.ui_sessions, is_cluster, cli_title) + sockets.append(reactor.listenTCP(profile_cfg.stats_port, p_f)) + + if profile_cfg.api_port: + p_f = JSONAPIFactory(ant_sel_f.ui_sessions, is_cluster, profile) + sockets.append(reactor.listenTCP(profile_cfg.api_port, p_f)) for service_name, service_type, srv_cfg in service_list: log.msg('Starting %s/%s@%s' % (profile, service_name, profile_cfg.link_domain))