Skip to content

Commit

Permalink
Reformatting
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Dec 17, 2024
1 parent 0c9772f commit d811346
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 72 deletions.
8 changes: 5 additions & 3 deletions thingsboard_gateway/gateway/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@
"metadata": {
RECEIVED_TS_PARAMETER: int(time() * 1000),
CONNECTOR_PARAMETER: "",
"publishedTs": int(time() * 1000)}}
, separators=(',', ':'), skipkeys=True).encode("utf-8"))
"publishedTs": int(time() * 1000)}}, separators=(',', ':'), skipkeys=True).encode("utf-8"))

# Report strategy parameters
REPORT_STRATEGY_PARAMETER = "reportStrategy"
REPORT_PERIOD_PARAMETER = "reportPeriod"
TYPE_PARAMETER = "type"
AGGREGATION_FUNCTION_PARAMETER = "aggregationFunction"


class ReportStrategy(Enum):
ON_REPORT_PERIOD = "ON_REPORT_PERIOD"
ON_CHANGE = "ON_CHANGE"
Expand All @@ -98,6 +98,7 @@ def from_string(cls, value: str):
return strategy
raise ValueError("Invalid report strategy value: %r" % value)


DEFAULT_REPORT_STRATEGY_CONFIG = {
TYPE_PARAMETER: ReportStrategy.ON_RECEIVED.value,
REPORT_PERIOD_PARAMETER: 10000
Expand All @@ -111,4 +112,5 @@ def from_string(cls, value: str):
DATA_PARAMETER = "data"

# Attribute constants
ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", "grpc_configuration", "logs_configuration", "active_connectors"]
ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration",
"grpc_configuration", "logs_configuration", "active_connectors"]
2 changes: 1 addition & 1 deletion thingsboard_gateway/gateway/entities/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class Attributes:
def __init__(self, values: Dict[DatapointKey, Any]=None):
def __init__(self, values: Dict[DatapointKey, Any] = None):
self.values: Dict[DatapointKey, Any] = values or {}

def __str__(self):
Expand Down
25 changes: 14 additions & 11 deletions thingsboard_gateway/gateway/entities/converted_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s
split_chunk_sizes.append(current_size)
ts_check = False
# Start a new chunk
current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict
current_chunk = {original_key: value} # New dict is created to avoid modifying the original dict
current_size = entry_size
else:
# Add to current chunk
Expand All @@ -59,7 +59,9 @@ def split_large_entries(entries: dict, first_item_max_data_size: int, max_data_s


class ConvertedData:
__slots__ = ['device_name', 'device_type', 'telemetry', 'attributes', 'metadata', '_telemetry_datapoints_count', 'ts_index']
__slots__ = ['device_name', 'device_type', 'telemetry', 'attributes',
'metadata', '_telemetry_datapoints_count', 'ts_index']

def __init__(self, device_name, device_type='default', metadata=None):
self.device_name = device_name
self.device_type = device_type
Expand Down Expand Up @@ -118,7 +120,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr
existing_values = self.telemetry[index].values

new_keys = telemetry_entry.values.keys() - existing_values.keys()
new_values = {key_and_report_strategy: telemetry_entry.values[key_and_report_strategy] for key_and_report_strategy in new_keys}
new_values = {key_and_rs: telemetry_entry.values[key_and_rs] for key_and_rs in new_keys}

self._telemetry_datapoints_count += len(new_values)
existing_values.update(new_values)
Expand All @@ -127,7 +129,7 @@ def _add_single_telemetry_entry(self, telemetry_entry: Union[dict, TelemetryEntr
self._telemetry_datapoints_count += len(telemetry_entry.values)
self.ts_index[telemetry_entry.ts] = len(self.telemetry) - 1

def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value = None):
def add_to_attributes(self, key_or_entry: Union[dict, str, List[dict], DatapointKey], value=None):
if isinstance(key_or_entry, list):
for entry in key_or_entry:
if not isinstance(entry, dict):
Expand Down Expand Up @@ -182,8 +184,8 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted
current_data_size += attributes_bytes_size
else:
split_attributes_and_sizes = split_large_entries(attributes_dict,
max_data_size - current_data_size,
available_data_size)
max_data_size - current_data_size,
available_data_size)
for data_chunk, chunk_size in split_attributes_and_sizes:
if current_data_size + chunk_size >= max_data_size:
converted_objects.append(current_data)
Expand All @@ -196,17 +198,18 @@ def convert_to_objects_with_maximal_size(self, max_data_size) -> List['Converted
telemetry_values = telemetry_entry.values
ts_data_size = TBUtility.get_data_size({"ts": telemetry_entry.ts}) + 1

telemetry_obj_size = TBUtility.get_data_size({datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size
telemetry_obj_size = TBUtility.get_data_size(
{datapoint_key.key: value for datapoint_key, value in telemetry_values.items()}) + ts_data_size

if telemetry_obj_size <= max_data_size - current_data_size:
current_data.add_to_telemetry(telemetry_entry)
current_data_size += telemetry_obj_size
else:
split_telemetry_and_sizes = split_large_entries(telemetry_values,
max_data_size - current_data_size,
available_data_size,
telemetry_entry.ts,
ts_data_size)
max_data_size - current_data_size,
available_data_size,
telemetry_entry.ts,
ts_data_size)
for telemetry_chunk, chunk_size in split_telemetry_and_sizes:

if current_data_size + chunk_size > max_data_size:
Expand Down
1 change: 0 additions & 1 deletion thingsboard_gateway/gateway/entities/datapoint_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def __str__(self):
def __repr__(self):
return self.__str__()


def __hash__(self):
return hash((self.key, self.report_strategy))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def from_string(cls, value: str):


class ReportStrategyConfig:
def __init__(self, config, default_report_strategy_config = {}):
def __init__(self, config, default_report_strategy_config={}):
if isinstance(config, ReportStrategyConfig):
self.report_period = config.report_period
self.report_strategy = config.report_strategy
Expand All @@ -49,7 +49,8 @@ def __init__(self, config, default_report_strategy_config = {}):
and not default_report_strategy_config):
raise ValueError("Report strategy config is not specified")
if default_report_strategy_config:
self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1)
self.report_period = max(config.get(REPORT_PERIOD_PARAMETER,
default_report_strategy_config[REPORT_PERIOD_PARAMETER]) - 10, 1)
report_strategy_type = config.get(TYPE_PARAMETER, default_report_strategy_config[TYPE_PARAMETER])
else:
self.report_period = max(config.get(REPORT_PERIOD_PARAMETER, 0), 1)
Expand Down Expand Up @@ -82,4 +83,5 @@ def __eq__(self, other):
and self.aggregation_function == other.aggregation_function)

