Skip to content

Commit

Permalink
feat: Workspaceapi (#7493)
Browse files Browse the repository at this point in the history
  • Loading branch information
archinksagar committed Mar 21, 2024
1 parent fd21ccb commit 8123e6d
Show file tree
Hide file tree
Showing 12 changed files with 1,706 additions and 7 deletions.
1 change: 1 addition & 0 deletions moto/backend_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,6 @@
),
("transcribe", re.compile("https?://transcribe\\.(.+)\\.amazonaws\\.com")),
("wafv2", re.compile("https?://wafv2\\.(.+)\\.amazonaws.com")),
("workspaces", re.compile("https?://workspaces\\.(.+)\\.amazonaws\\.com")),
("xray", re.compile("https?://xray\\.(.+)\\.amazonaws.com")),
]
4 changes: 4 additions & 0 deletions moto/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
from moto.timestreamwrite.models import TimestreamWriteBackend
from moto.transcribe.models import TranscribeBackend
from moto.wafv2.models import WAFV2Backend
from moto.workspaces.models import WorkSpacesBackend
from moto.xray.models import XRayBackend


Expand Down Expand Up @@ -295,6 +296,7 @@ def get_service_from_url(url: str) -> Optional[str]:
"Literal['timestream-write']",
"Literal['transcribe']",
"Literal['wafv2']",
"Literal['workspaces']",
"Literal['xray']",
]

Expand Down Expand Up @@ -564,6 +566,8 @@ def get_backend(name: "Literal['transcribe']") -> "BackendDict[TranscribeBackend
@overload
def get_backend(name: "Literal['wafv2']") -> "BackendDict[WAFV2Backend]": ...
@overload
def get_backend(name: "Literal['workspaces']") -> "BackendDict[WorkSpacesBackend]": ...
@overload
def get_backend(name: "Literal['xray']") -> "BackendDict[XRayBackend]": ...
# fmt: on

Expand Down
56 changes: 54 additions & 2 deletions moto/resourcegroupstaggingapi/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from moto.sns.models import SNSBackend, sns_backends
from moto.sqs.models import SQSBackend, sqs_backends
from moto.ssm.models import SimpleSystemManagerBackend, ssm_backends
from moto.workspaces.models import WorkSpacesBackend, workspaces_backends
from moto.utilities.tagging_service import TaggingService

# Left: EC2 ElastiCache RDS ELB CloudFront WorkSpaces Lambda EMR Glacier Kinesis Redshift Route53
# Left: EC2 ElastiCache RDS ELB CloudFront Lambda EMR Glacier Kinesis Redshift Route53
# StorageGateway DynamoDB MachineLearning ACM DirectConnect DirectoryService CloudHSM
# Inspector Elasticsearch

Expand Down Expand Up @@ -120,6 +121,10 @@ def backup_backend(self) -> BackupBackend:
def dynamodb_backend(self) -> DynamoDBBackend:
return dynamodb_backends[self.account_id][self.region_name]

@property
def workspaces_backend(self) -> WorkSpacesBackend:
return workspaces_backends[self.account_id][self.region_name]

def _get_resources_generator(
self,
tag_filters: Optional[List[Dict[str, Any]]] = None,
Expand Down Expand Up @@ -533,6 +538,48 @@ def format_tag_keys(
}


# Workspaces
if not resource_type_filters or "workspaces" in resource_type_filters:
for ws in self.workspaces_backend.workspaces.values():
tags = format_tag_keys(ws.tags, ["Key", "Value"])
if not tags or not tag_filter(
tags
): # Skip if no tags, or invalid filter
continue

yield {
"ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:workspace/{ws.workspace_id}",
"Tags": tags,
}

# Workspace Directories
if not resource_type_filters or "workspaces-directory" in resource_type_filters:
for wd in self.workspaces_backend.workspace_directories.values():
tags = format_tag_keys(wd.tags, ["Key", "Value"])
if not tags or not tag_filter(
tags
): # Skip if no tags, or invalid filter
continue

yield {
"ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:directory/{wd.directory_id}",
"Tags": tags,
}

# Workspace Images
if not resource_type_filters or "workspaces-image" in resource_type_filters:
for wi in self.workspaces_backend.workspace_images.values():
tags = format_tag_keys(wi.tags, ["Key", "Value"])
if not tags or not tag_filter(
tags
): # Skip if no tags, or invalid filter
continue

yield {
"ResourceARN": f"arn:aws:workspaces:{self.region_name}:{self.account_id}:workspaceimage/{wi.image_id}",
"Tags": tags,
}

# VPC
if (
not resource_type_filters
Expand Down Expand Up @@ -885,7 +932,12 @@ def tag_resources(
self.rds_backend.add_tags_to_resource(
arn, TaggingService.convert_dict_to_tags_input(tags)
)
if arn.startswith("arn:aws:logs:"):
elif arn.startswith("arn:aws:workspaces:"):
resource_id = arn.split("/")[-1]
self.workspaces_backend.create_tags(
resource_id, TaggingService.convert_dict_to_tags_input(tags)
)
elif arn.startswith("arn:aws:logs:"):
self.logs_backend.tag_resource(arn, tags)
if arn.startswith("arn:aws:dynamodb"):
self.dynamodb_backend.tag_resource(
Expand Down
1 change: 1 addition & 0 deletions moto/workspaces/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .models import workspaces_backends # noqa: F401
30 changes: 30 additions & 0 deletions moto/workspaces/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Exceptions raised by the workspaces service."""
from moto.core.exceptions import JsonRESTError


class ValidationException(JsonRESTError):
code = 400

def __init__(self, message: str):
super().__init__("ValidationException", message)


class InvalidParameterValuesException(JsonRESTError):
code = 400

def __init__(self, message: str):
super().__init__("InvalidParameterValuesException", message)


class ResourceAlreadyExistsException(JsonRESTError):
code = 400

def __init__(self, message: str):
super().__init__("ResourceAlreadyExistsException", message)


class ResourceNotFoundException(JsonRESTError):
code = 400

def __init__(self, message: str):
super().__init__("ResourceNotFoundException", message)
Loading

0 comments on commit 8123e6d

Please sign in to comment.