diff --git a/aws-replicator/aws_replicator/client/auth_proxy.py b/aws-replicator/aws_replicator/client/auth_proxy.py index 966abe2..2e75dee 100644 --- a/aws-replicator/aws_replicator/client/auth_proxy.py +++ b/aws-replicator/aws_replicator/client/auth_proxy.py @@ -14,7 +14,6 @@ from botocore.model import OperationModel from localstack import config from localstack import config as localstack_config -from localstack.aws.protocol.parser import create_parser from localstack.aws.spec import load_service from localstack.config import external_service_url from localstack.constants import AWS_REGION_US_EAST_1, DOCKER_IMAGE_NAME_PRO @@ -158,6 +157,8 @@ def register_in_instance(self): def _parse_aws_request( self, request: Request, service_name: str, region_name: str, client ) -> Tuple[OperationModel, AWSPreparedRequest, Dict]: + from localstack.aws.protocol.parser import create_parser + parser = create_parser(load_service(service_name)) operation_model, parsed_request = parser.parse(request) request_context = { diff --git a/aws-replicator/aws_replicator/client/cli.py b/aws-replicator/aws_replicator/client/cli.py index 8c90918..422e6be 100644 --- a/aws-replicator/aws_replicator/client/cli.py +++ b/aws-replicator/aws_replicator/client/cli.py @@ -67,10 +67,7 @@ def _is_logged_in() -> bool: required=False, ) def cmd_aws_proxy(services: str, config: str, container: bool, port: int, host: str): - from aws_replicator.client.auth_proxy import ( - start_aws_auth_proxy, - start_aws_auth_proxy_in_container, - ) + from aws_replicator.client.auth_proxy import start_aws_auth_proxy_in_container config_json: ProxyConfig = {"services": {}} if config: @@ -84,6 +81,10 @@ def cmd_aws_proxy(services: str, config: str, container: bool, port: int, host: try: if container: return start_aws_auth_proxy_in_container(config_json) + + # note: deferring the import here, to avoid import errors in CLI context + from aws_replicator.client.auth_proxy import start_aws_auth_proxy + proxy = start_aws_auth_proxy(config_json, port=port) proxy.join() except Exception as e: diff --git a/aws-replicator/aws_replicator/client/service_states.py b/aws-replicator/aws_replicator/client/service_states.py index 1cad6d4..148339b 100644 --- a/aws-replicator/aws_replicator/client/service_states.py +++ b/aws-replicator/aws_replicator/client/service_states.py @@ -1,14 +1,17 @@ import logging -from typing import Dict, Type +from typing import Dict, Optional, Type import boto3 from botocore.client import BaseClient from localstack.services.cloudformation.models.s3 import S3Bucket +from localstack.services.cloudformation.service_models import GenericBaseModel from localstack.utils.aws import aws_stack +from localstack.utils.objects import get_all_subclasses from localstack.utils.threads import parallelize from aws_replicator.client.utils import post_request_to_instance -from aws_replicator.shared.models import ExtendedResourceStateReplicator, ReplicateStateRequest +from aws_replicator.shared.models import ReplicateStateRequest +from aws_replicator.shared.utils import get_resource_type LOG = logging.getLogger(__name__) @@ -23,6 +26,37 @@ def wrapper(wrapping_clazz): return wrapper +# TODO: remove / adjust to use latest upstream CFn models! +class ExtendedResourceStateReplicator(GenericBaseModel): + """Extended resource models, used to replicate (inject) additional state into a resource instance""" + + def add_extended_state_external(self, remote_client: BaseClient = None): + """Called in the context of external CLI execution to fetch/replicate resource details from a remote account""" + + def add_extended_state_internal(self, state: Dict): + """Called in the context of the internal LocalStack instance to inject the state into a resource""" + + @classmethod + def get_resource_instance(cls, resource: Dict) -> Optional["ExtendedResourceStateReplicator"]: + resource_type = get_resource_type(resource) + resource_class = cls.find_resource_classes().get(resource_type) + if resource_class: + return resource_class(resource) + + @classmethod + def get_resource_class( + cls, resource_type: str + ) -> Optional[Type["ExtendedResourceStateReplicator"]]: + return cls.find_resource_classes().get(resource_type) + + @classmethod + def find_resource_classes(cls) -> Dict[str, "ExtendedResourceStateReplicator"]: + return { + inst.cloudformation_type(): inst + for inst in get_all_subclasses(ExtendedResourceStateReplicator) + } + + # resource-specific replications diff --git a/aws-replicator/aws_replicator/shared/models.py b/aws-replicator/aws_replicator/shared/models.py index 2d64a6d..5644212 100644 --- a/aws-replicator/aws_replicator/shared/models.py +++ b/aws-replicator/aws_replicator/shared/models.py @@ -1,46 +1,10 @@ import logging from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Type, TypedDict, Union - -from botocore.client import BaseClient -from localstack.services.cloudformation.service_models import GenericBaseModel -from localstack.utils.objects import get_all_subclasses - -from aws_replicator.shared.utils import get_resource_type +from typing import Any, Dict, List, Optional, TypedDict, Union LOG = logging.getLogger(__name__) -class ExtendedResourceStateReplicator(GenericBaseModel): - """Extended resource models, used to replicate (inject) additional state into a resource instance""" - - def add_extended_state_external(self, remote_client: BaseClient = None): - """Called in the context of external CLI execution to fetch/replicate resource details from a remote account""" - - def add_extended_state_internal(self, state: Dict): - """Called in the context of the internal LocalStack instance to inject the state into a resource""" - - @classmethod - def get_resource_instance(cls, resource: Dict) -> Optional["ExtendedResourceStateReplicator"]: - resource_type = get_resource_type(resource) - resource_class = cls.find_resource_classes().get(resource_type) - if resource_class: - return resource_class(resource) - - @classmethod - def get_resource_class( - cls, resource_type: str - ) -> Optional[Type["ExtendedResourceStateReplicator"]]: - return cls.find_resource_classes().get(resource_type) - - @classmethod - def find_resource_classes(cls) -> Dict[str, "ExtendedResourceStateReplicator"]: - return { - inst.cloudformation_type(): inst - for inst in get_all_subclasses(ExtendedResourceStateReplicator) - } - - class ReplicateStateRequest(TypedDict): """ Represents a request sent from the CLI to the extension request