def __str__(self):
return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy}, aggregation_function={self.aggregation_function})"
return f"ReportStrategyConfig(report_period={self.report_period}, report_strategy={self.report_strategy},\
aggregation_function={self.aggregation_function})"
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import monotonic

from thingsboard_gateway.gateway.constants import ReportStrategy
from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey
from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig


class ReportStrategyDataRecord:
__slots__ = ["_value", "_device_name", "_device_type","_connector_name",
"_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry","_ts"]
__slots__ = ["_value", "_device_name", "_device_type", "_connector_name",
"_connector_id", "_report_strategy", "_last_report_time", "_is_telemetry", "_ts"]

def __init__(self, value, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry):
self._value = value
self._device_name = device_name
Expand Down Expand Up @@ -60,7 +60,8 @@ def update_ts(self, ts):
self._ts = ts

def should_be_reported_by_period(self, current_time):
if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD):
if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD,
ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD):
return (self._last_report_time is None
or current_time - self._last_report_time + 50 >= self._report_strategy.report_period)
else:
Expand All @@ -75,8 +76,15 @@ def __init__(self, config):
self._config = config
self._data_cache = {}

def put(self, datapoint_key: DatapointKey, data: str, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry):
self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data, device_name, device_type, connector_name, connector_id, report_strategy, is_telemetry)
def put(self, datapoint_key: DatapointKey, data: str, device_name,
device_type, connector_name, connector_id, report_strategy, is_telemetry):
self._data_cache[(datapoint_key, device_name, connector_id)] = ReportStrategyDataRecord(data,
device_name,
device_type,
connector_name,
connector_id,
report_strategy,
is_telemetry)

def get(self, datapoint_key: DatapointKey, device_name, connector_id) -> ReportStrategyDataRecord:
return self._data_cache.get((datapoint_key, device_name, connector_id))
Expand All @@ -97,4 +105,3 @@ def delete_all_records_for_connector_by_connector_id(self, connector_id):

def clear(self):
self._data_cache.clear()

Loading

0 comments on commit d811346

Please sign in to comment.