Skip to content

Commit

Permalink
Breaking: Remove helper functions (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcbanderson committed Dec 29, 2020
1 parent 26c2a8c commit 87a489a
Showing 1 changed file with 18 additions and 243 deletions.
261 changes: 18 additions & 243 deletions reflex_core/aws_rule_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import re
import traceback

import boto3

Expand Down Expand Up @@ -54,7 +55,7 @@ def __init__(self, event):
self.post_remediation_functions = []
self.notifiers = []

self.add_notifiers(SNSNotifier)
self.notifiers.append(SNSNotifier)

def get_boto3_client(self):
"""Instantiate and return a boto3 client.
Expand Down Expand Up @@ -216,7 +217,10 @@ def pre_compliance_check(self):
"Running pre-compliance check function %s",
pre_compliance_check_function.__name__,
)
pre_compliance_check_function()
try:
pre_compliance_check_function()
except Exception:
traceback.print_exc()

def post_compliance_check(self):
"""Runs all post-compliance check functions.
Expand All @@ -235,7 +239,10 @@ def post_compliance_check(self):
"Running post-remediation function %s",
post_compliance_check_function.__name__,
)
post_compliance_check_function()
try:
post_compliance_check_function()
except Exception:
traceback.print_exc()

def pre_remediation(self):
"""Runs all pre-remediation functions.
Expand All @@ -253,7 +260,10 @@ def pre_remediation(self):
self.LOGGER.debug(
"Running pre-remediation function %s", pre_remediation_function.__name__
)
pre_remediation_function()
try:
pre_remediation_function()
except Exception:
traceback.print_exc()

def post_remediation(self):
"""Runs all post-remediation functions.
Expand All @@ -272,7 +282,10 @@ def post_remediation(self):
"Running post-remediation function %s",
post_remediation_function.__name__,
)
post_remediation_function()
try:
post_remediation_function()
except Exception:
traceback.print_exc()

