From a3972f829549db882f4e8f60e02669b150be94e0 Mon Sep 17 00:00:00 2001 From: Guy Rodrigue Koffi Date: Fri, 19 Aug 2016 01:30:19 +0200 Subject: [PATCH] orphan snapshots cleaning - deleting snapshots at cli launch (with confirmation) - option to skip orphan snapshots checking - code separation - more tests --- amicleaner/cli.py | 443 +++++++++----------------------------------- amicleaner/core.py | 267 ++++++++++++++++++++++++++ amicleaner/utils.py | 106 +++++++++++ tests/test_cli.py | 357 ++++++----------------------------- tests/test_core.py | 256 +++++++++++++++++++++++++ 5 files changed, 770 insertions(+), 659 deletions(-) create mode 100644 amicleaner/core.py create mode 100644 amicleaner/utils.py create mode 100644 tests/test_core.py diff --git a/amicleaner/cli.py b/amicleaner/cli.py index 89d73cb..d631584 100755 --- a/amicleaner/cli.py +++ b/amicleaner/cli.py @@ -1,408 +1,143 @@ #!/usr/bin/env python -import argparse import sys -import boto3 -from botocore.exceptions import ClientError -from prettytable import PrettyTable - -from resources.config import MAPPING_KEY, MAPPING_VALUES, KEEP_PREVIOUS +from core import AMICleaner, OrphanSnapshotCleaner +from resources.config import MAPPING_KEY, MAPPING_VALUES from resources.config import TERM -from resources.models import AMI, AWSEC2Instance - - -class OrphanedSnapshotCleaner: - - """ Finds and removes ebs snapshots left orphaned """ - - def __init__(self): - self.ec2 = boto3.client('ec2') - - -class AMICleaner: - - def __init__(self, ec2=None): - self.ec2 = ec2 or boto3.client('ec2') - - @staticmethod - def get_ami_sorting_key(ami): - - """ return a key for sorting array of AMIs """ - - return ami.creation_date - - def remove_amis(self, amis): - - """ - deregister AMIs (array) and removes related snapshots - :param amis: array of AMI objects - """ - - failed_snapshots = list() - - amis = amis or [] - for ami in amis: - self.ec2.deregister_image(ImageId=ami.id) - print "{0} deregistered".format(ami.id) - for block_device in ami.block_device_mappings: - try: - self.ec2.delete_snapshot( - SnapshotId=block_device.snapshot_id - ) - except ClientError: - failed_snapshots.append(block_device.snapshot_id) - print "{0} deleted\n".format(block_device.snapshot_id) - - if failed_snapshots: - print_failed_snapshots(failed_snapshots) - - return True - - def remove_amis_from_ids(self, ami_ids): - - """ - takes a list of AMI ids, verify on aws and removes them - :param ami_ids: array of AMI ids - """ - - if not ami_ids: - return False - - my_custom_images = self.ec2.describe_images( - Owners=['self'], - ImageIds=ami_ids - ) - amis = [] - for image_json in my_custom_images.get('Images'): - ami = AMI.object_with_json(image_json) - amis.append(ami) - - return self.remove_amis(amis) - - def fetch_available_amis(self): - - """ Retrieve from your aws account your custom AMIs""" - - available_amis = dict() - - my_custom_images = self.ec2.describe_images(Owners=['self']) - for image_json in my_custom_images.get('Images'): - ami = AMI.object_with_json(image_json) - available_amis[ami.id] = ami - - return available_amis - - def fetch_instances(self): - - """ Retrieve from your aws account your running ec2 instances """ - - ec2_instances = dict() - - my_instances = self.ec2.describe_instances() - for reservation in my_instances.get("Reservations", []): - for instance_json in reservation.get("Instances", []): - ec2_instance = AWSEC2Instance.object_with_json(instance_json) - ec2_instances[ec2_instance.image_id] = ec2_instance - - return ec2_instances - - def fetch_candidates(self, amis_dict=None, instances_dict=None): - - """ - Collects AMIs and ec2 instances (as dicts) and returns AMIs not holded - by instances. Both dicts have as keys an ami-id - """ +from utils import Printer, parse_args - amis_dict = amis_dict or self.fetch_available_amis() - instances_dict = instances_dict or self.fetch_instances() - for instance_image_id, ec2_instance in instances_dict.iteritems(): - amis_dict.pop(instance_image_id, None) +class App: - return amis_dict.values() + def __init__(self, args): - def map_candidates(self, candidates_ami=None, mapping_strategy=None): + self.mapping_key = args.mapping_key or MAPPING_KEY + self.mapping_values = args.mapping_values or MAPPING_VALUES + self.keep_previous = args.keep_previous + self.skip_orphans = args.skip_orphans + self.from_ids = args.from_ids + self.full_report = args.full_report + self.force_delete = args.force_delete - """ - Given a dict of AMIs to clean, and a mapping strategy (see config.py), - this function returns a dict of grouped amis with the mapping strategy - name as a key. - - example : - mapping_strategy = {"key": "name", "values": ["ubuntu", "debian"]} - or - mapping_strategy = {"key": "tags", "values": ["env", "role"]} - - print map_candidates(candidates_ami, mapping_strategy) - ==> - { - "ubuntu": [obj1, obj3], - "debian": [obj2, obj5] + self.mapping_strategy = { + "key": self.mapping_key, + "values": self.mapping_values, } - or - ==> - { - "prod.nginx": [obj1, obj3], - "prod.tomcat": [obj2, obj5], - "test.nginx": [obj6, obj7], - } - """ - - mapping_strategy = mapping_strategy or {} - - if not mapping_strategy: - return candidates_ami - - candidates_ami = candidates_ami or self.fetch_candidates() - - candidates_map = dict() - for ami in candidates_ami: - # case : grouping on name - if mapping_strategy.get("key") == "name": - for mapping_value in mapping_strategy.get("values"): - if mapping_value in ami.name: - mapping_list = candidates_map.get(mapping_value) or [] - mapping_list.append(ami) - candidates_map[mapping_value] = mapping_list - # case : grouping on tags - elif mapping_strategy.get("key") == "tags": - mapping_value = self.tags_values_to_string( - ami.tags, - mapping_strategy.get("values") - ) - mapping_list = candidates_map.get(mapping_value) or [] - mapping_list.append(ami) - candidates_map[mapping_value] = mapping_list - - return candidates_map - - @staticmethod - def tags_values_to_string(tags, filters=None): - """ - filters tags(key,value) array and return a string with tags values - :tags is an array of AWSTag objects - """ - - if tags is None: - return None - - tag_values = [] - - filters = filters or [] - filters_to_string = ".".join(filters) - - for tag in tags: - if not filters: - tag_values.append(tag.value) - elif tag.key in filters_to_string: - tag_values.append(tag.value) - - return ".".join(sorted(tag_values)) - - def reduce_candidates(self, mapped_candidates_ami, keep_previous=0): + def fetch_and_prepare(self): - """ - Given a array of AMIs to clean this function return a subsequent - list by preserving a given number of them (history) based on creation - time and rotation_strategy param - """ + """ Uses AMICleaner to retrieve candidates AMI, map and reduce """ - if not keep_previous: - return mapped_candidates_ami + cleaner = AMICleaner() - if not mapped_candidates_ami: - return mapped_candidates_ami - - amis = sorted( - mapped_candidates_ami, - key=self.get_ami_sorting_key, - reverse=True + mapped_amis = cleaner.map_candidates( + mapping_strategy=self.mapping_strategy ) - return amis[keep_previous:] - - -def parse_args(args): - parser = argparse.ArgumentParser(description='Clean your AMIs on your ' - 'AWS account. Your AWS ' - 'credentials must be sourced') - - parser.add_argument("--from-ids", - dest='from_ids', - nargs='+', - help="AMI id(s) you simply want to remove") - - parser.add_argument("--full-report", - dest='full_report', - action="store_true", - help="Prints a full report of what to be cleaned") - - parser.add_argument("--mapping-key", - dest='mapping_key', - help="How to regroup AMIs : [name|tags]") - - parser.add_argument("--mapping-values", - dest='mapping_values', - nargs='+', - help="List of values for tags or name") - - parser.add_argument("--keep-previous", - dest='keep_previous', - type=int, - default=KEEP_PREVIOUS, - help="Number of previous AMI to keep excluding those" - "currently being running") - - parser.add_argument("-f", "--force-delete", - dest='force_delete', - action="store_true", - help="Skip confirmation") - - parser.add_argument("-o", "--clean-orphans", - dest='clean_orphans', - action="store_true", - help="Clean orphaned snapshots") - - parsed_args = parser.parse_args(args) - if parsed_args.mapping_key and not parsed_args.mapping_values: - print "missing mapping-values\n" - parser.print_help() - return None - - return parsed_args + if not mapped_amis: + return None + candidates = [] + report = dict() -def fetch_and_prepare(mapping_strategy, keep_previous, full_report=False): + for group_name, amis in mapped_amis.iteritems(): + group_name = group_name or "" - """ Uses AMICleaner to retrieve candidates AMI, map and reduce """ + if not group_name: + report["no-tags (excluded)"] = amis + else: + reduced = cleaner.reduce_candidates(amis, self.keep_previous) + if reduced: + report[group_name] = reduced + candidates.extend(reduced) - cleaner = AMICleaner() + Printer.print_report(report, self.full_report) - print TERM.bold("\nRetrieving AMIs to clean ...") - mapped_amis = cleaner.map_candidates(mapping_strategy=mapping_strategy) + return candidates - if not mapped_amis: - return None + def prepare_delete_amis(self, candidates, from_ids=False): - candidates = [] - report = dict() + """ prepare deletion of candidates AMIs""" - for group_name, amis in mapped_amis.iteritems(): - group_name = group_name or "" + failed = [] - if not group_name: - report["no-tags (excluded)"] = amis + if from_ids: + print TERM.bold("\nCleaning from {} AMI id(s) ...".format( + len(candidates)) + ) + failed = AMICleaner().remove_amis_from_ids(candidates) else: - reduced = cleaner.reduce_candidates(amis, keep_previous) - if reduced: - report[group_name] = reduced - candidates.extend(reduced) + print TERM.bold("\nCleaning {} AMIs ...".format(len(candidates))) + failed = AMICleaner().remove_amis(candidates) - print_report(report, full_report) + if failed: + print TERM.red("\n{0} failed snapshots".format(len(failed))) + Printer.print_failed_snapshots(failed) - return candidates + def clean_orphans(self): + """ Find and removes orphan snapshots """ -def print_report(candidates, full_report=False): + cleaner = OrphanSnapshotCleaner() + snaps = cleaner.fetch() - """ Print results """ + Printer.print_orphan_snapshots(snaps) - if not candidates: - return + answer = raw_input( + "Do you want to continue and remove {} orphan snapshots " + "[y/N] ? : ".format(len(snaps))) + confirm = (answer.lower() == "y") - groups_table = PrettyTable(["Group name", "candidates"]) + if confirm: + print "Removing orphan snapshots... " + cleaner.clean(snaps[:2]) - for group_name, amis in candidates.iteritems(): - groups_table.add_row([group_name, len(amis)]) - eligible_amis_table = PrettyTable( - ["AMI ID", "AMI Name", "Creation Date"] - ) - for ami in amis: - eligible_amis_table.add_row([ - ami.id, - ami.name, - ami.creation_date - ]) - if full_report: - print group_name - print eligible_amis_table.get_string(sortby="AMI Name"), "\n\n" - - print "\nAMIs to be removed:" - print groups_table.get_string(sortby="Group name") + def print_defaults(self): + print TERM.bold("\nDefault values : ==>") + print TERM.green("mapping_key : {0}".format(self.mapping_key)) + print TERM.green("mapping_values : {0}".format(self.mapping_values)) + print TERM.green("keep_previous : {0}".format(self.keep_previous)) -def print_failed_snapshots(failed_snapshots): + def run_cli(self): - """ Print failed snapshots """ + if not self.skip_orphans: + self.clean_orphans() - snap_table = PrettyTable(["Failed Snapshots"]) + if self.from_ids: + self.prepare_delete_amis(self.from_ids, from_ids=True) + else: + # print defaults + self.print_defaults() - for failed_snap in failed_snapshots: - snap_table.add_row([failed_snap]) - print "\nFailed Snapshots:" - print snap_table + print TERM.bold("\nRetrieving AMIs to clean ...") + candidates = self.fetch_and_prepare() + if not candidates: + sys.exit(0) -def prepare_delete_amis(candidates, from_ids=False): + delete = False - """ prepare deletion of candidates AMIs""" + if not self.force_delete: + answer = raw_input( + "Do you want to continue and remove {} AMIs " + "[y/N] ? : ".format(len(candidates))) + delete = (answer.lower() == "y") + else: + delete = True - if from_ids: - print TERM.bold("\nCleaning from {} AMI id(s) ...".format( - len(candidates)) - ) - AMICleaner().remove_amis_from_ids(candidates) - else: - print TERM.bold("\nCleaning {} AMIs ...".format(len(candidates))) - AMICleaner().remove_amis(candidates) + if delete: + self.prepare_delete_amis(candidates) def main(): - """ main entry point for cli """ - args = parse_args(sys.argv[1:]) - if not args: sys.exit(1) - # defaults - mapping_key = args.mapping_key or MAPPING_KEY - mapping_values = args.mapping_values or MAPPING_VALUES - keep_previous = args.keep_previous - - if args.from_ids: - prepare_delete_amis(args.from_ids, True) - else: - # print defaults - print TERM.bold("Default values : ==>") - print TERM.green("mapping_key : {0}".format(mapping_key)) - print TERM.green("mapping_values : {0}".format(mapping_values)) - print TERM.green("keep_previous : {0}".format(keep_previous)) - - mapping_strategy = {"key": mapping_key, "values": mapping_values} - - candidates = fetch_and_prepare( - mapping_strategy, - args.keep_previous, - args.full_report - ) - - if not candidates: - sys.exit(0) - - delete = False - - if not args.force_delete: - answer = raw_input("Do you want to continue and remove {} AMIs " - "[y/N] ? : ".format(len(candidates))) - delete = (answer.lower() == "y") - else: - delete = True - - if delete: - prepare_delete_amis(candidates) + app = App(args) + app.run_cli() if __name__ == "__main__": diff --git a/amicleaner/core.py b/amicleaner/core.py new file mode 100644 index 0000000..ccc1604 --- /dev/null +++ b/amicleaner/core.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python + +import boto3 +from botocore.exceptions import ClientError +from resources.models import AMI, AWSEC2Instance + + +class OrphanSnapshotCleaner: + + """ Finds and removes ebs snapshots left orphaned """ + + def __init__(self): + self.ec2 = boto3.client('ec2') + + def get_snapshots_filter(self): + + return [{ + 'Name': 'status', + 'Values': [ + 'completed', + ]}, { + 'Name': 'description', + 'Values': [ + 'Created by CreateImage*' + ] + }] + + def fetch(self): + + """ retrieve orphan snapshots """ + + resp = self.ec2.describe_images(Owners=['self']) + + used_snaps = [ + ebs.get("Ebs", {}).get("SnapshotId") + for image in resp.get("Images") + for ebs in image.get("BlockDeviceMappings") + ] + snap_filter = self.get_snapshots_filter() + owner_id = resp.get("Images")[0].get("OwnerId") + + # all snapshots created for AMIs + resp = self.ec2.describe_snapshots( + Filters=snap_filter, OwnerIds=[owner_id] + ) + + all_snaps = [snap.get("SnapshotId") for snap in resp["Snapshots"]] + return list(set(all_snaps) - set(used_snaps)) + + def clean(self, snapshots): + + """ + actually deletes the snapshots with an array + of snapshots ids + """ + + snapshots = snapshots or [] + + for snap in snapshots[0:2]: + try: + self.ec2.delete_snapshot(SnapshotId=snap) + except ClientError as e: + self.log("{0} deletion failed : {1}".format(snap, e)) + + def log(self, msg): + print msg + + +class AMICleaner: + + def __init__(self, ec2=None): + self.ec2 = ec2 or boto3.client('ec2') + + @staticmethod + def get_ami_sorting_key(ami): + + """ return a key for sorting array of AMIs """ + + return ami.creation_date + + def remove_amis(self, amis): + + """ + deregister AMIs (array) and removes related snapshots + :param amis: array of AMI objects + """ + + failed_snapshots = [] + + amis = amis or [] + for ami in amis: + self.ec2.deregister_image(ImageId=ami.id) + print "{0} deregistered".format(ami.id) + for block_device in ami.block_device_mappings: + try: + self.ec2.delete_snapshot( + SnapshotId=block_device.snapshot_id + ) + except ClientError: + failed_snapshots.append(block_device.snapshot_id) + print "{0} deleted\n".format(block_device.snapshot_id) + + return failed_snapshots + + def remove_amis_from_ids(self, ami_ids): + + """ + takes a list of AMI ids, verify on aws and removes them + :param ami_ids: array of AMI ids + """ + + if not ami_ids: + return False + + my_custom_images = self.ec2.describe_images( + Owners=['self'], + ImageIds=ami_ids + ) + amis = [] + for image_json in my_custom_images.get('Images'): + ami = AMI.object_with_json(image_json) + amis.append(ami) + + return self.remove_amis(amis) + + def fetch_available_amis(self): + + """ Retrieve from your aws account your custom AMIs""" + + available_amis = dict() + + my_custom_images = self.ec2.describe_images(Owners=['self']) + for image_json in my_custom_images.get('Images'): + ami = AMI.object_with_json(image_json) + available_amis[ami.id] = ami + + return available_amis + + def fetch_instances(self): + + """ Retrieve from your aws account your running ec2 instances """ + + ec2_instances = dict() + + my_instances = self.ec2.describe_instances() + for reservation in my_instances.get("Reservations", []): + for instance_json in reservation.get("Instances", []): + ec2_instance = AWSEC2Instance.object_with_json(instance_json) + ec2_instances[ec2_instance.image_id] = ec2_instance + + return ec2_instances + + def fetch_candidates(self, amis_dict=None, instances_dict=None): + + """ + Collects AMIs and ec2 instances (as dicts) and returns AMIs not holded + by instances. Both dicts have as keys an ami-id + """ + + amis_dict = amis_dict or self.fetch_available_amis() + instances_dict = instances_dict or self.fetch_instances() + + for instance_image_id, ec2_instance in instances_dict.iteritems(): + amis_dict.pop(instance_image_id, None) + + return amis_dict.values() + + def map_candidates(self, candidates_ami=None, mapping_strategy=None): + + """ + Given a dict of AMIs to clean, and a mapping strategy (see config.py), + this function returns a dict of grouped amis with the mapping strategy + name as a key. + + example : + mapping_strategy = {"key": "name", "values": ["ubuntu", "debian"]} + or + mapping_strategy = {"key": "tags", "values": ["env", "role"]} + + print map_candidates(candidates_ami, mapping_strategy) + ==> + { + "ubuntu": [obj1, obj3], + "debian": [obj2, obj5] + } + + or + ==> + { + "prod.nginx": [obj1, obj3], + "prod.tomcat": [obj2, obj5], + "test.nginx": [obj6, obj7], + } + """ + + mapping_strategy = mapping_strategy or {} + + if not mapping_strategy: + return candidates_ami + + candidates_ami = candidates_ami or self.fetch_candidates() + + candidates_map = dict() + for ami in candidates_ami: + # case : grouping on name + if mapping_strategy.get("key") == "name": + for mapping_value in mapping_strategy.get("values"): + if mapping_value in ami.name: + mapping_list = candidates_map.get(mapping_value) or [] + mapping_list.append(ami) + candidates_map[mapping_value] = mapping_list + # case : grouping on tags + elif mapping_strategy.get("key") == "tags": + mapping_value = self.tags_values_to_string( + ami.tags, + mapping_strategy.get("values") + ) + mapping_list = candidates_map.get(mapping_value) or [] + mapping_list.append(ami) + candidates_map[mapping_value] = mapping_list + + return candidates_map + + @staticmethod + def tags_values_to_string(tags, filters=None): + """ + filters tags(key,value) array and return a string with tags values + :tags is an array of AWSTag objects + """ + + if tags is None: + return None + + tag_values = [] + + filters = filters or [] + filters_to_string = ".".join(filters) + + for tag in tags: + if not filters: + tag_values.append(tag.value) + elif tag.key in filters_to_string: + tag_values.append(tag.value) + + return ".".join(sorted(tag_values)) + + def reduce_candidates(self, mapped_candidates_ami, keep_previous=0): + + """ + Given a array of AMIs to clean this function return a subsequent + list by preserving a given number of them (history) based on creation + time and rotation_strategy param + """ + + if not keep_previous: + return mapped_candidates_ami + + if not mapped_candidates_ami: + return mapped_candidates_ami + + amis = sorted( + mapped_candidates_ami, + key=self.get_ami_sorting_key, + reverse=True + ) + + return amis[keep_previous:] diff --git a/amicleaner/utils.py b/amicleaner/utils.py new file mode 100644 index 0000000..978dfb9 --- /dev/null +++ b/amicleaner/utils.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +import argparse + +from prettytable import PrettyTable + +from resources.config import KEEP_PREVIOUS + + +class Printer: + + """ Pretty table prints methods """ + @staticmethod + def print_report(candidates, full_report=False): + + """ Print AMI collection results """ + + if not candidates: + return + + groups_table = PrettyTable(["Group name", "candidates"]) + + for group_name, amis in candidates.iteritems(): + groups_table.add_row([group_name, len(amis)]) + eligible_amis_table = PrettyTable( + ["AMI ID", "AMI Name", "Creation Date"] + ) + for ami in amis: + eligible_amis_table.add_row([ + ami.id, + ami.name, + ami.creation_date + ]) + if full_report: + print group_name + print eligible_amis_table.get_string(sortby="AMI Name"), "\n\n" + + print "\nAMIs to be removed:" + print groups_table.get_string(sortby="Group name") + + @staticmethod + def print_failed_snapshots(snapshots): + + snap_table = PrettyTable(["Failed Snapshots"]) + + for snap in snapshots: + snap_table.add_row([snap]) + print snap_table + + @staticmethod + def print_orphan_snapshots(snapshots): + + snap_table = PrettyTable(["Orphan Snapshots"]) + + for snap in snapshots: + snap_table.add_row([snap]) + print snap_table + + +def parse_args(args): + parser = argparse.ArgumentParser(description='Clean your AMIs on your ' + 'AWS account. Your AWS ' + 'credentials must be sourced') + + parser.add_argument("--from-ids", + dest='from_ids', + nargs='+', + help="AMI id(s) you simply want to remove") + + parser.add_argument("--full-report", + dest='full_report', + action="store_true", + help="Prints a full report of what to be cleaned") + + parser.add_argument("--mapping-key", + dest='mapping_key', + help="How to regroup AMIs : [name|tags]") + + parser.add_argument("--mapping-values", + dest='mapping_values', + nargs='+', + help="List of values for tags or name") + + parser.add_argument("--keep-previous", + dest='keep_previous', + type=int, + default=KEEP_PREVIOUS, + help="Number of previous AMI to keep excluding those" + "currently being running") + + parser.add_argument("-f", "--force-delete", + dest='force_delete', + action="store_true", + help="Skip confirmation") + + parser.add_argument("--skip-orphans", + dest='skip_orphans', + action="store_true", + help="Skip orphaned snapshots cleaning") + + parsed_args = parser.parse_args(args) + if parsed_args.mapping_key and not parsed_args.mapping_values: + print "missing mapping-values\n" + parser.print_help() + return None + + return parsed_args diff --git a/tests/test_cli.py b/tests/test_cli.py index e95fd7a..a301549 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,258 +1,61 @@ import json -import unittest -import boto3 -from datetime import datetime +import boto3 from moto import mock_ec2 -from amicleaner.cli import AMICleaner, parse_args, fetch_and_prepare -from amicleaner.cli import prepare_delete_amis -from amicleaner.cli import print_report, print_failed_snapshots -from amicleaner.resources.models import AMI, AWSEC2Instance, AWSTag - - -def test_fetch_candidates(): - # creating tests objects - first_ami = AMI() - first_ami.id = 'ami-28c2b348' - first_ami.creation_date = datetime.now() - - first_instance = AWSEC2Instance() - first_instance.id = 'i-9f9f6a2a' - first_instance.name = "first-instance" - first_instance.image_id = first_ami.id - first_instance.launch_time = datetime.now() - - second_ami = AMI() - second_ami.id = 'unused-ami' - second_ami.creation_date = datetime.now() - - second_instance = AWSEC2Instance() - second_instance.id = 'i-9f9f6a2a' - second_instance.name = "second-instance" - second_instance.image_id = first_ami.id - second_instance.launch_time = datetime.now() - - # constructing dicts - amis_dict = dict() - amis_dict[first_ami.id] = first_ami - amis_dict[second_ami.id] = second_ami - - instances_dict = dict() - instances_dict[first_instance.image_id] = instances_dict - instances_dict[second_instance.image_id] = second_instance - - # testing filter - unused_ami_dict = AMICleaner().fetch_candidates(amis_dict, instances_dict) - assert len(unused_ami_dict) == 1 - assert amis_dict.get('unused-ami') is not None - - -def test_map_candidates_with_null_arguments(): - assert AMICleaner().map_candidates({}, {}) == {} - - -def test_tags_values_to_string(): - first_tag = AWSTag() - first_tag.key = "Key1" - first_tag.value = "Value1" - - second_tag = AWSTag() - second_tag.key = "Key2" - second_tag.value = "Value2" - - third_tag = AWSTag() - third_tag.key = "Key3" - third_tag.value = "Value3" - - fourth_tag = AWSTag() - fourth_tag.key = "Key4" - fourth_tag.value = "Value4" - - tags = [first_tag, third_tag, second_tag, fourth_tag] - filters = ["Key2", "Key3"] - - tags_values_string = AMICleaner.tags_values_to_string(tags, filters) - assert tags_values_string is not None - assert tags_values_string == "Value2.Value3" - - -def test_tags_values_to_string_with_none(): - assert AMICleaner.tags_values_to_string(None) is None - - -def test_tags_values_to_string_without_filters(): - first_tag = AWSTag() - first_tag.key = "Key1" - first_tag.value = "Value1" - - second_tag = AWSTag() - second_tag.key = "Key2" - second_tag.value = "Value2" - - third_tag = AWSTag() - third_tag.key = "Key3" - third_tag.value = "Value3" - - tags = [first_tag, third_tag, second_tag] - filters = [] - - tags_values_string = AMICleaner.tags_values_to_string(tags, filters) - assert tags_values_string is not None - assert tags_values_string == "Value1.Value2.Value3" - - -def test_map_with_names(): - # creating tests objects - first_ami = AMI() - first_ami.id = 'ami-28c2b348' - first_ami.name = "ubuntu-20160102" - first_ami.creation_date = datetime.now() - - second_ami = AMI() - second_ami.id = 'ami-28c2b349' - second_ami.name = "ubuntu-20160103" - second_ami.creation_date = datetime.now() - - third_ami = AMI() - third_ami.id = 'ami-28c2b350' - third_ami.name = "debian-20160104" - third_ami.creation_date = datetime.now() +from amicleaner.cli import App +from amicleaner.core import AMICleaner +from amicleaner.resources.models import AMI +from amicleaner.utils import parse_args, Printer - # creating amis to drop dict - candidates = [first_ami, second_ami, third_ami] - # grouping strategy - grouping_strategy = {"key": "name", "values": ["ubuntu", "debian"]} - - grouped_amis = AMICleaner().map_candidates(candidates, grouping_strategy) - assert grouped_amis is not None - assert len(grouped_amis.get('ubuntu')) == 2 - assert len(grouped_amis.get('debian')) == 1 - - -def test_map_with_tags(): - # tags - stack_tag = AWSTag() - stack_tag.key = "stack" - stack_tag.value = "web-server" - - env_tag = AWSTag() - env_tag.key = "env" - env_tag.value = "prod" - - # creating tests objects - # prod and web-server - first_ami = AMI() - first_ami.id = 'ami-28c2b348' - first_ami.name = "ubuntu-20160102" - first_ami.tags.append(stack_tag) - first_ami.tags.append(env_tag) - first_ami.creation_date = datetime.now() - - # just prod - second_ami = AMI() - second_ami.id = 'ami-28c2b349' - second_ami.name = "ubuntu-20160103" - second_ami.tags.append(env_tag) - second_ami.creation_date = datetime.now() - - # prod and web-server - third_ami = AMI() - third_ami.id = 'ami-28c2b350' - third_ami.name = "debian-20160104" - third_ami.tags.append(stack_tag) - third_ami.tags.append(env_tag) - third_ami.creation_date = datetime.now() - - # creating amis to drop dict - candidates = [first_ami, second_ami, third_ami] - - # grouping strategy - grouping_strategy = {"key": "tags", "values": ["stack", "env"]} - grouped_amis = AMICleaner().map_candidates(candidates, grouping_strategy) - assert grouped_amis is not None - assert len(grouped_amis.get("prod")) == 1 - assert len(grouped_amis.get("prod.web-server")) == 2 - - -def test_reduce_without_rotation_number(): - # creating tests objects - first_ami = AMI() - first_ami.id = 'ami-28c2b348' - first_ami.name = "ubuntu-20160102" - first_ami.creation_date = datetime(2016, 1, 10) - - # just prod - second_ami = AMI() - second_ami.id = 'ami-28c2b349' - second_ami.name = "ubuntu-20160103" - second_ami.creation_date = datetime(2016, 1, 11) - - # prod and web-server - third_ami = AMI() - third_ami.id = 'ami-28c2b350' - third_ami.name = "debian-20160104" - third_ami.creation_date = datetime(2016, 1, 12) - - # creating amis to drop dict - candidates = [second_ami, third_ami, first_ami] - - assert AMICleaner().reduce_candidates(candidates) == candidates - - -def test_reduce(): - # creating tests objects - first_ami = AMI() - first_ami.id = 'ami-28c2b348' - first_ami.name = "ubuntu-20160102" - first_ami.creation_date = datetime(2016, 1, 10) - - # just prod - second_ami = AMI() - second_ami.id = 'ami-28c2b349' - second_ami.name = "ubuntu-20160103" - second_ami.creation_date = datetime(2016, 1, 11) +@mock_ec2 +def test_fetch_and_prepare(): + parser = parse_args(['--keep-previous', '0']) + assert App(parser).fetch_and_prepare() is None - # prod and web-server - third_ami = AMI() - third_ami.id = 'ami-28c2b350' - third_ami.name = "debian-20160104" - third_ami.creation_date = datetime(2016, 1, 12) - # keep 2 recent amis - candidates = [second_ami, third_ami, first_ami] - rotation_number = 2 - cleaner = AMICleaner() - left = cleaner.reduce_candidates(candidates, rotation_number) - assert len(left) == 1 - assert left[0].id == first_ami.id +@mock_ec2 +def test_deletion(): + """ Test deletion methods """ - # keep 1 recent ami - rotation_number = 1 - left = cleaner.reduce_candidates(candidates, rotation_number) - assert len(left) == 2 - assert left[0].id == second_ami.id + base_ami = "ami-1234abcd" - # keep 5 recent amis - rotation_number = 5 - left = cleaner.reduce_candidates(candidates, rotation_number) - assert len(left) == 0 + parser = parse_args( + [ + '--keep-previous', '0', + '--mapping-key', 'name', + '--mapping-values', 'test-ami'] + ) + conn = boto3.client('ec2') + reservation = conn.run_instances( + ImageId=base_ami, MinCount=1, MaxCount=1 + ) + instance = reservation["Instances"][0] -def test_remove_ami_from_none(): - assert AMICleaner().remove_amis(None) is True + # create amis + images = [] + for i in xrange(5): + image = conn.create_image( + InstanceId=instance.get("InstanceId"), + Name="test-ami" + ) + images.append(image.get("ImageId")) + # delete one by id + app = App(parser) + assert len(AMICleaner(conn).fetch_available_amis()) == 5 + assert app.prepare_delete_amis( + candidates=[images[4]], from_ids=True + ) is None + assert len(AMICleaner(conn).fetch_available_amis()) == 4 -@mock_ec2 -def test_remove_ami_from_ids(): - pass - """ - ami_backend = AmiBackend() - ami_backend.create_image("instance-id", "linux") - print ami_backend.amis - assert AMICleaner().remove_amis_from_ids(["ami-02197662"]) is True - """ + # delete with mapping strategy + candidates = app.fetch_and_prepare() + assert len(candidates) == 4 + assert app.prepare_delete_amis(candidates) is None + assert len(AMICleaner(conn).fetch_available_amis()) == 0 def test_parse_args_no_args(): @@ -280,78 +83,22 @@ def test_parse_args(): assert len(parser.mapping_values) == 2 -@mock_ec2 -def test_fetch_and_prepare(): - assert fetch_and_prepare({}, 0) is None - - def test_print_report(): - assert print_report({}) is None + assert Printer.print_report({}) is None with open("tests/mocks/ami.json") as mock_file: json_to_parse = json.load(mock_file) ami = AMI.object_with_json(json_to_parse) candidates = {'test': [ami]} - assert print_report(candidates) is None - assert print_report(candidates, full_report=True) is None + assert Printer.print_report(candidates) is None + assert Printer.print_report(candidates, full_report=True) is None def test_print_failed_snapshots(): - assert print_failed_snapshots({}) is None - assert print_failed_snapshots(["ami-one", "ami-two"]) is None - - -@mock_ec2 -def test_fetch_instances(): - """ Tests fetch instances and AMIs """ - - base_ami = "ami-1234abcd" - - conn = boto3.client('ec2') - reservation = conn.run_instances( - ImageId=base_ami, MinCount=1, MaxCount=1 - ) - instance = reservation["Instances"][0] - assert instance.get("ImageId") == base_ami - - # Test fetch instances method - assert len(AMICleaner(conn).fetch_instances()) == 1 + assert Printer.print_failed_snapshots({}) is None + assert Printer.print_failed_snapshots(["ami-one", "ami-two"]) is None -@mock_ec2 -def test_deletion(): - """ Test deletion methods """ - - base_ami = "ami-1234abcd" - - conn = boto3.client('ec2') - reservation = conn.run_instances( - ImageId=base_ami, MinCount=1, MaxCount=1 - ) - instance = reservation["Instances"][0] - - # create amis - images = [] - for i in xrange(5): - image = conn.create_image( - InstanceId=instance.get("InstanceId"), - Name="test-ami" - ) - images.append(image.get("ImageId")) - - # delete one by id - assert len(AMICleaner(conn).fetch_available_amis()) == 5 - assert prepare_delete_amis(candidates=[images[4]], from_ids=True) is None - assert len(AMICleaner(conn).fetch_available_amis()) == 4 - - # delete with mapping strategy - mapping_strategy = {"key": "name", "values": ["test-ami"]} - - candidates = fetch_and_prepare( - mapping_strategy, - keep_previous=0, - full_report=False - ) - assert len(candidates) == 4 - assert prepare_delete_amis(candidates) is None - assert len(AMICleaner(conn).fetch_available_amis()) == 0 +def test_print_orphan_snapshots(): + assert Printer.print_orphan_snapshots({}) is None + assert Printer.print_orphan_snapshots(["ami-one", "ami-two"]) is None diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..809d4b4 --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,256 @@ +import boto3 +from datetime import datetime +from moto import mock_ec2 + +from amicleaner.core import AMICleaner +from amicleaner.resources.models import AMI, AWSEC2Instance, AWSTag + + +def test_fetch_candidates(): + # creating tests objects + first_ami = AMI() + first_ami.id = 'ami-28c2b348' + first_ami.creation_date = datetime.now() + + first_instance = AWSEC2Instance() + first_instance.id = 'i-9f9f6a2a' + first_instance.name = "first-instance" + first_instance.image_id = first_ami.id + first_instance.launch_time = datetime.now() + + second_ami = AMI() + second_ami.id = 'unused-ami' + second_ami.creation_date = datetime.now() + + second_instance = AWSEC2Instance() + second_instance.id = 'i-9f9f6a2a' + second_instance.name = "second-instance" + second_instance.image_id = first_ami.id + second_instance.launch_time = datetime.now() + + # constructing dicts + amis_dict = dict() + amis_dict[first_ami.id] = first_ami + amis_dict[second_ami.id] = second_ami + + instances_dict = dict() + instances_dict[first_instance.image_id] = instances_dict + instances_dict[second_instance.image_id] = second_instance + + # testing filter + unused_ami_dict = AMICleaner().fetch_candidates(amis_dict, instances_dict) + assert len(unused_ami_dict) == 1 + assert amis_dict.get('unused-ami') is not None + + +def test_map_candidates_with_null_arguments(): + assert AMICleaner().map_candidates({}, {}) == {} + + +def test_tags_values_to_string(): + first_tag = AWSTag() + first_tag.key = "Key1" + first_tag.value = "Value1" + + second_tag = AWSTag() + second_tag.key = "Key2" + second_tag.value = "Value2" + + third_tag = AWSTag() + third_tag.key = "Key3" + third_tag.value = "Value3" + + fourth_tag = AWSTag() + fourth_tag.key = "Key4" + fourth_tag.value = "Value4" + + tags = [first_tag, third_tag, second_tag, fourth_tag] + filters = ["Key2", "Key3"] + + tags_values_string = AMICleaner.tags_values_to_string(tags, filters) + assert tags_values_string is not None + assert tags_values_string == "Value2.Value3" + + +def test_tags_values_to_string_with_none(): + assert AMICleaner.tags_values_to_string(None) is None + + +def test_tags_values_to_string_without_filters(): + first_tag = AWSTag() + first_tag.key = "Key1" + first_tag.value = "Value1" + + second_tag = AWSTag() + second_tag.key = "Key2" + second_tag.value = "Value2" + + third_tag = AWSTag() + third_tag.key = "Key3" + third_tag.value = "Value3" + + tags = [first_tag, third_tag, second_tag] + filters = [] + + tags_values_string = AMICleaner.tags_values_to_string(tags, filters) + assert tags_values_string is not None + assert tags_values_string == "Value1.Value2.Value3" + + +def test_map_with_names(): + # creating tests objects + first_ami = AMI() + first_ami.id = 'ami-28c2b348' + first_ami.name = "ubuntu-20160102" + first_ami.creation_date = datetime.now() + + second_ami = AMI() + second_ami.id = 'ami-28c2b349' + second_ami.name = "ubuntu-20160103" + second_ami.creation_date = datetime.now() + + third_ami = AMI() + third_ami.id = 'ami-28c2b350' + third_ami.name = "debian-20160104" + third_ami.creation_date = datetime.now() + + # creating amis to drop dict + candidates = [first_ami, second_ami, third_ami] + + # grouping strategy + grouping_strategy = {"key": "name", "values": ["ubuntu", "debian"]} + + grouped_amis = AMICleaner().map_candidates(candidates, grouping_strategy) + assert grouped_amis is not None + assert len(grouped_amis.get('ubuntu')) == 2 + assert len(grouped_amis.get('debian')) == 1 + + +def test_map_with_tags(): + # tags + stack_tag = AWSTag() + stack_tag.key = "stack" + stack_tag.value = "web-server" + + env_tag = AWSTag() + env_tag.key = "env" + env_tag.value = "prod" + + # creating tests objects + # prod and web-server + first_ami = AMI() + first_ami.id = 'ami-28c2b348' + first_ami.name = "ubuntu-20160102" + first_ami.tags.append(stack_tag) + first_ami.tags.append(env_tag) + first_ami.creation_date = datetime.now() + + # just prod + second_ami = AMI() + second_ami.id = 'ami-28c2b349' + second_ami.name = "ubuntu-20160103" + second_ami.tags.append(env_tag) + second_ami.creation_date = datetime.now() + + # prod and web-server + third_ami = AMI() + third_ami.id = 'ami-28c2b350' + third_ami.name = "debian-20160104" + third_ami.tags.append(stack_tag) + third_ami.tags.append(env_tag) + third_ami.creation_date = datetime.now() + + # creating amis to drop dict + candidates = [first_ami, second_ami, third_ami] + + # grouping strategy + grouping_strategy = {"key": "tags", "values": ["stack", "env"]} + grouped_amis = AMICleaner().map_candidates(candidates, grouping_strategy) + assert grouped_amis is not None + assert len(grouped_amis.get("prod")) == 1 + assert len(grouped_amis.get("prod.web-server")) == 2 + + +def test_reduce_without_rotation_number(): + # creating tests objects + first_ami = AMI() + first_ami.id = 'ami-28c2b348' + first_ami.name = "ubuntu-20160102" + first_ami.creation_date = datetime(2016, 1, 10) + + # just prod + second_ami = AMI() + second_ami.id = 'ami-28c2b349' + second_ami.name = "ubuntu-20160103" + second_ami.creation_date = datetime(2016, 1, 11) + + # prod and web-server + third_ami = AMI() + third_ami.id = 'ami-28c2b350' + third_ami.name = "debian-20160104" + third_ami.creation_date = datetime(2016, 1, 12) + + # creating amis to drop dict + candidates = [second_ami, third_ami, first_ami] + + assert AMICleaner().reduce_candidates(candidates) == candidates + + +def test_reduce(): + # creating tests objects + first_ami = AMI() + first_ami.id = 'ami-28c2b348' + first_ami.name = "ubuntu-20160102" + first_ami.creation_date = datetime(2016, 1, 10) + + # just prod + second_ami = AMI() + second_ami.id = 'ami-28c2b349' + second_ami.name = "ubuntu-20160103" + second_ami.creation_date = datetime(2016, 1, 11) + + # prod and web-server + third_ami = AMI() + third_ami.id = 'ami-28c2b350' + third_ami.name = "debian-20160104" + third_ami.creation_date = datetime(2016, 1, 12) + + # keep 2 recent amis + candidates = [second_ami, third_ami, first_ami] + rotation_number = 2 + cleaner = AMICleaner() + left = cleaner.reduce_candidates(candidates, rotation_number) + assert len(left) == 1 + assert left[0].id == first_ami.id + + # keep 1 recent ami + rotation_number = 1 + left = cleaner.reduce_candidates(candidates, rotation_number) + assert len(left) == 2 + assert left[0].id == second_ami.id + + # keep 5 recent amis + rotation_number = 5 + left = cleaner.reduce_candidates(candidates, rotation_number) + assert len(left) == 0 + + +def test_remove_ami_from_none(): + assert AMICleaner().remove_amis(None) == [] + + +@mock_ec2 +def test_fetch_instances(): + """ Tests fetch instances and AMIs """ + + base_ami = "ami-1234abcd" + + conn = boto3.client('ec2') + reservation = conn.run_instances( + ImageId=base_ami, MinCount=1, MaxCount=1 + ) + instance = reservation["Instances"][0] + assert instance.get("ImageId") == base_ami + + # Test fetch instances method + assert len(AMICleaner(conn).fetch_instances()) == 1