Skip to content

Commit

Permalink
Support log group tagging (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulsuh committed Jun 20, 2024
1 parent f23ef0c commit ffbc180
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
9 changes: 9 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ role. While AWS provides no generic managed CloudWatch Logs writer policy, we re
``arn:aws:iam::aws:policy/AWSOpsWorksCloudWatchLogs`` managed policy, which has just the right permissions without being
overly broad.

Log Group Tagging
~~~~~~~~~~~~~~~~~
Watchtower supports the tagging of log groups. This can be done by adding the parameter ``log_group_tags`` to the
``CloudWatchLogHandler`` constructor. This parameter should be a dictionary of tags to apply to the log group.

If you want to add tags to the log group you will need to add permission for the ``logs:TagResource`` action to your
policy. This will need to be in addition to the AWSOpsWorksCloudWatchLogs policy. (Note: the older ``logs:TagLogGroup``
permission is for the ``tag_log_group()`` call which is on the path to deprecation and is not used by Watchtower.)

Example: Flask logging with Watchtower
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
18 changes: 18 additions & 0 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,24 @@ def test_unicode_logging(self):

self.assertEqual(submit_batch.call_count, 2)

def test_log_group_tagging(self):

tags = {"tag1": "value1", "tag2": "value2"}
handler = CloudWatchLogHandler(log_group_name="test_log_group_tagging",
log_group_tags=tags)
logger = logging.getLogger("test_log_group_tagging")
logger.addHandler(handler)
logger.propagate = False
log_group_arn = handler._get_log_group_arn()

logs_client = boto3.client("logs")

logger.critical("This should create a log group with tags")

tag_list_result = logs_client.list_tags_for_resource(resourceArn=log_group_arn)

self.assertDictEqual(tag_list_result["tags"], tags)


if __name__ == "__main__":
unittest.main()
25 changes: 25 additions & 0 deletions watchtower/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class CloudWatchLogHandler(logging.Handler):
Create CloudWatch Logs log group if it does not exist. **True** by default.
:param log_group_retention_days:
Sets the retention policy of the log group in days. **None** by default.
:param log_group_tags:
Tag the log group with the specified tags and values. There is no provision for removing tags. **{}** by default.
:param create_log_stream:
Create CloudWatch Logs log stream if it does not exist. **True** by default.
:param json_serialize_default:
Expand Down Expand Up @@ -211,6 +213,7 @@ def __init__(
boto3_client: botocore.client.BaseClient = None,
boto3_profile_name: Optional[str] = None,
create_log_group: bool = True,
log_group_tags: dict[str, str] = {},
json_serialize_default: Optional[Callable] = None,
log_group_retention_days: Optional[int] = None,
create_log_stream: bool = True,
Expand Down Expand Up @@ -261,6 +264,9 @@ def __init__(
if create_log_group:
self._ensure_log_group()

if len(log_group_tags) > 0:
self._tag_log_group(log_group_tags)

if log_group_retention_days:
self._idempotent_call(
"put_retention_policy", logGroupName=self.log_group_name, retentionInDays=self.log_group_retention_days
Expand Down Expand Up @@ -294,6 +300,25 @@ def _ensure_log_group(self):
pass
self._idempotent_call("create_log_group", logGroupName=self.log_group_name)

@functools.cache
def _get_log_group_arn(self):
# get the account number
sts_client = boto3.client("sts")
accountno = sts_client.get_caller_identity()["Account"]
region = self.cwl_client.meta.region_name
return f"arn:aws:logs:{region}:{accountno}:log-group:{self.log_group_name}"

def _tag_log_group(self, log_group_tags: dict[str, str]):
try:
self._idempotent_call("tag_resource", resourceArn=self._get_log_group_arn(), tags=log_group_tags)
except (
self.cwl_client.exceptions.ResourceNotFoundException,
self.cwl_client.exceptions.InvalidParameterException,
self.cwl_client.exceptions.ServiceUnavailableException,
self.cwl_client.exceptions.TooManyTagsException,
) as e:
warnings.warn(f"Failed to tag log group {self.log_group_name}: {e}", WatchtowerWarning)

def _idempotent_call(self, method, *args, **kwargs):
method_callable = getattr(self.cwl_client, method)
try:
Expand Down

0 comments on commit ffbc180

Please sign in to comment.