def _get_remediation_message(self):
"""Generates the message that will be sent in notifications.
Expand Down Expand Up @@ -323,185 +336,6 @@ def get_remediation_message_subject(self):
fixed_subject = " ".join(subject_split)
return f"The Reflex Rule {fixed_subject} was triggered."

def add_pre_compliance_check_functions(self, functions):
"""Sets a function or list of functions to be run before the resource
compliance check occurs.
If anything other than a function is present in the list, it will be ignored.
If something other than a function or list is passed, it will be ignored.
Returns:
None
"""
self._add_functions(
functions,
self.pre_compliance_check_functions,
"pre_compliance_check_functions",
)

def remove_pre_compliance_check_functions(self, functions):
"""Stop a function or list of functions from being run before the resource
compliance check occurs.
Takes a function or list of functions and removes them from the list
of pre-compliance check functions. Anything not in the list will be ignored.
Returns:
None
"""
self._remove_functions(
functions,
self.pre_compliance_check_functions,
"pre_compliance_check_functions",
)

def add_post_compliance_check_functions(self, functions):
"""Sets a function or list of functions to be run after the resource
compliance check occurs.
If anything other than a function is present in the list, it will be ignored.
If something other than a function or list is passed, it will be ignored.
Returns:
None
"""
self._add_functions(
functions,
self.post_compliance_check_functions,
"post_compliance_check_functions",
)

def remove_post_compliance_check_functions(self, functions):
"""Stop a function or list of functions from being run after the
resource compliance check occurs.
Takes a function or list of functions and removes them from the list
of post-compliance check functions. Anything not in the list will be ignored.
Returns:
None
"""
self._remove_functions(
functions,
self.post_compliance_check_functions,
"post_compliance_check_functions",
)

def add_pre_remediation_functions(self, functions):
"""Sets a function or list of functions to be run before remediation action occurs.
If anything other than a function is present in the list, it will be ignored.
If something other than a function or list is passed, it will be ignored.
Returns:
None
"""
self._add_functions(
functions, self.pre_remediation_functions, "pre_remediation_functions"
)

def remove_pre_remediation_functions(self, functions):
"""Stop a function or list of functions from being run pre-remediation.
Takes a function or list of functions and removes them from the list
of pre-remediation functions. Anything not in the list will be ignored.
Returns:
None
"""
self._remove_functions(
functions, self.pre_remediation_functions, "pre_remediation_functions"
)

def add_post_remediation_functions(self, functions):
"""Sets a function or list of functions to be run after remediation action occurs.
If anything other than a function is present in the list, it will be ignored.
If something other than a function or list is passed, it will be ignored.
Returns:
None
"""
self._add_functions(
functions, self.post_remediation_functions, "post_remediation_functions"
)

def remove_post_remediation_functions(self, functions):
"""Stop a function or list of functions from being run post-remediation.
Takes a function or list of functions and removes them from the list
of post-remediation functions. Anything not in the list will be ignored.
Returns:
None
"""
self._remove_functions(
functions, self.post_remediation_functions, "post_remediation_functions"
)

def add_notifiers(self, notifiers):
"""Sets a Notifier or list of Notifiers to send remediation notifications with.
If anything other than a Notifier is present in the list, it will be ignored.
If something other than a Notifier or list is passed, it will be ignored.
Returns:
None
"""
if isinstance(notifiers, list):
for notifier in notifiers:
if issubclass(notifier, Notifier):
self.LOGGER.debug(
"Adding %s to list of Notifiers", notifier.__name__
)
self.notifiers.append(notifier)
else:
self.LOGGER.warning(
"%s is not a Notifier. Not adding to list of Notifiers.",
notifier.__name__,
)
elif issubclass(notifiers, Notifier):
self.LOGGER.debug("Adding %s to list of Notifiers", notifiers.__name__)
self.notifiers.append(notifiers)
else:
self.LOGGER.warning(
"%s is not a Notifier or list. Not adding to list of Notifiers.",
notifiers.__name__,
)

def remove_notifiers(self, notifiers):
"""Stop a Notifier or list of Notifiers from sending remediation notifications.
Takes a Notifier or list of Notifiers and stops them from sending
remediation notifications. Anything not currently configured to send
notifictions will be ignored.
Returns:
None
"""
if isinstance(notifiers, list):
for notifier in notifiers:
try:
self.LOGGER.debug(
"Removing %s from list of Notifiers", notifier.__name__
)
self.notifiers.remove(notifier)
except ValueError:
self.LOGGER.warning(
"%s is not in the list of Notifiers. Skipping",
notifier.__name__,
)
else:
try:
self.LOGGER.debug(
"Removing %s from list of Notifiers", notifiers.__name__
)
self.notifiers.remove(notifiers)
except ValueError:
self.LOGGER.warning(
"%s is not in the list of Notifiers. Skipping", notifiers.__name__
)

def notify(self):
"""Send notification messages with all Notifiers.
Expand All @@ -527,62 +361,3 @@ def should_remediate(self):
"""
mode = os.environ.get("MODE", "detect").lower()
return mode == "remediate"

def _add_functions(self, functions, function_list, list_name):
"""Adds a function or list of functions to the provided function list.
If anything other than a function is present in the functions list, it will be ignored.
If something other than a function or list is passed, it will be ignored.
Returns:
None
"""
if isinstance(functions, list):
for function in functions:
if callable(function):
self.LOGGER.debug("Adding %s to %s", function.__name__, list_name)
function_list.append(function)
else:
self.LOGGER.warning(
"%s is not a function. Not adding to %s.",
function.__name__,
list_name,
)
elif callable(functions):
self.LOGGER.debug("Adding %s to %s", functions.__name__, list_name)
function_list.append(functions)
else:
self.LOGGER.warning(
"%s is not a function or list. Not adding to %s.",
functions.__name__,
list_name,
)

def _remove_functions(self, functions, function_list, list_name):
"""Remove a function or list of functions from the provided function list.
Takes a function or list of functions and removes them from function_list.
Anything not in function_list will be ignored.
Returns:
None
"""
if isinstance(functions, list):
for function in functions:
try:
self.LOGGER.debug(
"Removing %s from %s", function.__name__, list_name
)
function_list.remove(function)
except ValueError:
self.LOGGER.warning(
"%s is not in %s. Skipping", function.__name__, list_name,
)
else:
try:
self.LOGGER.debug("Removing %s from %s", functions.__name__, list_name)
function_list.remove(functions)
except ValueError:
self.LOGGER.warning(
"%s is not in %s. Skipping", functions.__name__, list_name,
)

0 comments on commit 87a489a

Please sign in to comment.