diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index 4f2ae190..1e52f192 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -76,8 +76,7 @@ "metadata": { RECEIVED_TS_PARAMETER: int(time() * 1000), CONNECTOR_PARAMETER: "", - "publishedTs": int(time() * 1000)}} - , separators=(',', ':'), skipkeys=True).encode("utf-8")) + "publishedTs": int(time() * 1000)}}, separators=(',', ':'), skipkeys=True).encode("utf-8")) # Report strategy parameters REPORT_STRATEGY_PARAMETER = "reportStrategy" @@ -85,6 +84,7 @@ TYPE_PARAMETER = "type" AGGREGATION_FUNCTION_PARAMETER = "aggregationFunction" + class ReportStrategy(Enum): ON_REPORT_PERIOD = "ON_REPORT_PERIOD" ON_CHANGE = "ON_CHANGE" @@ -98,6 +98,7 @@ def from_string(cls, value: str): return strategy raise ValueError("Invalid report strategy value: %r" % value) + DEFAULT_REPORT_STRATEGY_CONFIG = { TYPE_PARAMETER: ReportStrategy.ON_RECEIVED.value, REPORT_PERIOD_PARAMETER: 10000 @@ -111,4 +112,5 @@ def from_string(cls, value: str): DATA_PARAMETER = "data" # Attribute constants -ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", "grpc_configuration", "logs_configuration", "active_connectors"] +ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", + "grpc_configuration", "logs_configuration", "active_connectors"] diff --git a/thingsboard_gateway/gateway/entities/attributes.py b/thingsboard_gateway/gateway/entities/attributes.py index 73930902..4c59f056 100644 --- a/thingsboard_gateway/gateway/entities/attributes.py +++ b/thingsboard_gateway/gateway/entities/attributes.py @@ -18,7 +18,7 @@ class Attributes: - def __init__(self, values: Dict[DatapointKey, Any]=None): + def __init__(self, values: Dict[DatapointKey, Any] = None): self.values: Dict[DatapointKey, Any] = values or {} def __str__(self): diff --git a/thingsboard_gateway/gateway/entities/converted_data.py b/thingsboard_gateway/gateway/entities/converted_data.py index 6420c4da..f5b489fc 100644 --- a/thingsboard_gateway/gateway/entities/converted_data.py +++ b/thingsboard_gateway/gateway/entities/converted_data.py @@ -43,7 +43,7 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s split_chunk_sizes.append(current_size) ts_check = False # Start a new chunk - current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict + current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict current_size = entry_size else: # Add to current chunk @@ -59,7 +59,9 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s class ConvertedData: - __slots__ = ['device_name', 'device_type', 'telemetry', 'attributes', 'metadata', '_telemetry_datapoints_count', 'ts_index'] + __slots__ = ['device_name', 'device_type', 'telemetry', 'attributes', + 'metadata', '_telemetry_datapoints_count', 'ts_index'] + def __init__(self, device_name, device_type='default', metadata=None): self.device_name = device_name self.device_type = device_type @@ -118,7 +120,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr existing_values = self.telemetry[index].values new_keys = telemetry_entry.values.keys() - existing_values.keys() - new_values = {key_and_report_strategy: telemetry_entry.values[key_and_report_strategy] for key_and_report_strategy in new_keys} + new_values = {key_and_rs: telemetry_entry.values[key_and_rs] for key_and_rs in new_keys} self._telemetry_datapoints_count += len(new_values) existing_values.update(new_values) @@ -127,7 +129,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr self._telemetry_datapoints_count += len(telemetry_entry.values) self.ts_index[telemetry_entry.ts] = len(self.telemetry) - 1 - def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value = None): + def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value=None): if isinstance(key_or_entry, list): for entry in key_or_entry: if not isinstance(entry, dict): @@ -182,8 +184,8 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted current_data_size += attributes_bytes_size else: split_attributes_and_sizes = split_large_entries(attributes_dict, - max_data_size - current_data_size, - available_data_size) + max_data_size - current_data_size, + available_data_size) for data_chunk, chunk_size in split_attributes_and_sizes: if current_data_size + chunk_size >= max_data_size: converted_objects.append(current_data) @@ -196,17 +198,18 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted telemetry_values = telemetry_entry.values ts_data_size = TBUtility.get_data_size({"ts": telemetry_entry.ts}) + 1 - telemetry_obj_size = TBUtility.get_data_size({datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size + telemetry_obj_size = TBUtility.get_data_size( + {datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size if telemetry_obj_size <= max_data_size - current_data_size: current_data.add_to_telemetry(telemetry_entry) current_data_size += telemetry_obj_size else: split_telemetry_and_sizes = split_large_entries(telemetry_values, - max_data_size - current_data_size, - available_data_size, - telemetry_entry.ts, - ts_data_size) + max_data_size - current_data_size, + available_data_size, + telemetry_entry.ts, + ts_data_size) for telemetry_chunk, chunk_size in split_telemetry_and_sizes: if current_data_size + chunk_size > max_data_size: diff --git a/thingsboard_gateway/gateway/entities/datapoint_key.py b/thingsboard_gateway/gateway/entities/datapoint_key.py index cbf016aa..67f1c5f4 100644 --- a/thingsboard_gateway/gateway/entities/datapoint_key.py +++ b/thingsboard_gateway/gateway/entities/datapoint_key.py @@ -29,7 +29,6 @@ def __str__(self): def __repr__(self): return self.__str__() - def __hash__(self): return hash((self.key, self.report_strategy)) diff --git a/thingsboard_gateway/gateway/entities/report_strategy_config.py b/thingsboard_gateway/gateway/entities/report_strategy_config.py index 236c0dc9..b30db26e 100644 --- a/thingsboard_gateway/gateway/entities/report_strategy_config.py +++ b/thingsboard_gateway/gateway/entities/report_strategy_config.py @@ -35,7 +35,7 @@ def from_string(cls, value: str): class ReportStrategyConfig: - def __init__(self, config, default_report_strategy_config = {}): + def __init__(self, config, default_report_strategy_config={}): if isinstance(config, ReportStrategyConfig): self.report_period = config.report_period self.report_strategy = config.report_strategy @@ -49,7 +49,8 @@ def __init__(self, config, default_report_strategy_config = {}): and not default_report_strategy_config): raise ValueError("Report strategy config is not specified") if default_report_strategy_config: - self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1) + self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, + default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1) report_strategy_type = config.get(TYPE_PARAMETER, default_report_strategy_config[TYPE_PARAMETER]) else: self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, 0), 1) @@ -82,4 +83,5 @@ def __eq__(self, other): and self.aggregation_function == other.aggregation_function) def __str__(self): - return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy}, aggregation_function={self.aggregation_function})" \ No newline at end of file + return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy},\ + aggregation_function={self.aggregation_function})" diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py index 6f07a731..23615522 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from time import monotonic from thingsboard_gateway.gateway.constants import ReportStrategy from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey @@ -20,8 +19,9 @@ class ReportStrategyDataRecord: - __slots__ = ["_value", "_device_name", "_device_type","_connector_name", - "_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry","_ts"] + __slots__ = ["_value", "_device_name", "_device_type", "_connector_name", + "_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry", "_ts"] + def __init__(self, value, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry): self._value = value self._device_name = device_name @@ -60,7 +60,8 @@ def update_ts(self, ts): self._ts = ts def should_be_reported_by_period(self, current_time): - if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): + if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, + ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): return (self._last_report_time is None or current_time - self._last_report_time + 50 >= self._report_strategy.report_period) else: @@ -75,8 +76,15 @@ def __init__(self, config): self._config = config self._data_cache = {} - def put(self, datapoint_key: DatapointKey, data: str, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry): - self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry) + def put(self, datapoint_key: DatapointKey, data: str, device_name, + device_type, connector_name, connector_id, report_strategy, is_telemetry): + self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data, + device_name, + device_type, + connector_name, + connector_id, + report_strategy, + is_telemetry) def get(self, datapoint_key: DatapointKey, device_name, connector_id) -> ReportStrategyDataRecord: return self._data_cache.get((datapoint_key, device_name, connector_id)) @@ -97,4 +105,3 @@ def delete_all_records_for_connector_by_connector_id(self, connector_id): def clear(self): self._data_cache.clear() - diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py index a749a8cc..dab8c501 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py @@ -15,16 +15,18 @@ from queue import SimpleQueue from threading import Thread from time import monotonic, time -from typing import Dict, Set, Union +from typing import Dict, Set, Union, TYPE_CHECKING -from thingsboard_gateway.gateway.constants import DEFAULT_REPORT_STRATEGY_CONFIG, ReportStrategy, DEVICE_NAME_PARAMETER, \ - DEVICE_TYPE_PARAMETER, REPORT_STRATEGY_PARAMETER +from thingsboard_gateway.gateway.constants import DEFAULT_REPORT_STRATEGY_CONFIG, \ + ReportStrategy, DEVICE_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, REPORT_STRATEGY_PARAMETER from thingsboard_gateway.gateway.entities.converted_data import ConvertedData from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry from thingsboard_gateway.gateway.report_strategy.report_strategy_data_cache import ReportStrategyDataCache from thingsboard_gateway.tb_utility.tb_logger import TbLogger +if TYPE_CHECKING: + from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService class ReportStrategyService: @@ -37,14 +39,18 @@ def __init__(self, config: dict, gateway: 'TBGatewayService', send_data_queue: S self._report_strategy_data_cache = ReportStrategyDataCache(config) self._connectors_report_strategies: Dict[str, ReportStrategyConfig] = {} self.__keys_to_report_periodically: Set[DatapointKey, str, str] = set() - self.__periodical_reporting_thread = Thread(target=self.__periodical_reporting, daemon=True, name="Periodical Reporting Thread") + self.__periodical_reporting_thread = Thread(target=self.__periodical_reporting, + daemon=True, + name="Periodical Reporting Thread") self.__periodical_reporting_thread.start() def get_main_report_strategy(self): return self.main_report_strategy - def register_connector_report_strategy(self, connector_name: str, connector_id: str, report_strategy_config: ReportStrategyConfig): - if report_strategy_config is None: # Default report strategy will be used if no report strategy is provided + def register_connector_report_strategy(self, connector_name: str, + connector_id: str, + report_strategy_config: ReportStrategyConfig): + if report_strategy_config is None: # Default report strategy will be used if no report strategy is provided return self._connectors_report_strategies[connector_id] = report_strategy_config self._connectors_report_strategies[connector_name] = report_strategy_config @@ -53,8 +59,8 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: data_to_send = data if isinstance(data, dict): data_to_send = ConvertedData(device_name=data.get(DEVICE_NAME_PARAMETER), - device_type=data.get(DEVICE_TYPE_PARAMETER, "default"), - metadata=data.get("metadata")) + device_type=data.get(DEVICE_TYPE_PARAMETER, "default"), + metadata=data.get("metadata")) for ts_kv in data.get("telemetry", []): data_to_send.add_to_telemetry(ts_kv) data_to_send.add_to_attributes(data.get("attributes", {})) @@ -78,7 +84,7 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: report_strategy = report_strategy_config if isinstance(datapoint_key, str): - datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys + datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys # noqa if datapoint_key.report_strategy is not None: report_strategy = datapoint_key.report_strategy @@ -101,7 +107,7 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: report_strategy = report_strategy_config if isinstance(datapoint_key, str): - datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys + datapoint_key = DatapointKey(datapoint_key) # TODO: remove this DatapointKey creation after refactoring, added to avoid errors with old string keys # noqa if datapoint_key.report_strategy is not None: report_strategy = datapoint_key.report_strategy @@ -119,7 +125,9 @@ def filter_data_and_send(self, data: Union[ConvertedData, dict], connector_name: if converted_data_to_send.telemetry or converted_data_to_send.attributes: self.__send_data_queue.put_nowait((connector_name, connector_id, converted_data_to_send)) - def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_name, device_type, connector_name, connector_id, report_strategy_config: ReportStrategyConfig, is_telemetry: bool): + def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_name, device_type, + connector_name, connector_id, report_strategy_config: ReportStrategyConfig, + is_telemetry: bool): if report_strategy_config is None: report_strategy_config = self.main_report_strategy.report_strategy if datapoint_key.report_strategy is not None: @@ -139,7 +147,8 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) else: - self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, connector_name, connector_id, report_strategy_config, is_telemetry) + self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, + connector_name, connector_id, report_strategy_config, is_telemetry) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -158,19 +167,22 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n return False elif report_strategy_config.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD: self._report_strategy_data_cache.update_key_value(datapoint_key, device_name, connector_id, data) - # self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) + # self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) # noqa if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True else: return False else: - self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, connector_name, connector_id, report_strategy_config, is_telemetry) - if report_strategy_config.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): + self._report_strategy_data_cache.put(datapoint_key, data, device_name, device_type, + connector_name, connector_id, report_strategy_config, is_telemetry) + if report_strategy_config.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, + ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): if isinstance(datapoint_key, tuple): datapoint_key, _ = datapoint_key self.__keys_to_report_periodically.add((datapoint_key, device_name, connector_id)) - self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) + self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, + connector_id, current_time) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -211,7 +223,7 @@ def __periodical_reporting(self): data_entry = data_to_report[data_report_key] if report_strategy_data_record.is_telemetry(): - # data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts + # data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts # noqa current_ts = int(time() * 1000) data_entry.add_to_telemetry(TelemetryEntry({key: value}, current_ts)) report_strategy_data_record.update_ts(current_ts) @@ -231,7 +243,8 @@ def __periodical_reporting(self): check_report_strategy_end = int(time() * 1000) if check_report_strategy_end - check_report_strategy_start > 100: - self._logger.warning("The periodical reporting took too long: %d ms", check_report_strategy_end - check_report_strategy_start) + self._logger.warning("The periodical reporting took too long: %d ms", + check_report_strategy_end - check_report_strategy_start) self._logger.warning("The number of keys to report periodically: %d", len(keys_set)) self._logger.warning("The number of reported data: %d", reported_data_length) @@ -243,7 +256,8 @@ def __periodical_reporting(self): self._logger.exception("An error occurred in the Periodical Reporting Thread: %s", e) if occurred_errors > 1: self._logger.error( - "Too many errors occurred in the Periodical Reporting Thread for 5 seconds. Suppressing further error messages." + "Too many errors occurred in the Periodical Reporting Thread for 5 seconds.\ + Suppressing further error messages." ) previous_error_printed_time = current_monotonic occurred_errors = 0 diff --git a/thingsboard_gateway/gateway/statistics/statistics_service.py b/thingsboard_gateway/gateway/statistics/statistics_service.py index 202e22df..8896bb92 100644 --- a/thingsboard_gateway/gateway/statistics/statistics_service.py +++ b/thingsboard_gateway/gateway/statistics/statistics_service.py @@ -111,7 +111,6 @@ def clear_statistics(cls): cls.STATISTICS_STORAGE[key] = 0 cls.CONNECTOR_STATISTICS_STORAGE = {} - @staticmethod def count_connector_message(connector_name, stat_parameter_name, count=1): StatisticsService.add_count(connector_name, stat_parameter_name=stat_parameter_name, @@ -227,7 +226,8 @@ def run(self) -> None: cur_monotonic = int(monotonic()) next_service_poll = (self._last_service_poll + self._stats_send_period_in_seconds) - cur_monotonic - next_custom_command_poll = (self._last_custom_command_poll + self._custom_stats_send_period_in_seconds) - cur_monotonic + next_custom_command_poll = (self._last_custom_command_poll + + self._custom_stats_send_period_in_seconds) - cur_monotonic wait_time = max(0, min(next_service_poll, next_custom_command_poll)) @@ -235,15 +235,17 @@ def run(self) -> None: Event().wait(wait_time) cur_monotonic = int(monotonic()) - if cur_monotonic - self._last_service_poll >= self._stats_send_period_in_seconds or self._last_service_poll == 0: + if (cur_monotonic - self._last_service_poll >= self._stats_send_period_in_seconds or + self._last_service_poll == 0): self._last_service_poll = cur_monotonic self.__send_statistics() self.clear_statistics() - if cur_monotonic - self._last_custom_command_poll >= self._custom_stats_send_period_in_seconds or self._last_custom_command_poll == 0: + if (cur_monotonic - self._last_custom_command_poll >= self._custom_stats_send_period_in_seconds or + self._last_custom_command_poll == 0): self._last_custom_command_poll = cur_monotonic self.__send_custom_command_statistics() except Exception as e: self._log.error("Error in statistics thread: %s", e) - Event().wait(5) \ No newline at end of file + Event().wait(5) diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index c0d4dfd5..d01dc785 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -29,11 +29,13 @@ from thingsboard_gateway.tb_utility.tb_utility import TBUtility try: - from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC + from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, \ + GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC except ImportError: print("tb-mqtt-client library not found - installing...") TBUtility.install_package('tb-mqtt-client') - from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC + from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient, \ + GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, GATEWAY_ATTRIBUTES_TOPIC import tb_device_mqtt tb_device_mqtt.DEFAULT_TIMEOUT = 3 @@ -182,9 +184,12 @@ def _create_mqtt_client(self, credentials): if 'rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters: rate_limits_config = {} if self.__config.get('rateLimits'): - rate_limits_config['rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config.get('rateLimits') == 'DEFAULT_TELEMETRY_RATE_LIMIT' else self.__config['rateLimits'] - if 'dp_rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters and self.__config.get('dpRateLimits'): - rate_limits_config['dp_rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config['dpRateLimits'] == 'DEFAULT_TELEMETRY_DP_RATE_LIMIT' else self.__config['dpRateLimits'] + rate_limits_config['rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config.get( + 'rateLimits') == 'DEFAULT_TELEMETRY_RATE_LIMIT' else self.__config['rateLimits'] + if ('dp_rate_limit' in inspect.signature(TBGatewayMqttClient.__init__).parameters and + self.__config.get('dpRateLimits')): + rate_limits_config['dp_rate_limit'] = 'DEFAULT_RATE_LIMIT' if self.__config[ + 'dpRateLimits'] == 'DEFAULT_TELEMETRY_DP_RATE_LIMIT' else self.__config['dpRateLimits'] if rate_limits_config: self.client = TBGatewayMqttClient(self.__host, self.__port, self.__username, self.__password, self, @@ -217,7 +222,8 @@ def _create_mqtt_client(self, credentials): daemon=True) self._check_cert_thread.start() - cert_required = CERT_REQUIRED if self.__ca_cert and self.__cert else ssl.CERT_OPTIONAL if self.__cert else ssl.CERT_NONE + cert_required = CERT_REQUIRED if (self.__ca_cert and + self.__cert) else ssl.CERT_OPTIONAL if self.__cert else ssl.CERT_NONE # if self.__ca_cert is None: # self.__logger.info("CA certificate is not provided. Using system CA certificates.") @@ -294,10 +300,12 @@ def is_connected(self): def _on_connect(self, client, userdata, flags, result_code, *extra_params): self.__logger.debug('TB client %s connected to platform', str(client)) - if (isinstance(result_code, int) and result_code == 0) or (hasattr(result_code, 'value') and result_code.value == 0): + if ((isinstance(result_code, int) and result_code == 0) or + (hasattr(result_code, 'value') and result_code.value == 0)): self.__is_connected = True if self.__initial_connection_done: - self.client.rate_limits_received = True # Added to avoid stuck on reconnect, if rate limits reached, TODO: move to high priority. + self.client.rate_limits_received = True # Added to avoid stuck on reconnect, if rate limits reached, + # TODO: move to high priority. else: self.__initial_connection_done = True # pylint: disable=protected-access @@ -454,9 +462,12 @@ def is_subscribed_to_service_attributes(self): return GATEWAY_ATTRIBUTES_RESPONSE_TOPIC in self.client._gw_subscriptions.values() and GATEWAY_ATTRIBUTES_TOPIC in self.client._gw_subscriptions.values() # noqa pylint: disable=protected-access def update_client_rate_limits_with_previous(self, previous_limits): - self.client._devices_connected_through_gateway_telemetry_messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_telemetry_messages_rate_limit', {})) - self.client._devices_connected_through_gateway_telemetry_datapoints_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_telemetry_datapoints_rate_limit', {})) - self.client._devices_connected_through_gateway_messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('devices_connected_through_gateway_messages_rate_limit', {})) + self.client._devices_connected_through_gateway_telemetry_messages_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_telemetry_messages_rate_limit', {})) + self.client._devices_connected_through_gateway_telemetry_datapoints_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_telemetry_datapoints_rate_limit', {})) + self.client._devices_connected_through_gateway_messages_rate_limit = tb_device_mqtt.RateLimit( + previous_limits.get('devices_connected_through_gateway_messages_rate_limit', {})) self.client._messages_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('messages_rate_limit', {})) # noqa pylint: disable=protected-access self.client._telemetry_rate_limit = tb_device_mqtt.RateLimit(previous_limits.get('telemetry_rate_limit', {})) # noqa pylint: disable=protected-access diff --git a/thingsboard_gateway/tb_gateway.py b/thingsboard_gateway/tb_gateway.py index 35d76827..ed8908a6 100644 --- a/thingsboard_gateway/tb_gateway.py +++ b/thingsboard_gateway/tb_gateway.py @@ -17,6 +17,7 @@ from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService from thingsboard_gateway.gateway.hot_reloader import HotReloader + def main(): if "logs" not in listdir(curdir): mkdir("logs") @@ -37,6 +38,7 @@ def daemon(): config_path = __get_config_path("/etc/thingsboard-gateway/config/".replace('/', path.sep)) TBGatewayService(config_path + "tb_gateway.json") + def __get_config_path(default_config_path): config_path = environ.get("TB_GW_CONFIG_DIR", default_config_path) if not config_path.endswith(path.sep): diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index e8b8a486..961dd27d 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -201,7 +201,8 @@ def send_current_configuration(self): def send_connector_current_configuration(self, connector_configuration: dict): config_to_send = {**connector_configuration, 'logLevel': connector_configuration.get('configurationJson', {}).get('logLevel', 'INFO'), - 'enableRemoteLogging': connector_configuration.get('configurationJson', {}).get('enableRemoteLogging', False), + 'enableRemoteLogging': + connector_configuration.get('configurationJson', {}).get('enableRemoteLogging', False), 'ts': int(time() * 1000) } if config_to_send.get('config') is not None: diff --git a/thingsboard_gateway/tb_utility/tb_loader.py b/thingsboard_gateway/tb_utility/tb_loader.py index 7e3cdef0..0858b840 100644 --- a/thingsboard_gateway/tb_utility/tb_loader.py +++ b/thingsboard_gateway/tb_utility/tb_loader.py @@ -59,7 +59,8 @@ def import_module(extension_type, module_name): for file in listdir(current_extension_path): if not file.startswith('__') and (file.endswith('.py') or file.endswith('.pyc')): try: - module_spec = spec_from_file_location(module_name, current_extension_path + path.sep + file) + module_spec = spec_from_file_location(module_name, + current_extension_path + path.sep + file) log.debug(module_spec) if module_spec is None: diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 2b8f430e..881315c4 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -260,7 +260,7 @@ def update_file_handlers(): def check_and_update_file_handlers_class_name(config): for handler_config in config.get('handlers', {}).values(): if handler_config.get('class', '') == 'thingsboard_gateway.tb_utility.tb_handler.TimedRotatingFileHandler': - handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' + handler_config['class'] = 'thingsboard_gateway.tb_utility.tb_rotating_file_handler.TimedRotatingFileHandler' # noqa logging.setLoggerClass(TbLogger) diff --git a/thingsboard_gateway/tb_utility/tb_remote_shell.py b/thingsboard_gateway/tb_utility/tb_remote_shell.py index 7ff4f94c..54b9e483 100644 --- a/thingsboard_gateway/tb_utility/tb_remote_shell.py +++ b/thingsboard_gateway/tb_utility/tb_remote_shell.py @@ -13,7 +13,6 @@ # limitations under the License. # -from logging import getLogger from os import chdir, getcwd from subprocess import PIPE, Popen, STDOUT, TimeoutExpired @@ -46,7 +45,8 @@ def send_command(self, *args): chdir(cwd) if command.split(): if self.command_in_progress is not None: - self.__logger.debug("Received a new command: \"%s\", during old command is running, terminating old command...", command) + self.__logger.debug("Received a new command: \"%s\", during old command is running,\ + terminating old command...", command) old_command = self.command_in_progress.args self.terminate_command() self.__logger.debug("Old command: \"%s\" terminated.", old_command) @@ -57,7 +57,8 @@ def send_command(self, *args): self.command_in_progress = "cd" else: self.__logger.debug("Run command in remote shell: %s", command) - self.command_in_progress = Popen(command, shell=True, stdout=PIPE, stdin=PIPE, stderr=STDOUT, universal_newlines=True) + self.command_in_progress = Popen(command, shell=True, stdout=PIPE, + stdin=PIPE, stderr=STDOUT, universal_newlines=True) result.update({"ok": True}) return result diff --git a/thingsboard_gateway/tb_utility/tb_utility.py b/thingsboard_gateway/tb_utility/tb_utility.py index 5f7f79ad..57baa390 100644 --- a/thingsboard_gateway/tb_utility/tb_utility.py +++ b/thingsboard_gateway/tb_utility/tb_utility.py @@ -17,7 +17,7 @@ from os import environ from platform import system as platform_system from re import search, findall -from typing import Union +from typing import Union, TYPE_CHECKING from uuid import uuid4 from cryptography import x509 @@ -33,6 +33,9 @@ from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.tb_utility.tb_logger import TbLogger +if TYPE_CHECKING: + from thingsboard_gateway.gateway.entities.converted_data import ConvertedData + setLoggerClass(TbLogger) log = getLogger("service") @@ -121,13 +124,15 @@ def get_value(expression, body=None, value_type="string", get_tag=False, express try: if isinstance(body, dict) and target_str.split()[0] in body: if value_type.lower() == "string": - full_value = str(expression[0: max(p1 - 2, 0)]) + str(body[target_str.split()[0]]) + str(expression[p2 + 1:len(expression)]) + full_value = (str(expression[0: max(p1 - 2, 0)]) + + str(body[target_str.split()[0]]) + + str(expression[p2 + 1:len(expression)])) else: full_value = body.get(target_str.split()[0]) elif isinstance(body, (dict, list)): try: if " " in target_str: - target_str = '.'.join('"' + section_key + '"' if " " in section_key else section_key for section_key in target_str.split('.')) + target_str = '.'.join('"' + section_key + '"' if " " in section_key else section_key for section_key in target_str.split('.')) # noqa jsonpath_expression = parse(target_str) jsonpath_match = jsonpath_expression.find(body) if jsonpath_match: @@ -386,4 +391,3 @@ def get_service_environmental_variables(): converted_env_variables[key] = value return converted_env_variables - diff --git a/thingsboard_gateway/version.py b/thingsboard_gateway/version.py index 1413560c..aebff413 100644 --- a/thingsboard_gateway/version.py +++ b/thingsboard_gateway/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -VERSION = "3.6.2" \ No newline at end of file +VERSION = "3.6.2"