From 486d76ca4a1bd03750fd4259ddf4b40cad88f850 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Wed, 11 Dec 2024 12:32:02 +0200 Subject: [PATCH 01/10] Fix for loosing remote logging status on connectors after gateway restart --- .../tb_utility/tb_gateway_remote_configurator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 219faf50..5448d5cc 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -200,12 +200,14 @@ def send_current_configuration(self): def send_connector_current_configuration(self, connector_configuration: dict): config_to_send = {**connector_configuration, 'logLevel': connector_configuration.get('configurationJson', {}).get('logLevel', 'INFO'), + 'enableRemoteLogging': connector_configuration.get('configurationJson', {}).get('enableRemoteLogging', False), 'ts': int(time() * 1000) } if config_to_send.get('config') is not None: config_to_send.pop('config', None) if config_to_send.get("configurationJson", {}) is not None: config_to_send.get("configurationJson", {}).pop('logLevel', None) + config_to_send.get("configurationJson", {}).pop('enableRemoteLogging', None) if config_to_send.get('configurationJson', {}).get(REPORT_STRATEGY_PARAMETER) is not None: config_to_send[REPORT_STRATEGY_PARAMETER] = config_to_send['configurationJson'].pop(REPORT_STRATEGY_PARAMETER) # noqa if config_to_send.get('configurationJson', {}).get(CONFIG_VERSION_PARAMETER) is not None: From d7c5f84733bf61b1134c1ba3e403fbba6e121ac4 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 12 Dec 2024 10:10:28 +0200 Subject: [PATCH 02/10] Fixed BACnet error handling and getting default values from config --- thingsboard_gateway/connectors/bacnet/bacnet_connector.py | 5 ++++- .../connectors/bacnet/entities/device_object_config.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py index e79f367d..7033b838 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py @@ -153,8 +153,11 @@ async def __discover_devices(self): for device_config in self.__config.get('devices', []): try: DeviceObjectConfig.update_address_in_config_util(device_config) - await self.__application.do_who_is(device_address=device_config['address']) + result = await self.__application.do_who_is(device_address=device_config['address']) self.__log.debug('WhoIs request sent to device %s', device_config['address']) + + if result is None: + self.__log.error('Device %s not found', device_config['address']) except Exception as e: self.__log.error('Error discovering device %s: %s', device_config['address'], e) diff --git a/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py b/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py index ea432445..29325017 100644 --- a/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py +++ b/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py @@ -23,7 +23,7 @@ def __init__(self, config): self.__network_number_quality = config.get('networkNumberQuality', 'configured') self.__max_apdu_length_accepted = int(config.get('maxApduLengthAccepted', 1024)) self.__segmentation_supported = config.get('segmentationSupported', 'segmentedBoth') - self.__vendor_identifier = int(config.get('vendorIdentifier', 15)) + self.__vendor_identifier = int(config.get('vendorIdentifier', 15) or 15) @property def device_object_config(self): From f294ee065a01bfff0d3360bcfba46002d5a0e01e Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 12 Dec 2024 12:49:40 +0200 Subject: [PATCH 03/10] Added ability to two attributes point out to same node --- .../connectors/opcua/opcua_connector.py | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index b6f63d98..1c8759ab 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -587,9 +587,10 @@ def __convert_sub_data(self): if device not in device_converted_data_map: device_converted_data_map[device] = ConvertedData(device_name=device.name) - node = device.nodes_data_change_subscriptions[sub_node.nodeid] + nodes_configs = device.nodes_data_change_subscriptions[sub_node.nodeid]['nodes_configs'] + nodes_values = [data.monitored_item.Value for _ in range(len(nodes_configs))] - converted_data = device.converter_for_sub.convert(node, data.monitored_item.Value) + converted_data = device.converter_for_sub.convert(nodes_configs, nodes_values) if converted_data: converted_data.add_to_metadata({ @@ -598,10 +599,11 @@ def __convert_sub_data(self): CONVERTED_TS_PARAMETER: int(time() * 1000) }) - if node['section'] == 'attributes': - device_converted_data_map[device].add_to_attributes(converted_data.attributes) - else: - device_converted_data_map[device].add_to_telemetry(converted_data.telemetry) + for node_config in nodes_configs: + if node_config['section'] == 'attributes': + device_converted_data_map[device].add_to_attributes(converted_data.attributes) + else: + device_converted_data_map[device].add_to_telemetry(converted_data.telemetry) except Exception as e: self.__log.exception("Error converting data: %s", e) @@ -704,8 +706,15 @@ async def _load_devices_nodes(self): if found_node.nodeid not in self.__nodes_config_cache: self.__nodes_config_cache[found_node.nodeid] = [] self.__nodes_config_cache[found_node.nodeid].append(device) - node_config['subscription'] = None - device.nodes_data_change_subscriptions[found_node.nodeid] = node_config + + device_node_config = { + 'subscription': None, + 'node': found_node, + 'nodes_configs': [] + } + device.nodes_data_change_subscriptions[found_node.nodeid] = device_node_config + + device.nodes_data_change_subscriptions[found_node.nodeid]['nodes_configs'].append(node_config) if device.subscription is None: device.subscription = await self.__client.create_subscription( From b5f5e248a31035d6fe7588d92b5cce0d28ab3cff Mon Sep 17 00:00:00 2001 From: samson0v Date: Fri, 13 Dec 2024 15:49:51 +0200 Subject: [PATCH 04/10] Added separated loggers for connector and connector, fixed adding file handler for loggers --- thingsboard_gateway/config/logs.json | 14 ++--- .../connectors/bacnet/bacnet_connector.py | 6 +- .../connectors/ble/ble_connector.py | 12 +++- .../connectors/can/can_connector.py | 11 +++- .../connectors/ftp/ftp_connector.py | 9 ++- .../connectors/modbus/modbus_connector.py | 7 ++- .../connectors/modbus/slave.py | 4 +- .../connectors/mqtt/mqtt_connector.py | 6 +- .../connectors/ocpp/ocpp_connector.py | 9 ++- .../connectors/odbc/odbc_connector.py | 9 ++- .../connectors/opcua/opcua_connector.py | 12 ++-- .../connectors/request/request_connector.py | 17 +++-- .../connectors/rest/rest_connector.py | 9 ++- .../connectors/snmp/snmp_connector.py | 9 ++- .../connectors/socket/socket_connector.py | 4 +- .../connectors/xmpp/xmpp_connector.py | 9 ++- thingsboard_gateway/tb_utility/tb_handler.py | 22 +------ thingsboard_gateway/tb_utility/tb_logger.py | 51 +++++++++------ .../tb_utility/tb_rotating_file_handler.py | 63 +++++++++++++++++++ 19 files changed, 200 insertions(+), 83 deletions(-) create mode 100644 thingsboard_gateway/tb_utility/tb_rotating_file_handler.py diff --git a/thingsboard_gateway/config/logs.json b/thingsboard_gateway/config/logs.json index 0f28055c..037b4523 100644 --- a/thingsboard_gateway/config/logs.json +++ b/thingsboard_gateway/config/logs.json @@ -16,14 +16,14 @@ "stream": "ext://sys.stdout" }, "databaseHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/database.log", "backupCount": 1, "encoding": "utf-8" }, "serviceHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/service.log", "backupCount": 7, @@ -32,7 +32,7 @@ "encoding": "utf-8" }, "connectorHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/connector.log", "backupCount": 7, @@ -41,7 +41,7 @@ "encoding": "utf-8" }, "converterHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/converter.log", "backupCount": 7, @@ -50,7 +50,7 @@ "encoding": "utf-8" }, "tb_connectionHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/tb_connection.log", "backupCount": 7, @@ -59,7 +59,7 @@ "encoding": "utf-8" }, "storageHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/storage.log", "backupCount": 7, @@ -68,7 +68,7 @@ "encoding": "utf-8" }, "extensionHandler": { - "class": "thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler", + "class": "thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler", "formatter": "LogFormatter", "filename": "./logs/extension.log", "backupCount": 7, diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py index e79f367d..c7f6a01b 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py @@ -58,10 +58,10 @@ def __init__(self, gateway, config, connector_type): self.__log = init_logger(self.__gateway, self.name, log_level, enable_remote_logging=remote_logging, - is_connector_logger=True, connector_name=self.name) - self.__converter_log = init_logger(self.__gateway, self.name, log_level, + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', log_level, enable_remote_logging=remote_logging, - is_converter_logger=True, connector_name=self.name) + is_converter_logger=True, attr_name=self.name) self.__log.info('Starting BACnet connector...') if BackwardCompatibilityAdapter.is_old_config(config): diff --git a/thingsboard_gateway/connectors/ble/ble_connector.py b/thingsboard_gateway/connectors/ble/ble_connector.py index 751aa9b0..1ad5fb77 100644 --- a/thingsboard_gateway/connectors/ble/ble_connector.py +++ b/thingsboard_gateway/connectors/ble/ble_connector.py @@ -48,8 +48,14 @@ def __init__(self, gateway, config, connector_type): self.__config = config self.__id = self.__config.get('id') self.name = self.__config.get("name", 'BLE Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) - self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + self.__log = init_logger(self.__gateway, self.name, + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.daemon = True @@ -138,7 +144,7 @@ def __process_data(self): StatisticsService.count_connector_bytes(self.name, data, stat_parameter_name='connectorBytesReceived') try: - converter = converter(device_config, self.__log) + converter = converter(device_config, self.__converter_log) converted_data: ConvertedData = converter.convert(config, data) self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 self.__log.debug(converted_data) diff --git a/thingsboard_gateway/connectors/can/can_connector.py b/thingsboard_gateway/connectors/can/can_connector.py index fc3b5447..b59f6f8a 100644 --- a/thingsboard_gateway/connectors/can/can_connector.py +++ b/thingsboard_gateway/connectors/can/can_connector.py @@ -76,7 +76,12 @@ def __init__(self, gateway, config, connector_type): self.__config = config self.__id = self.__config.get('id') self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self._converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.__bus_conf = {} self.__bus = None self.__reconnect_count = 0 @@ -591,11 +596,11 @@ def __get_converter(self, config, need_uplink): else: if need_uplink: uplink = config.get("uplink") - return BytesCanUplinkConverter(self._log) if uplink is None \ + return BytesCanUplinkConverter(self._converter_log) if uplink is None \ else TBModuleLoader.import_module(self._connector_type, uplink) else: downlink = config.get("downlink") - return BytesCanDownlinkConverter(self._log) if downlink is None \ + return BytesCanDownlinkConverter(self._converter_log) if downlink is None \ else TBModuleLoader.import_module(self._connector_type, downlink) def get_config(self): diff --git a/thingsboard_gateway/connectors/ftp/ftp_connector.py b/thingsboard_gateway/connectors/ftp/ftp_connector.py index df36ad69..cb8720c2 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_connector.py +++ b/thingsboard_gateway/connectors/ftp/ftp_connector.py @@ -49,7 +49,12 @@ def __init__(self, gateway, config, connector_type): self.__tls_support = self.__config.get("TLSSupport", False) self.name = self.__config.get("name", "".join(choice(ascii_lowercase) for _ in range(5))) self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.daemon = True self.__stopped = False self.__requests_in_progress = [] @@ -135,7 +140,7 @@ def __process_paths(self, ftp): time_point = timer() if time_point - path.last_polled_time >= path.poll_period or path.last_polled_time == 0: configuration = path.config - converter = FTPUplinkConverter(configuration, self.__log) + converter = FTPUplinkConverter(configuration, self.__converter_log) path.last_polled_time = time_point if '*' in path.path: diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index ed0898d7..a3dbe7a6 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -84,7 +84,12 @@ def __init__(self, gateway, config, connector_type): self.name = self.__config.get("name", 'Modbus Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) self.__log = init_logger(self.__gateway, config.get('name', self.name), config.get('logLevel', 'INFO'), - enable_remote_logging=config.get('enableRemoteLogging', False)) + enable_remote_logging=config.get('enableRemoteLogging', False), + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + config.get('logLevel', 'INFO'), + enable_remote_logging=config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.__log.info('Starting Modbus Connector...') self.__id = self.__config.get('id') self.__connected = False diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index 0f41d7d1..6f87ce52 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -130,9 +130,9 @@ def __load_converter(self, config, converter_type, converter_config: Union[Dict, converter = config[converter_type + CONVERTER_PARAMETER] else: if converter_type == DOWNLINK_PREFIX: - converter = BytesModbusDownlinkConverter(converter_config, self._log) + converter = BytesModbusDownlinkConverter(converter_config, self.connector.__converter_log) else: - converter = BytesModbusUplinkConverter(converter_config, self._log) + converter = BytesModbusUplinkConverter(converter_config, self.connector.__converter_log) return converter except Exception as e: diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 1084104a..1b930dc5 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -118,6 +118,10 @@ def __init__(self, gateway, config, connector_type): config.get('logLevel', 'INFO'), enable_remote_logging=config.get('enableRemoteLogging', False), is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, config['name'] + '_converter', + config.get('logLevel', 'INFO'), + enable_remote_logging=config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=config['name']) # check if the configuration is in the old format using_old_config_format_detected = BackwardCompatibilityAdapter.is_old_config_format(config) @@ -454,7 +458,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params): if module: self.__log.debug('Converter %s for topic %s - found!', converter_class_name, mapping["topicFilter"]) - converter = module(mapping, self.__log) + converter = module(mapping, self.__converter_log) if sharing_id: self.__shared_custom_converters[sharing_id] = converter else: diff --git a/thingsboard_gateway/connectors/ocpp/ocpp_connector.py b/thingsboard_gateway/connectors/ocpp/ocpp_connector.py index 28484744..9a0f2ce4 100644 --- a/thingsboard_gateway/connectors/ocpp/ocpp_connector.py +++ b/thingsboard_gateway/connectors/ocpp/ocpp_connector.py @@ -69,7 +69,12 @@ def __init__(self, gateway, config, connector_type): self._gateway = gateway self.name = self._config.get("name", 'OCPP Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) self._log = init_logger(self._gateway, self.name, self._config.get('logLevel', 'INFO'), - enable_remote_logging=self._config.get('enableRemoteLogging', False)) + enable_remote_logging=self._config.get('enableRemoteLogging', False), + is_connector_logger=True) + self._converter_log = init_logger(self._gateway, self.name + '_converter', + self._config.get('logLevel', 'INFO'), + enable_remote_logging=self._config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self._default_converters = {'uplink': 'OcppUplinkConverter'} self._server = None @@ -184,7 +189,7 @@ async def on_connect(self, websocket, path): if is_valid: uplink_converter_name = cp_config.get('extension', self._default_converters['uplink']) cp = ChargePoint(charge_point_id, websocket, {**cp_config, 'uplink_converter_name': uplink_converter_name}, - OcppConnector._callback, self._log) + OcppConnector._callback, self._converter_log) cp.authorized = True self._log.info('Connected Charge Point with id: %s', charge_point_id) diff --git a/thingsboard_gateway/connectors/odbc/odbc_connector.py b/thingsboard_gateway/connectors/odbc/odbc_connector.py index b443ae98..85926af4 100644 --- a/thingsboard_gateway/connectors/odbc/odbc_connector.py +++ b/thingsboard_gateway/connectors/odbc/odbc_connector.py @@ -63,7 +63,12 @@ def __init__(self, gateway, config, connector_type): self.__config = config self.__id = self.__config.get('id') self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self._converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self._connector_type = connector_type self.__stopped = False @@ -81,7 +86,7 @@ def __init__(self, gateway, config, connector_type): self.__attribute_columns = [] self.__timeseries_columns = [] - self.__converter = OdbcUplinkConverter(self._log) if not self.__config.get("converter", "") else \ + self.__converter = OdbcUplinkConverter(self._converter_log) if not self.__config.get("converter", "") else \ TBModuleLoader.import_module(self._connector_type, self.__config["converter"]) self.__configure_pyodbc() diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index b6f63d98..7135b393 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -83,7 +83,11 @@ def __init__(self, gateway: 'TBGatewayService', config, connector_type): self.__config.get('server', {}).get('mapping', [])))) self.__enable_remote_logging = self.__config.get('enableRemoteLogging', False) self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__enable_remote_logging) + enable_remote_logging=self.__enable_remote_logging, is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__enable_remote_logging, + is_converter_logger=True, attr_name=self.name) self.__replace_loggers() report_strategy = self.__config.get('reportStrategy') self.__connector_report_strategy_config = gateway.get_report_strategy_service().get_main_report_strategy() @@ -150,7 +154,7 @@ def __replace_loggers(self): 'ERROR', enable_remote_logging=self.__enable_remote_logging, is_connector_logger=True, - connector_name=self.name) + attr_name=self.name) def open(self): self.__stopped = False @@ -638,9 +642,9 @@ async def _create_new_devices(self): device_config = {**device_config, 'device_name': device_name, 'device_type': device_type} self.__device_nodes.append( Device(path=node, name=device_name, config=device_config, - converter=converter(device_config, self.__log), + converter=converter(device_config, self.__converter_log), converter_for_sub=converter(device_config, - self.__log) if self.__enable_subscriptions else None, + self.__converter_log) if self.__enable_subscriptions else None, logger=self.__log)) self.__log.info('Added device node: %s', device_name) self.__log.debug('Device nodes: %s', self.__device_nodes) diff --git a/thingsboard_gateway/connectors/request/request_connector.py b/thingsboard_gateway/connectors/request/request_connector.py index 85374969..0df80fc6 100644 --- a/thingsboard_gateway/connectors/request/request_connector.py +++ b/thingsboard_gateway/connectors/request/request_connector.py @@ -56,7 +56,12 @@ def __init__(self, gateway, config, connector_type): self.__gateway = gateway self.name = self.__config.get("name", "".join(choice(ascii_lowercase) for _ in range(5))) self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self._converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.__security = HTTPBasicAuth(self.__config["security"]["username"], self.__config["security"]["password"]) if \ self.__config["security"]["type"] == "basic" else None self.__host = None @@ -173,7 +178,7 @@ def __fill_requests(self): module = TBModuleLoader.import_module(self._connector_type, endpoint["converter"]["extension"]) if module is not None: self._log.debug('Custom converter for url %s - found!', endpoint["url"]) - converter = module(endpoint, self._log) + converter = module(endpoint, self._converter_log) else: self._log.error( "\n\nCannot find extension module for %s url.\nPlease check your configuration.\n", @@ -191,18 +196,18 @@ def __fill_attribute_updates(self): for attribute_request in self.__config.get("attributeUpdates", []): if attribute_request.get("converter") is not None: converter = TBModuleLoader.import_module("request", attribute_request["converter"])(attribute_request, - self._log) + self._converter_log) else: - converter = JsonRequestDownlinkConverter(attribute_request, self._log) + converter = JsonRequestDownlinkConverter(attribute_request, self._converter_log) attribute_request_dict = {**attribute_request, "converter": converter} self.__attribute_updates.append(attribute_request_dict) def __fill_rpc_requests(self): for rpc_request in self.__config.get("serverSideRpc", []): if rpc_request.get("converter") is not None: - converter = TBModuleLoader.import_module("request", rpc_request["converter"])(rpc_request, self._log) + converter = TBModuleLoader.import_module("request", rpc_request["converter"])(rpc_request, self._converter_log) else: - converter = JsonRequestDownlinkConverter(rpc_request, self._log) + converter = JsonRequestDownlinkConverter(rpc_request, self._converter_log) rpc_request_dict = {**rpc_request, "converter": converter} self.__rpc_requests.append(rpc_request_dict) diff --git a/thingsboard_gateway/connectors/rest/rest_connector.py b/thingsboard_gateway/connectors/rest/rest_connector.py index 8ad445f6..bd245d69 100644 --- a/thingsboard_gateway/connectors/rest/rest_connector.py +++ b/thingsboard_gateway/connectors/rest/rest_connector.py @@ -71,7 +71,12 @@ def __init__(self, gateway, config, connector_type): self.name = config.get("name", 'REST Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) self.__gateway = gateway self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True, attr_name=self.name) self._default_downlink_converter = TBModuleLoader.import_module(self._connector_type, self._default_converters['downlink']) self._default_uplink_converter = TBModuleLoader.import_module(self._connector_type, @@ -130,7 +135,7 @@ def load_handlers(self): for http_method in mapping['HTTPMethods']: handler = data_handlers[security_type](self.collect_statistic_and_send, self.get_name(), self.get_id(), self.endpoints[mapping["endpoint"]], - self.__log, provider=self.__event_provider) + self.__converter_log, provider=self.__event_provider) handlers.append(web.route(http_method, mapping['endpoint'], handler)) except Exception as e: self.__log.error("Error on creating handlers - %s", str(e)) diff --git a/thingsboard_gateway/connectors/snmp/snmp_connector.py b/thingsboard_gateway/connectors/snmp/snmp_connector.py index 7e43c679..571a3a21 100644 --- a/thingsboard_gateway/connectors/snmp/snmp_connector.py +++ b/thingsboard_gateway/connectors/snmp/snmp_connector.py @@ -58,7 +58,12 @@ def __init__(self, gateway, config, connector_type): self.__id = self.__config.get('id') self.name = config.get("name", 'SNMP Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self._converter_log = init_logger(self.__gateway, self.name + "_converter", + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True, attr_name=self.name) self.__devices = self.__config["devices"] self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} @@ -235,7 +240,7 @@ def __fill_converters(self): device["uplink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter', self._default_converters[ "uplink"]))(device, - self._log) + self._converter_log) device["downlink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter', self._default_converters[ "downlink"]))(device) diff --git a/thingsboard_gateway/connectors/socket/socket_connector.py b/thingsboard_gateway/connectors/socket/socket_connector.py index 532391ed..0b1db57c 100644 --- a/thingsboard_gateway/connectors/socket/socket_connector.py +++ b/thingsboard_gateway/connectors/socket/socket_connector.py @@ -56,10 +56,10 @@ def __init__(self, gateway, config, connector_type): log_level = self.__config.get('logLevel', 'INFO') self.__log = init_logger(self.__gateway, self.name, log_level, enable_remote_logging=remote_logging, - is_connector_logger=True, connector_name=self.name) + is_connector_logger=True) self.__converter_log = init_logger(self.__gateway, self.name, log_level, enable_remote_logging=remote_logging, - is_converter_logger=True, connector_name=self.name) + is_converter_logger=True, attr_name=self.name) if is_using_old_config: self.__log.warning("Old Socket connector configuration format detected. Automatic conversion is applied.") diff --git a/thingsboard_gateway/connectors/xmpp/xmpp_connector.py b/thingsboard_gateway/connectors/xmpp/xmpp_connector.py index 99002c95..4996a1ee 100644 --- a/thingsboard_gateway/connectors/xmpp/xmpp_connector.py +++ b/thingsboard_gateway/connectors/xmpp/xmpp_connector.py @@ -60,7 +60,12 @@ def __init__(self, gateway, config, connector_type): self._devices_config = config.get('devices', []) self.name = config.get("name", 'XMPP Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True) + self.__converter_log = init_logger(self.__gateway, self.name + '_converter', + self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False), + is_connector_logger=True, attr_name=self.name) self._devices = {} self._reformat_devices_config() @@ -94,7 +99,7 @@ def _reformat_devices_config(self): attribute_updates=config.get('attributeUpdates', []), server_side_rpc=config.get('serverSideRpc', []) ) - self._devices[device_jid].set_converter(converter(config, self.__log)) + self._devices[device_jid].set_converter(converter(config, self.__converter_log)) except KeyError as e: self.__log.error('Invalid configuration %s with key error %s', config, e) continue diff --git a/thingsboard_gateway/tb_utility/tb_handler.py b/thingsboard_gateway/tb_utility/tb_handler.py index c1344329..7ef2ab89 100644 --- a/thingsboard_gateway/tb_utility/tb_handler.py +++ b/thingsboard_gateway/tb_utility/tb_handler.py @@ -14,10 +14,7 @@ import logging import logging.handlers -import os import threading -from os import environ -from pathlib import Path from queue import Queue, Empty from sys import stdout from time import time, sleep @@ -124,8 +121,8 @@ def handle(self, record): if record.levelno < remote_logging_level: return # Remote logging level set higher than record level - if logger and hasattr(logger, 'connector_name'): - name = logger.connector_name + if logger and hasattr(logger, 'attr_name'): + name = logger.attr_name if name: record = self.formatter.format(record) @@ -165,18 +162,3 @@ def get_logger_level_id(log_level): if isinstance(log_level, str): return 100 if log_level == 'NONE' else logging.getLevelName(log_level) return log_level - - -class TimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler): - def __init__(self, filename, when='h', interval=1, backupCount=0, - encoding=None, delay=False, utc=False): - config_path = environ.get('TB_GW_LOGS_PATH') - if config_path: - filename = config_path + os.sep + filename.split(os.sep)[-1] - - if not Path(filename).exists(): - with open(filename, 'w'): - pass - - super().__init__(filename, when=when, interval=interval, backupCount=backupCount, - encoding=encoding, delay=delay, utc=utc) diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 24a30f3b..53ed45a7 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -20,6 +20,7 @@ if TYPE_CHECKING: from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService +from thingsboard_gateway.tb_utility.tb_rotating_file_handler import TimedRotatingFileHandler from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService TRACE_LOGGING_LEVEL = 5 @@ -27,7 +28,7 @@ def init_logger(gateway: 'TBGatewayService', name, level, enable_remote_logging=False, is_connector_logger=False, - is_converter_logger=False, connector_name=None): + is_converter_logger=False, attr_name=None): """ For creating a Logger with all config automatically Create a Logger manually only if you know what you are doing! @@ -37,12 +38,30 @@ def init_logger(gateway: 'TBGatewayService', name, level, enable_remote_logging= log.is_connector_logger = is_connector_logger log.is_converter_logger = is_converter_logger - if connector_name: - log.connector_name = connector_name + if attr_name: + log.attr_name = attr_name + '_ERRORS_COUNT' if hasattr(gateway, 'main_handler') and gateway.main_handler not in log.handlers: log.addHandler(gateway.main_handler) + # Add file handler to the connector or converter logger + # First check if it is a main module logger (for example OPC-UA connector logger) + # If it is, add a file handler to the main module logger + # If it is not (for example asyncua logger), add the main module file handler to the logger + if TbLogger.is_main_module_logger(name, attr_name, is_converter_logger): + if is_connector_logger: + file_handler = TimedRotatingFileHandler.get_connector_file_handler(log.name + '_connector') + + if is_converter_logger: + file_handler = TimedRotatingFileHandler.get_converter_file_handler(log.name) + + if file_handler: + log.addHandler(file_handler) + else: + main_file_handler = TimedRotatingFileHandler.get_time_rotating_file_handler_by_logger_name(attr_name) + if main_file_handler: + log.addHandler(main_file_handler) + from thingsboard_gateway.tb_utility.tb_handler import TBRemoteLoggerHandler if not hasattr(gateway, 'remote_handler'): gateway.remote_handler = TBRemoteLoggerHandler(gateway) @@ -78,23 +97,22 @@ class TbLogger(logging.Logger): PREVIOUS_ERRORS_RESET_TIME = 0 def __init__(self, name, level=logging.NOTSET, is_connector_logger=False, is_converter_logger=False, - connector_name=None): + attr_name=None): super(TbLogger, self).__init__(name=name, level=level) self.propagate = True self.parent = self.root - self.__previous_number_of_errors = -1 self.__is_connector_logger = is_connector_logger self.__is_converter_logger = is_converter_logger self.__previous_reset_errors_time = TbLogger.PREVIOUS_ERRORS_RESET_TIME logging.Logger.trace = TbLogger.trace - if connector_name: - self.__connector_name = connector_name + self.errors = 0 + + if attr_name: + self.attr_name = attr_name + '_ERRORS_COUNT' else: - self.__connector_name = name + self.attr_name = self.name + '_ERRORS_COUNT' - self.errors = 0 - self.attr_name = self.__connector_name + '_ERRORS_COUNT' self._is_on_init_state = True @property @@ -113,15 +131,6 @@ def is_converter_logger(self): def is_converter_logger(self, value): self.__is_converter_logger = value - @property - def connector_name(self): - return self.__connector_name - - @connector_name.setter - def connector_name(self, value): - self.__connector_name = value - self.attr_name = self.__connector_name + '_ERRORS_COUNT' - def reset(self): with TbLogger.ERRORS_MUTEX: TbLogger.ALL_ERRORS_COUNT = max(0, TbLogger.ALL_ERRORS_COUNT - self.errors) @@ -223,5 +232,9 @@ def send_errors_if_needed(cls, gateway): cls.__PREVIOUS_BATCH_TO_SEND.pop(key, None) cls.__PREVIOUS_BATCH_TO_SEND.update(batch_to_send) + @staticmethod + def is_main_module_logger(name, attr_name, is_converter_logger): + return name == attr_name or attr_name is None or (name != attr_name and is_converter_logger) + logging.setLoggerClass(TbLogger) diff --git a/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py b/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py new file mode 100644 index 00000000..c5de7348 --- /dev/null +++ b/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py @@ -0,0 +1,63 @@ +import os +import logging +from os import environ +from pathlib import Path + + +class TimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler): + def __init__(self, filename, when='h', interval=1, backupCount=0, + encoding=None, delay=False, utc=False): + config_path = environ.get('TB_GW_LOGS_PATH') + if config_path: + filename = config_path + os.sep + filename.split(os.sep)[-1] + + if not Path(filename).exists(): + with open(filename, 'w'): + pass + + super().__init__(filename, when=when, interval=interval, backupCount=backupCount, + encoding=encoding, delay=delay, utc=utc) + + @staticmethod + def get_connector_file_handler(file_name): + return TimedRotatingFileHandler.__get_file_handler(file_name, 'connector') + + @staticmethod + def get_converter_file_handler(file_name): + return TimedRotatingFileHandler.__get_file_handler(file_name, 'converter') + + @staticmethod + def get_time_rotating_file_handler_by_logger_name(logger_name): + file_handler_filter = list(filter(lambda x: isinstance(x, TimedRotatingFileHandler), + logging.getLogger(logger_name).handlers)) + if len(file_handler_filter): + return file_handler_filter[0] + + @staticmethod + def __get_file_handler(file_name, logger_name): + file_handler = TimedRotatingFileHandler.get_time_rotating_file_handler_by_logger_name(logger_name) + + file_name = file_name + '.log' + + if file_handler: + return TimedRotatingFileHandler.__create_file_handler_copy(file_handler, file_name) + else: + return TimedRotatingFileHandler.__create_default_file_handler(file_name) + + @staticmethod + def __create_file_handler_copy(handler, file_name): + file_name = f'{os.sep}'.join(handler.baseFilename.split(os.sep)[:-1]) + os.sep + file_name + handler_copy = TimedRotatingFileHandler(file_name, + when=handler.when, + backupCount=handler.backupCount, + interval=handler.interval, + encoding=handler.encoding, + delay=handler.delay, + utc=handler.utc) + handler_copy.setFormatter(handler.formatter) + return handler_copy + + @staticmethod + def __create_default_file_handler(file_name): + file_name = '.' + os.sep + file_name + '.log' + return TimedRotatingFileHandler(file_name) From 4604d8f77336e5d2824167488446130e5a5b43e8 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 16 Dec 2024 10:33:17 +0200 Subject: [PATCH 05/10] Added file handlers updating method --- .../gateway/tb_gateway_service.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index a0c2efad..61fa4b24 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -59,6 +59,7 @@ from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_logger import TbLogger from thingsboard_gateway.tb_utility.tb_remote_shell import RemoteShell +from thingsboard_gateway.tb_utility.tb_rotating_file_handler import TimedRotatingFileHandler from thingsboard_gateway.tb_utility.tb_updater import TBUpdater from thingsboard_gateway.tb_utility.tb_utility import TBUtility @@ -2065,6 +2066,7 @@ def get_status(self): def update_loggers(self): self.__update_base_loggers() + self.__update_connectors_and_converters_loggers() global log log = logging.getLogger('service') @@ -2081,6 +2083,25 @@ def __update_base_loggers(self): logger.addHandler(self.remote_handler) + def __update_connectors_and_converters_loggers(self): + for logger in logging.Logger.manager.loggerDict.values(): + if hasattr(logger, 'is_connector_logger') or hasattr(logger, 'is_converter_logger'): + file_handler_filter = list(filter(lambda handler: isinstance(handler, TimedRotatingFileHandler), + logger.handlers)) + if len(file_handler_filter): + old_file_handler = file_handler_filter[0] + + new_file_handler = None + if logger.is_connector_logger: + new_file_handler = TimedRotatingFileHandler.get_connector_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa + + if logger.is_converter_logger: + new_file_handler = TimedRotatingFileHandler.get_converter_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa + + if new_file_handler: + logger.addHandler(new_file_handler) + logger.removeHandler(old_file_handler) + def is_latency_metrics_enabled(self): return self.__latency_debug_mode From 0f169693933bb48a93cd97f6ec4b5d91e1e6e7fc Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 16 Dec 2024 10:47:28 +0200 Subject: [PATCH 06/10] Moved file handler updating to TbLogger class, added file handlers class name checking --- .../gateway/tb_gateway_service.py | 22 +------------------ .../tb_gateway_remote_configurator.py | 7 ++++++ thingsboard_gateway/tb_utility/tb_logger.py | 20 +++++++++++++++++ 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 61fa4b24..ad62a513 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -59,7 +59,6 @@ from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_logger import TbLogger from thingsboard_gateway.tb_utility.tb_remote_shell import RemoteShell -from thingsboard_gateway.tb_utility.tb_rotating_file_handler import TimedRotatingFileHandler from thingsboard_gateway.tb_utility.tb_updater import TBUpdater from thingsboard_gateway.tb_utility.tb_utility import TBUtility @@ -2066,7 +2065,7 @@ def get_status(self): def update_loggers(self): self.__update_base_loggers() - self.__update_connectors_and_converters_loggers() + TbLogger.update_file_handlers() global log log = logging.getLogger('service') @@ -2083,25 +2082,6 @@ def __update_base_loggers(self): logger.addHandler(self.remote_handler) - def __update_connectors_and_converters_loggers(self): - for logger in logging.Logger.manager.loggerDict.values(): - if hasattr(logger, 'is_connector_logger') or hasattr(logger, 'is_converter_logger'): - file_handler_filter = list(filter(lambda handler: isinstance(handler, TimedRotatingFileHandler), - logger.handlers)) - if len(file_handler_filter): - old_file_handler = file_handler_filter[0] - - new_file_handler = None - if logger.is_connector_logger: - new_file_handler = TimedRotatingFileHandler.get_connector_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa - - if logger.is_converter_logger: - new_file_handler = TimedRotatingFileHandler.get_converter_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa - - if new_file_handler: - logger.addHandler(new_file_handler) - logger.removeHandler(old_file_handler) - def is_latency_metrics_enabled(self): return self.__latency_debug_mode diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 219faf50..49ea448d 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -421,6 +421,7 @@ def _handle_grpc_configuration_update(self, config): def _handle_logs_configuration_update(self, config): self.__log.debug('Processing logs configuration update...') try: + self.__check_file_handlers_class_name(config) self.__log = getLogger('service') logs_conf_file_path = self._gateway.get_config_path() + 'logs.json' target_handlers = {} @@ -994,3 +995,9 @@ def __is_username_password_match(old_security, new_security): def __is_ca_cert_match(old_security, new_security): return new_security.get('accessToken') == old_security.get('accessToken') and \ new_security.get('caCert') == old_security.get('caCert') + + @staticmethod + def __check_file_handlers_class_name(config): + for handler_config in config.get('handlers', {}).values(): + if handler_config.get('class', '') == 'thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler': + handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 53ed45a7..07a0ffe3 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -236,5 +236,25 @@ def send_errors_if_needed(cls, gateway): def is_main_module_logger(name, attr_name, is_converter_logger): return name == attr_name or attr_name is None or (name != attr_name and is_converter_logger) + @staticmethod + def update_file_handlers(): + for logger in logging.Logger.manager.loggerDict.values(): + if hasattr(logger, 'is_connector_logger') or hasattr(logger, 'is_converter_logger'): + file_handler_filter = list(filter(lambda handler: isinstance(handler, TimedRotatingFileHandler), + logger.handlers)) + if len(file_handler_filter): + old_file_handler = file_handler_filter[0] + + new_file_handler = None + if logger.is_connector_logger: + new_file_handler = TimedRotatingFileHandler.get_connector_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa + + if logger.is_converter_logger: + new_file_handler = TimedRotatingFileHandler.get_converter_file_handler(old_file_handler.baseFilename.split('/')[-1]) # noqa + + if new_file_handler: + logger.addHandler(new_file_handler) + logger.removeHandler(old_file_handler) + logging.setLoggerClass(TbLogger) From eeec07683fed5cb11dc76773fee6dbbd2dd10429 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 16 Dec 2024 11:03:13 +0200 Subject: [PATCH 07/10] Moved check_and_update_file_handlers_class_name method to TbLogger class --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 ++ .../tb_utility/tb_gateway_remote_configurator.py | 9 ++------- thingsboard_gateway/tb_utility/tb_logger.py | 6 ++++++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index ad62a513..ed6ee2e0 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -166,6 +166,8 @@ def __init__(self, config_file=None): try: with open(self._config_dir + 'logs.json', 'r') as file: log_config = load(file) + + TbLogger.check_and_update_file_handlers_class_name(log_config) logging.config.dictConfig(log_config) except Exception as e: logging_error = e diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 49ea448d..df78051f 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -35,6 +35,7 @@ if TYPE_CHECKING: from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService +from thingsboard_gateway.tb_utility.tb_logger import TbLogger from thingsboard_gateway.tb_utility.tb_utility import TBUtility @@ -421,7 +422,7 @@ def _handle_grpc_configuration_update(self, config): def _handle_logs_configuration_update(self, config): self.__log.debug('Processing logs configuration update...') try: - self.__check_file_handlers_class_name(config) + TbLogger.check_and_update_file_handlers_class_name(config) self.__log = getLogger('service') logs_conf_file_path = self._gateway.get_config_path() + 'logs.json' target_handlers = {} @@ -995,9 +996,3 @@ def __is_username_password_match(old_security, new_security): def __is_ca_cert_match(old_security, new_security): return new_security.get('accessToken') == old_security.get('accessToken') and \ new_security.get('caCert') == old_security.get('caCert') - - @staticmethod - def __check_file_handlers_class_name(config): - for handler_config in config.get('handlers', {}).values(): - if handler_config.get('class', '') == 'thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler': - handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 07a0ffe3..2b8f430e 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -256,5 +256,11 @@ def update_file_handlers(): logger.addHandler(new_file_handler) logger.removeHandler(old_file_handler) + @staticmethod + def check_and_update_file_handlers_class_name(config): + for handler_config in config.get('handlers', {}).values(): + if handler_config.get('class', '') == 'thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler': + handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' + logging.setLoggerClass(TbLogger) From dbc4f99beb1b6795273993cafc434e5efb5e54dc Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 16 Dec 2024 11:38:35 +0200 Subject: [PATCH 08/10] Fixed modbus connector --- thingsboard_gateway/connectors/modbus/modbus_connector.py | 8 ++++---- thingsboard_gateway/connectors/modbus/slave.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index a3dbe7a6..6fc513f8 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -86,10 +86,10 @@ def __init__(self, gateway, config, connector_type): config.get('logLevel', 'INFO'), enable_remote_logging=config.get('enableRemoteLogging', False), is_connector_logger=True) - self.__converter_log = init_logger(self.__gateway, self.name + '_converter', - config.get('logLevel', 'INFO'), - enable_remote_logging=config.get('enableRemoteLogging', False), - is_converter_logger=True, attr_name=self.name) + self.converter_log = init_logger(self.__gateway, self.name + '_converter', + config.get('logLevel', 'INFO'), + enable_remote_logging=config.get('enableRemoteLogging', False), + is_converter_logger=True, attr_name=self.name) self.__log.info('Starting Modbus Connector...') self.__id = self.__config.get('id') self.__connected = False diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index 6f87ce52..d7ab231a 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -130,9 +130,9 @@ def __load_converter(self, config, converter_type, converter_config: Union[Dict, converter = config[converter_type + CONVERTER_PARAMETER] else: if converter_type == DOWNLINK_PREFIX: - converter = BytesModbusDownlinkConverter(converter_config, self.connector.__converter_log) + converter = BytesModbusDownlinkConverter(converter_config, self.connector.converter_log) else: - converter = BytesModbusUplinkConverter(converter_config, self.connector.__converter_log) + converter = BytesModbusUplinkConverter(converter_config, self.connector.converter_log) return converter except Exception as e: From dddd74c07e3be3c57180b1aabcdc28157e0d9c54 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 16 Dec 2024 12:36:08 +0200 Subject: [PATCH 09/10] Fixed integration tests --- tests/integration/connectors/can/test_can_connector.py | 2 +- thingsboard_gateway/tb_utility/tb_rotating_file_handler.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/connectors/can/test_can_connector.py b/tests/integration/connectors/can/test_can_connector.py index 3e728cba..3e5242f0 100644 --- a/tests/integration/connectors/can/test_can_connector.py +++ b/tests/integration/connectors/can/test_can_connector.py @@ -32,7 +32,7 @@ from thingsboard_gateway.tb_utility.tb_handler import TBRemoteLoggerHandler from thingsboard_gateway.tb_utility.tb_logger import TbLogger, init_logger -try : +try: from can import Notifier, BufferedReader, Bus, Message except (ImportError, ModuleNotFoundError): from thingsboard_gateway.tb_utility.tb_utility import TBUtility diff --git a/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py b/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py index c5de7348..fa794956 100644 --- a/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py +++ b/thingsboard_gateway/tb_utility/tb_rotating_file_handler.py @@ -1,10 +1,11 @@ import os import logging +from logging.handlers import TimedRotatingFileHandler from os import environ from pathlib import Path -class TimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler): +class TimedRotatingFileHandler(TimedRotatingFileHandler): def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False): config_path = environ.get('TB_GW_LOGS_PATH') From d811346d21d54342e99f429c7f0405d80128e97d Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 17 Dec 2024 08:22:43 +0200 Subject: [PATCH 10/10] Reformatting --- thingsboard_gateway/gateway/constants.py | 8 +-- .../gateway/entities/attributes.py | 2 +- .../gateway/entities/converted_data.py | 25 +++++---- .../gateway/entities/datapoint_key.py | 1 - .../entities/report_strategy_config.py | 8 +-- .../report_strategy_data_cache.py | 21 +++++--- .../report_strategy_service.py | 52 ++++++++++++------- .../gateway/statistics/statistics_service.py | 12 +++-- thingsboard_gateway/gateway/tb_client.py | 33 ++++++++---- thingsboard_gateway/tb_gateway.py | 2 + .../tb_gateway_remote_configurator.py | 3 +- thingsboard_gateway/tb_utility/tb_loader.py | 3 +- thingsboard_gateway/tb_utility/tb_logger.py | 2 +- .../tb_utility/tb_remote_shell.py | 7 +-- thingsboard_gateway/tb_utility/tb_utility.py | 12 +++-- thingsboard_gateway/version.py | 2 +- 16 files changed, 121 insertions(+), 72 deletions(-) diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index 4f2ae190..1e52f192 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -76,8 +76,7 @@ "metadata": { RECEIVED_TS_PARAMETER: int(time() * 1000), CONNECTOR_PARAMETER: "", - "publishedTs": int(time() * 1000)}} - , separators=(',', ':'), skipkeys=True).encode("utf-8")) + "publishedTs": int(time() * 1000)}}, separators=(',', ':'), skipkeys=True).encode("utf-8")) # Report strategy parameters REPORT_STRATEGY_PARAMETER = "reportStrategy" @@ -85,6 +84,7 @@ TYPE_PARAMETER = "type" AGGREGATION_FUNCTION_PARAMETER = "aggregationFunction" + class ReportStrategy(Enum): ON_REPORT_PERIOD = "ON_REPORT_PERIOD" ON_CHANGE = "ON_CHANGE" @@ -98,6 +98,7 @@ def from_string(cls, value: str): return strategy raise ValueError("Invalid report strategy value: %r" % value) + DEFAULT_REPORT_STRATEGY_CONFIG = { TYPE_PARAMETER: ReportStrategy.ON_RECEIVED.value, REPORT_PERIOD_PARAMETER: 10000 @@ -111,4 +112,5 @@ def from_string(cls, value: str): DATA_PARAMETER = "data" # Attribute constants -ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", "grpc_configuration", "logs_configuration", "active_connectors"] +ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", + "grpc_configuration", "logs_configuration", "active_connectors"] diff --git a/thingsboard_gateway/gateway/entities/attributes.py b/thingsboard_gateway/gateway/entities/attributes.py index 73930902..4c59f056 100644 --- a/thingsboard_gateway/gateway/entities/attributes.py +++ b/thingsboard_gateway/gateway/entities/attributes.py @@ -18,7 +18,7 @@ class Attributes: - def __init__(self, values: Dict[DatapointKey, Any]=None): + def __init__(self, values: Dict[DatapointKey, Any] = None): self.values: Dict[DatapointKey, Any] = values or {} def __str__(self): diff --git a/thingsboard_gateway/gateway/entities/converted_data.py b/thingsboard_gateway/gateway/entities/converted_data.py index 6420c4da..f5b489fc 100644 --- a/thingsboard_gateway/gateway/entities/converted_data.py +++ b/thingsboard_gateway/gateway/entities/converted_data.py @@ -43,7 +43,7 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s split_chunk_sizes.append(current_size) ts_check = False # Start a new chunk - current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict + current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict current_size = entry_size else: # Add to current chunk @@ -59,7 +59,9 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s class ConvertedData: - __slots__ = ['device_name', 'device_type', 'telemetry', 'attributes', 'metadata', '_telemetry_datapoints_count', 'ts_index'] + __slots__ = ['device_name', 'device_type', 'telemetry', 'attributes', + 'metadata', '_telemetry_datapoints_count', 'ts_index'] + def __init__(self, device_name, device_type='default', metadata=None): self.device_name = device_name self.device_type = device_type @@ -118,7 +120,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr existing_values = self.telemetry[index].values new_keys = telemetry_entry.values.keys() - existing_values.keys() - new_values = {key_and_report_strategy: telemetry_entry.values[key_and_report_strategy] for key_and_report_strategy in new_keys} + new_values = {key_and_rs: telemetry_entry.values[key_and_rs] for key_and_rs in new_keys} self._telemetry_datapoints_count += len(new_values) existing_values.update(new_values) @@ -127,7 +129,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr self._telemetry_datapoints_count += len(telemetry_entry.values) self.ts_index[telemetry_entry.ts] = len(self.telemetry) - 1 - def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value = None): + def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value=None): if isinstance(key_or_entry, list): for entry in key_or_entry: if not isinstance(entry, dict): @@ -182,8 +184,8 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted current_data_size += attributes_bytes_size else: split_attributes_and_sizes = split_large_entries(attributes_dict, - max_data_size - current_data_size, - available_data_size) + max_data_size - current_data_size, + available_data_size) for data_chunk, chunk_size in split_attributes_and_sizes: if current_data_size + chunk_size >= max_data_size: converted_objects.append(current_data) @@ -196,17 +198,18 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted telemetry_values = telemetry_entry.values ts_data_size = TBUtility.get_data_size({"ts": telemetry_entry.ts}) + 1 - telemetry_obj_size = TBUtility.get_data_size({datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size + telemetry_obj_size = TBUtility.get_data_size( + {datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size if telemetry_obj_size <= max_data_size - current_data_size: current_data.add_to_telemetry(telemetry_entry) current_data_size += telemetry_obj_size else: split_telemetry_and_sizes = split_large_entries(telemetry_values, - max_data_size - current_data_size, - available_data_size, - telemetry_entry.ts, - ts_data_size) + max_data_size - current_data_size, + available_data_size, + telemetry_entry.ts, + ts_data_size) for telemetry_chunk, chunk_size in split_telemetry_and_sizes: if current_data_size + chunk_size > max_data_size: diff --git a/thingsboard_gateway/gateway/entities/datapoint_key.py b/thingsboard_gateway/gateway/entities/datapoint_key.py index cbf016aa..67f1c5f4 100644 --- a/thingsboard_gateway/gateway/entities/datapoint_key.py +++ b/thingsboard_gateway/gateway/entities/datapoint_key.py @@ -29,7 +29,6 @@ def __str__(self): def __repr__(self): return self.__str__() - def __hash__(self): return hash((self.key, self.report_strategy)) diff --git a/thingsboard_gateway/gateway/entities/report_strategy_config.py b/thingsboard_gateway/gateway/entities/report_strategy_config.py index 236c0dc9..b30db26e 100644 --- a/thingsboard_gateway/gateway/entities/report_strategy_config.py +++ b/thingsboard_gateway/gateway/entities/report_strategy_config.py @@ -35,7 +35,7 @@ def from_string(cls, value: str): class ReportStrategyConfig: - def __init__(self, config, default_report_strategy_config = {}): + def __init__(self, config, default_report_strategy_config={}): if isinstance(config, ReportStrategyConfig): self.report_period = config.report_period self.report_strategy = config.report_strategy @@ -49,7 +49,8 @@ def __init__(self, config, default_report_strategy_config = {}): and not default_report_strategy_config): raise ValueError("Report strategy config is not specified") if default_report_strategy_config: - self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1) + self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, + default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1) report_strategy_type = config.get(TYPE_PARAMETER, default_report_strategy_config[TYPE_PARAMETER]) else: self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, 0), 1) @@ -82,4 +83,5 @@ def __eq__(self, other): and self.aggregation_function == other.aggregation_function) def __str__(self): - return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy}, aggregation_function={self.aggregation_function})" \ No newline at end of file + return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy},\ + aggregation_function={self.aggregation_function})" diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py index 6f07a731..23615522 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from time import monotonic from thingsboard_gateway.gateway.constants import ReportStrategy from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey @@ -20,8 +19,9 @@ class ReportStrategyDataRecord: - __slots__ = ["_value", "_device_name", "_device_type","_connector_name", - "_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry","_ts"] + __slots__ = ["_value", "_device_name", "_device_type", "_connector_name", + "_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry", "_ts"] + def __init__(self, value, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry): self._value = value self._device_name = device_name @@ -60,7 +60,8 @@ def update_ts(self, ts): self._ts = ts def should_be_reported_by_period(self, current_time): - if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): + if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, + ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): return (self._last_report_time is None or current_time - self._last_report_time + 50 >= self._report_strategy.report_period) else: @@ -75,8 +76,15 @@ def __init__(self, config): self._config = config self._data_cache = {} - def put(self, datapoint_key: DatapointKey, data: str, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry): - self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry) + def put(self, datapoint_key: DatapointKey, data: str, device_name, + device_type, connector_name, connector_id, report_strategy, is_telemetry): + self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data, + device_name, + device_type, + connector_name, + connector_id, + report_strategy, + is_telemetry) def get(self, datapoint_key: DatapointKey, device_name, connector_id) -> ReportStrategyDataRecord: return self._data_cache.get((datapoint_key, device_name, connector_id)) @@ -97,4 +105,3 @@ def delete_all_records_for_connector_by_connector_id(self, connector_id): def clear(self): self._data_cache.clear() - diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py index a749a8cc..dab8c501 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py @@ -15,16 +15,18 @@ from queue import SimpleQueue from threading import Thread from time import monotonic, time -from typing import Dict, Set, Union +from typing import Dict, Set, Union, TYPE_CHECKING -from thingsboard_gateway.gateway.constants import DEFAULT_REPORT_STRATEGY_CONFIG, ReportStrategy, DEVICE_NAME_PARAMETER, \ - DEVICE_TYPE_PARAMETER, REPORT_STRATEGY_PARAMETER +from thingsboard_gateway.gateway.constants import DEFAULT_REPORT_STRATEGY_CONFIG, \ + ReportStrategy, DEVICE_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, REPORT_STRATEGY_PARAMETER from thingsboard_gateway.gateway.entities.converted_data import ConvertedData from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry from thingsboard_gateway.gateway.report_strategy.report_strategy_data_cache import ReportStrategyDataCache from thingsboard_gateway.tb_utility.tb_logger import TbLogger +if TYPE_CHECKING: + from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService class ReportStrategyService: @@ -37,14 +39,18 @@ def __init__(self, config: dict, gateway: 'TBGatewayService', send_data_queue: S self._report_strategy_data_cache = ReportStrategyDataCache(config) self._connectors_report_strategies: Dict[str, ReportStrategyConfig] = {} self.__keys_to_report_periodically: Set[DatapointKey, str, str] = set() - self.__periodical_reporting_thread = Thread(target=self.__periodical_reporting, daemon=True, name="Periodical Reporting Thread") + self.__periodical_reporting_thread = Thread(target=self.__periodical_reporting, + daemon=True, + name="Periodical Reporting Thread") self.__periodical_reporting_thread.start() def get_main_report_strategy(self): return self.main_report_strategy - def register_connector_report_strategy(self, connector_name: str, connector_id: str, report_strategy_config: ReportStrategyConfig): - if report_strategy_config is None: # Default report strategy will be used if no report strategy is provided + def register_connector_report_strategy(self, connector_name: str, + connector_id: str, + report_strategy_config: ReportStrategyConfig): + if report_strategy_config is None: # Default report strategy will be used if no report strategy is provided return self._connectors_report_strategies[connector_id] = report_strategy_config self._connectors_report_strategies[connector_name] = report_strategy_config @@ -53,8 +59,8 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: data_to_send = data if isinstance(data, dict): data_to_send = ConvertedData(device_name=data.get(DEVICE_NAME_PARAMETER), - device_type=data.get(DEVICE_TYPE_PARAMETER, "default"), - metadata=data.get("metadata")) + device_type=data.get(DEVICE_TYPE_PARAMETER, "default"), + metadata=data.get("metadata")) for ts_kv in data.get("telemetry", []): data_to_send.add_to_telemetry(ts_kv) data_to_send.add_to_attributes(data.get("attributes", {})) @@ -78,7 +84,7 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: report_strategy = report_strategy_config if isinstance(datapoint_key, str): - datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys + datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys # noqa if datapoint_key.report_strategy is not None: report_strategy = datapoint_key.report_strategy @@ -101,7 +107,7 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: report_strategy = report_strategy_config if isinstance(datapoint_key, str): - datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys + datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys # noqa if datapoint_key.report_strategy is not None: report_strategy = datapoint_key.report_strategy @@ -119,7 +125,9 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: if converted_data_to_send.telemetry or converted_data_to_send.attributes: self.__send_data_queue.put_nowait((connector_name, connector_id, converted_data_to_send)) - def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_name, device_type, connector_name, connector_id, report_strategy_config: ReportStrategyConfig, is_telemetry: bool): + def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_name, device_type, + connector_name, connector_id, report_strategy_config: ReportStrategyConfig, + is_telemetry: bool): if report_strategy_config is None: report_strategy_config = self.main_report_strategy.report_strategy if datapoint_key.report_strategy is not None: @@ -139,7 +147,8 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) else: - self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, connector_name, connector_id, report_strategy_config, is_telemetry) + self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, + connector_name, connector_id, report_strategy_config, is_telemetry) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -158,19 +167,22 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n return False elif report_strategy_config.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD: self._report_strategy_data_cache.update_key_value(datapoint_key, device_name, connector_id, data) - # self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) + # self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) # noqa if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True else: return False else: - self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, connector_name, connector_id, report_strategy_config, is_telemetry) - if report_strategy_config.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): + self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, + connector_name, connector_id, report_strategy_config, is_telemetry) + if report_strategy_config.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, + ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): if isinstance(datapoint_key, tuple): datapoint_key, _ = datapoint_key self.__keys_to_report_periodically.add((datapoint_key, device_name, connector_id)) - self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) + self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, + connector_id, current_time) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -211,7 +223,7 @@ def __periodical_reporting(self): data_entry = data_to_report[data_report_key] if report_strategy_data_record.is_telemetry(): - # data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts + # data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts # noqa current_ts = int(time() * 1000) data_entry.add_to_telemetry(TelemetryEntry({key: value}, current_ts)) report_strategy_data_record.update_ts(current_ts) @@ -231,7 +243,8 @@ def __periodical_reporting(self): check_report_strategy_end = int(time() * 1000) if check_report_strategy_end - check_report_strategy_start > 100: - self._logger.warning("The periodical reporting took too long: %d ms", check_report_strategy_end - check_report_strategy_start) + self._logger.warning("The periodical reporting took too long: %d ms", + check_report_strategy_end - check_report_strategy_start) self._logger.warning("The number of keys to report periodically: %d", len(keys_set)) self._logger.warning("The number of reported data: %d", reported_data_length) @@ -243,7 +256,8 @@ def __periodical_reporting(self): self._logger.exception("An error occurred in the Periodical Reporting Thread: %s", e) if occurred_errors > 1: self._logger.error( - "Too many errors occurred in the Periodical Reporting Thread for 5 seconds. Suppressing further error messages." + "Too many errors occurred in the Periodical Reporting Thread for 5 seconds.\ + Suppressing further error messages." ) previous_error_printed_time = current_monotonic occurred_errors = 0 diff --git a/thingsboard_gateway/gateway/statistics/statistics_service.py b/thingsboard_gateway/gateway/statistics/statistics_service.py index 202e22df..8896bb92 100644 --- a/thingsboard_gateway/gateway/statistics/statistics_service.py +++ b/thingsboard_gateway/gateway/statistics/statistics_service.py @@ -111,7 +111,6 @@ def clear_statistics(cls): cls.STATISTICS_STORAGE[key] = 0 cls.CONNECTOR_STATISTICS_STORAGE = {} - @staticmethod def count_connector_message(connector_name, stat_parameter_name, count=1): StatisticsService.add_count(connector_name, stat_parameter_name=stat_parameter_name, @@ -227,7 +226,8 @@ def run(self) -> None: cur_monotonic = int(monotonic()) next_service_poll = (self._last_service_poll + self._stats_send_period_in_seconds) - cur_monotonic - next_custom_command_poll = (self._last_custom_command_poll + self._custom_stats_send_period_in_seconds) - cur_monotonic + next_custom_command_poll = (self._last_custom_command_poll + + self._custom_stats_send_period_in_seconds) - cur_monotonic wait_time = max(0, min(next_service_poll, next_custom_command_poll)) @@ -235,15 +235,17 @@ def run(self) -> None: Event().wait(wait_time) cur_monotonic = int(monotonic()) - if cur_monotonic - self._last_service_poll >= self._stats_send_period_in_seconds or self._last_service_poll == 0: + if (cur_monotonic - self._last_service_poll >= self._stats_send_period_in_seconds or + self._last_service_poll == 0): self._last_service_poll = cur_monotonic self.__send_statistics() self.clear_statistics() - if cur_monotonic - self._last_custom_command_poll >= self._custom_stats_send_period_in_seconds or self._last_custom_command_poll == 0: + if (cur_monotonic - self._last_custom_command_poll >= self._custom_stats_send_period_in_seconds or + self._last_custom_command_poll == 0): self._last_custom_command_poll = cur_monotonic self.__send_custom_command_statistics() except Exception as e: self._log.error("Error in statistics thread: %s", e) - Event().wait(5) \ No newline at end of file + Event().wait(5) diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index c0d4dfd5..d01dc785 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -29,11 +29,13 @@ from thingsboard_gateway.tb_utility.tb_utility import TBUtility try: - from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC + from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, \ + GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC except ImportError: print("tb-mqtt-client library not found - installing...") TBUtility.install_package('tb-mqtt-client') - from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC + from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, \ + GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC import tb_device_mqtt tb_device_mqtt.DEFAULT_TIMEOUT = 3 @@ -182,9 +184,12 @@ def _create_mqtt_client(self, credentials): if 'rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters: rate_limits_config = {} if self.__config.get('rateLimits'): - rate_limits_config['rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config.get('rateLimits') == 'DEFAULT_TELEMETRY_RATE_LIMIT' else self.__config['rateLimits'] - if 'dp_rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters and self.__config.get('dpRateLimits'): - rate_limits_config['dp_rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config['dpRateLimits'] == 'DEFAULT_TELEMETRY_DP_RATE_LIMIT' else self.__config['dpRateLimits'] + rate_limits_config['rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config.get( + 'rateLimits') == 'DEFAULT_TELEMETRY_RATE_LIMIT' else self.__config['rateLimits'] + if ('dp_rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters and + self.__config.get('dpRateLimits')): + rate_limits_config['dp_rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config[ + 'dpRateLimits'] == 'DEFAULT_TELEMETRY_DP_RATE_LIMIT' else self.__config['dpRateLimits'] if rate_limits_config: self.client = TBGatewayMqttClient(self.__host, self.__port, self.__username, self.__password, self, @@ -217,7 +222,8 @@ def _create_mqtt_client(self, credentials): daemon=True) self._check_cert_thread.start() - cert_required = CERT_REQUIRED if self.__ca_cert and self.__cert else ssl.CERT_OPTIONAL if self.__cert else ssl.CERT_NONE + cert_required = CERT_REQUIRED if (self.__ca_cert and + self.__cert) else ssl.CERT_OPTIONAL if self.__cert else ssl.CERT_NONE # if self.__ca_cert is None: # self.__logger.info("CA certificate is not provided. Using system CA certificates.") @@ -294,10 +300,12 @@ def is_connected(self): def _on_connect(self, client, userdata, flags, result_code, *extra_params): self.__logger.debug('TB client %s connected to platform', str(client)) - if (isinstance(result_code, int) and result_code == 0) or (hasattr(result_code, 'value') and result_code.value == 0): + if ((isinstance(result_code, int) and result_code == 0) or + (hasattr(result_code, 'value') and result_code.value == 0)): self.__is_connected = True if self.__initial_connection_done: - self.client.rate_limits_received = True # Added to avoid stuck on reconnect, if rate limits reached, TODO: move to high priority. + self.client.rate_limits_received = True # Added to avoid stuck on reconnect, if rate limits reached, + # TODO: move to high priority. else: self.__initial_connection_done = True # pylint: disable=protected-access @@ -454,9 +462,12 @@ def is_subscribed_to_service_attributes(self): return GATEWAY_ATTRIBUTES_RESPONSE_TOPIC in self.client._gw_subscriptions.values() and GATEWAY_ATTRIBUTES_TOPIC in self.client._gw_subscriptions.values() # noqa pylint: disable=protected-access def update_client_rate_limits_with_previous(self, previous_limits): - self.client._devices_connected_through_gateway_telemetry_messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_telemetry_messages_rate_limit', {})) - self.client._devices_connected_through_gateway_telemetry_datapoints_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_telemetry_datapoints_rate_limit', {})) - self.client._devices_connected_through_gateway_messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_messages_rate_limit', {})) + self.client._devices_connected_through_gateway_telemetry_messages_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_telemetry_messages_rate_limit', {})) + self.client._devices_connected_through_gateway_telemetry_datapoints_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_telemetry_datapoints_rate_limit', {})) + self.client._devices_connected_through_gateway_messages_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_messages_rate_limit', {})) self.client._messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('messages_rate_limit', {})) # noqa pylint: disable=protected-access self.client._telemetry_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('telemetry_rate_limit', {})) # noqa pylint: disable=protected-access diff --git a/thingsboard_gateway/tb_gateway.py b/thingsboard_gateway/tb_gateway.py index 35d76827..ed8908a6 100644 --- a/thingsboard_gateway/tb_gateway.py +++ b/thingsboard_gateway/tb_gateway.py @@ -17,6 +17,7 @@ from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService from thingsboard_gateway.gateway.hot_reloader import HotReloader + def main(): if "logs" not in listdir(curdir): mkdir("logs") @@ -37,6 +38,7 @@ def daemon(): config_path = __get_config_path("/etc/thingsboard-gateway/config/".replace('/', path.sep)) TBGatewayService(config_path + "tb_gateway.json") + def __get_config_path(default_config_path): config_path = environ.get("TB_GW_CONFIG_DIR", default_config_path) if not config_path.endswith(path.sep): diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index e8b8a486..961dd27d 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -201,7 +201,8 @@ def send_current_configuration(self): def send_connector_current_configuration(self, connector_configuration: dict): config_to_send = {**connector_configuration, 'logLevel': connector_configuration.get('configurationJson', {}).get('logLevel', 'INFO'), - 'enableRemoteLogging': connector_configuration.get('configurationJson', {}).get('enableRemoteLogging', False), + 'enableRemoteLogging': + connector_configuration.get('configurationJson', {}).get('enableRemoteLogging', False), 'ts': int(time() * 1000) } if config_to_send.get('config') is not None: diff --git a/thingsboard_gateway/tb_utility/tb_loader.py b/thingsboard_gateway/tb_utility/tb_loader.py index 7e3cdef0..0858b840 100644 --- a/thingsboard_gateway/tb_utility/tb_loader.py +++ b/thingsboard_gateway/tb_utility/tb_loader.py @@ -59,7 +59,8 @@ def import_module(extension_type, module_name): for file in listdir(current_extension_path): if not file.startswith('__') and (file.endswith('.py') or file.endswith('.pyc')): try: - module_spec = spec_from_file_location(module_name, current_extension_path + path.sep + file) + module_spec = spec_from_file_location(module_name, + current_extension_path + path.sep + file) log.debug(module_spec) if module_spec is None: diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 2b8f430e..881315c4 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -260,7 +260,7 @@ def update_file_handlers(): def check_and_update_file_handlers_class_name(config): for handler_config in config.get('handlers', {}).values(): if handler_config.get('class', '') == 'thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler': - handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' + handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' # noqa logging.setLoggerClass(TbLogger) diff --git a/thingsboard_gateway/tb_utility/tb_remote_shell.py b/thingsboard_gateway/tb_utility/tb_remote_shell.py index 7ff4f94c..54b9e483 100644 --- a/thingsboard_gateway/tb_utility/tb_remote_shell.py +++ b/thingsboard_gateway/tb_utility/tb_remote_shell.py @@ -13,7 +13,6 @@ # limitations under the License. # -from logging import getLogger from os import chdir, getcwd from subprocess import PIPE, Popen, STDOUT, TimeoutExpired @@ -46,7 +45,8 @@ def send_command(self, *args): chdir(cwd) if command.split(): if self.command_in_progress is not None: - self.__logger.debug("Received a new command: \"%s\", during old command is running, terminating old command...", command) + self.__logger.debug("Received a new command: \"%s\", during old command is running,\ + terminating old command...", command) old_command = self.command_in_progress.args self.terminate_command() self.__logger.debug("Old command: \"%s\" terminated.", old_command) @@ -57,7 +57,8 @@ def send_command(self, *args): self.command_in_progress = "cd" else: self.__logger.debug("Run command in remote shell: %s", command) - self.command_in_progress = Popen(command, shell=True, stdout=PIPE, stdin=PIPE, stderr=STDOUT, universal_newlines=True) + self.command_in_progress = Popen(command, shell=True, stdout=PIPE, + stdin=PIPE, stderr=STDOUT, universal_newlines=True) result.update({"ok": True}) return result diff --git a/thingsboard_gateway/tb_utility/tb_utility.py b/thingsboard_gateway/tb_utility/tb_utility.py index 5f7f79ad..57baa390 100644 --- a/thingsboard_gateway/tb_utility/tb_utility.py +++ b/thingsboard_gateway/tb_utility/tb_utility.py @@ -17,7 +17,7 @@ from os import environ from platform import system as platform_system from re import search, findall -from typing import Union +from typing import Union, TYPE_CHECKING from uuid import uuid4 from cryptography import x509 @@ -33,6 +33,9 @@ from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.tb_utility.tb_logger import TbLogger +if TYPE_CHECKING: + from thingsboard_gateway.gateway.entities.converted_data import ConvertedData + setLoggerClass(TbLogger) log = getLogger("service") @@ -121,13 +124,15 @@ def get_value(expression, body=None, value_type="string", get_tag=False, express try: if isinstance(body, dict) and target_str.split()[0] in body: if value_type.lower() == "string": - full_value = str(expression[0: max(p1 - 2, 0)]) + str(body[target_str.split()[0]]) + str(expression[p2 + 1:len(expression)]) + full_value = (str(expression[0: max(p1 - 2, 0)]) + + str(body[target_str.split()[0]]) + + str(expression[p2 + 1:len(expression)])) else: full_value = body.get(target_str.split()[0]) elif isinstance(body, (dict, list)): try: if " " in target_str: - target_str = '.'.join('"' + section_key + '"' if " " in section_key else section_key for section_key in target_str.split('.')) + target_str = '.'.join('"' + section_key + '"' if " " in section_key else section_key for section_key in target_str.split('.')) # noqa jsonpath_expression = parse(target_str) jsonpath_match = jsonpath_expression.find(body) if jsonpath_match: @@ -386,4 +391,3 @@ def get_service_environmental_variables(): converted_env_variables[key] = value return converted_env_variables - diff --git a/thingsboard_gateway/version.py b/thingsboard_gateway/version.py index 1413560c..aebff413 100644 --- a/thingsboard_gateway/version.py +++ b/thingsboard_gateway/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -VERSION = "3.6.2" \ No newline at end of file +VERSION = "3.6.2"