Skip to content

Commit

Permalink
fix: move retry into ipc_client.publish_to_iot_core
Browse files: browse the repository at this point in the history
  • Loading branch information
BolongZhang-AWS committed Aug 23, 2022
1 parent c9507e2 commit f4be642
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
43 changes: 23 additions & 20 deletions greengrass_defender_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,34 @@ def publish_metrics(ipc_client, config_changed, metrics_collector, sample_interval_seconds):

metric = metrics_collector.collect_metrics()
config.logger.debug("Publishing metrics: {}".format(metric.to_json_string()))
ipc_client.publish_to_iot_core(config.TOPIC, metric.to_json_string())

env_config = set_env_variables_config()
publish_retry = env_config[config.PUBLISH_RETRY_CONFIG_KEY]
need_retry = True
retry_time = config.INITIAL_RETRY_INTERVAL_SECONDS

while (need_retry and publish_retry >= 0):
try:
ipc_client.publish_to_iot_core(config.TOPIC, metric.to_json_string())
need_retry = False
except Exception as e:
if publish_retry < 1:
config.logger.error("Exhausted all retries when publishing to cloud")
raise e
handle_expcetion_and_sleep("metrics publish", e, retry_time, publish_retry)
if retry_time < config.MAX_RETRY_INTERVAL_SECONDS:
retry_time = retry_time * 2 + randint(0, config.MAX_JITTER_TIME_INTERVAL_SECONDS)
else:
retry_time = config.MAX_RETRY_INTERVAL_SECONDS
publish_retry -= 1

config.SCHEDULED_THREAD = Timer(
float(sample_interval_seconds), publish_metrics,
[ipc_client, config_changed, metrics_collector, sample_interval_seconds]
)
config.SCHEDULED_THREAD.start()

except Exception as e:
config.logger.error("Error collecting and publishing metrics: {}".format(e))
config.logger.exception("Error collecting and publishing metrics: {}".format(e))
raise e


Expand Down Expand Up @@ -196,23 +214,8 @@ def main():
metrics_collector = collector.Collector(short_metrics_names=False)

# Start collecting and publishing metrics
env_config = set_env_variables_config()
publish_retry = env_config[config.PUBLISH_RETRY_CONFIG_KEY]
need_retry = True
retry_time = config.INITIAL_RETRY_INTERVAL_SECONDS

while (need_retry and publish_retry >= 0):
try:
set_configuration_and_publish(ipc_client, configuration, metrics_collector)
need_retry = False
except Exception as e:
handle_expcetion_and_sleep("metrics publish", e, retry_time, publish_retry)
if retry_time < config.MAX_RETRY_INTERVAL_SECONDS:
retry_time = retry_time * 2 + randint(0, config.MAX_JITTER_TIME_INTERVAL_SECONDS)
else:
retry_time = config.MAX_RETRY_INTERVAL_SECONDS
publish_retry -= 1

set_configuration_and_publish(ipc_client, configuration, metrics_collector)

# Subscribe to the subsequent configuration changes
ipc_client.subscribe_to_config_updates()
Thread(
Expand Down
12 changes: 6 additions & 6 deletions greengrass_defender_agent/ipc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def publish_to_iot_core(self, topic, payload):
future.result(config.TIMEOUT)
config.logger.info("Published to the IoT core...")
except Exception as e:
config.logger.error("Exception occurred during publish to {}: {}".format(topic, e))
config.logger.exception("Exception occurred during publish to {}: {}".format(topic, e))
raise e

def subscribe_to_iot_core(self, topic):
Expand All @@ -86,7 +86,7 @@ def subscribe_to_iot_core(self, topic):
future.result(config.TIMEOUT)
config.logger.info("Subscribed to topic {}".format(topic))
except Exception as e:
config.logger.error("Exception occurred during subscribe: {}".format(e))
config.logger.exception("Exception occurred during subscribe: {}".format(e))
raise e

def get_configuration(self):
Expand All @@ -102,7 +102,7 @@ def get_configuration(self):
result = operation.get_response().result(config.TIMEOUT)
return result.value
except Exception as e:
config.logger.error(
config.logger.exception(
"Exception occurred during fetching the configuration: {}".format(e)
)
raise e
Expand All @@ -120,7 +120,7 @@ def subscribe_to_config_updates(self):
subscribe_operation.activate(subsreq).result(config.TIMEOUT)
subscribe_operation.get_response().result(config.TIMEOUT)
except Exception as e:
config.logger.error(
config.logger.exception(
"Exception occurred during subscribing to the configuration updates: {}".format(e)
)
raise e
Expand All @@ -139,7 +139,7 @@ def on_stream_event(self, event: ConfigurationUpdateEvents) -> None:
config.condition.notify()

def on_stream_error(self, error: Exception) -> bool:
config.logger.error("Error in config update subscriber - {0}".format(error))
config.logger.exception("Error in config update subscriber - {0}".format(error))
return False

def on_stream_closed(self) -> None:
Expand All @@ -162,7 +162,7 @@ def on_stream_event(self, event: IoTCoreMessage) -> None:
config.logger.debug("Received message from topic {}: {}".format(self.topic, received_message))

def on_stream_error(self, error: Exception) -> bool:
config.logger.error("Error in Iot Core subscriber - {0}".format(error))
config.logger.exception("Error in Iot Core subscriber - {0}".format(error))
return False

def on_stream_closed(self) -> None:
Expand Down

0 comments on commit f4be642

Please sign in to comment.