Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appsync API Cache Implementation #8115

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions moto/appsync/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ def __init__(self, message: str):
self.description = json.dumps({"message": self.message})


class GraphqlAPICacheNotFound(AppSyncExceptions):
code = 404

def __init__(self, op: str):
super().__init__(
"NotFoundException",
f"Unable to {op} the cache as it doesn't exist, please create the cache first.",
)
self.description = json.dumps({"message": self.message})


class BadRequestException(AppSyncExceptions):
def __init__(self, message: str):
super().__init__("BadRequestException", message)
150 changes: 149 additions & 1 deletion moto/appsync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition

from .exceptions import BadRequestException, GraphqlAPINotFound, GraphQLSchemaException
from .exceptions import (
BadRequestException,
GraphqlAPICacheNotFound,
GraphqlAPINotFound,
GraphQLSchemaException,
)

# AWS custom scalars and directives
# https://github.com/dotansimha/graphql-code-generator/discussions/4311#discussioncomment-2921796
Expand Down Expand Up @@ -132,6 +137,49 @@ def to_json(self) -> Dict[str, Any]:
}


class APICache(BaseModel):
def __init__(
self,
ttl: int,
api_caching_behavior: str,
type_: str,
transit_encryption_enabled: Optional[bool] = None,
at_rest_encryption_enabled: Optional[bool] = None,
health_metrics_config: Optional[str] = None,
):
self.ttl = ttl
self.api_caching_behavior = api_caching_behavior
self.type = type_
self.transit_encryption_enabled = transit_encryption_enabled or False
self.at_rest_encryption_enabled = at_rest_encryption_enabled or False
self.health_metrics_config = health_metrics_config or "DISABLED"
self.status = "AVAILABLE"

def update(
self,
ttl: int,
api_caching_behavior: str,
type: str,
health_metrics_config: Optional[str] = None,
) -> None:
self.ttl = ttl
self.api_caching_behavior = api_caching_behavior
self.type = type
if health_metrics_config is not None:
self.health_metrics_config = health_metrics_config

def to_json(self) -> Dict[str, Any]:
return {
"ttl": self.ttl,
"transitEncryptionEnabled": self.transit_encryption_enabled,
"atRestEncryptionEnabled": self.at_rest_encryption_enabled,
"apiCachingBehavior": self.api_caching_behavior,
"type": self.type,
"healthMetricsConfig": self.health_metrics_config,
"status": self.status,
}


class GraphqlAPI(BaseModel):
def __init__(
self,
Expand Down Expand Up @@ -162,6 +210,8 @@ def __init__(

self.api_keys: Dict[str, GraphqlAPIKey] = dict()

self.api_cache: Optional[APICache] = None

def update(
self,
name: str,
Expand Down Expand Up @@ -223,6 +273,38 @@ def get_type(self, type_name: str, type_format: str) -> Any:
graphql_type["format"] = type_format # type: ignore[index]
return graphql_type

def create_api_cache(
self,
ttl: int,
api_caching_behavior: str,
type: str,
transit_encryption_enabled: Optional[bool] = None,
at_rest_encryption_enabled: Optional[bool] = None,
health_metrics_config: Optional[str] = None,
) -> APICache:
self.api_cache = APICache(
ttl,
api_caching_behavior,
type,
transit_encryption_enabled,
at_rest_encryption_enabled,
health_metrics_config,
)
return self.api_cache

def update_api_cache(
self,
ttl: int,
api_caching_behavior: str,
type: str,
health_metrics_config: Optional[str] = None,
) -> APICache:
self.api_cache.update(ttl, api_caching_behavior, type, health_metrics_config) # type: ignore[union-attr]
return self.api_cache # type: ignore[return-value]

def delete_api_cache(self) -> None:
self.api_cache = None

def to_json(self) -> Dict[str, Any]:
return {
"name": self.name,
Expand Down Expand Up @@ -374,5 +456,71 @@ def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
def get_type(self, api_id: str, type_name: str, type_format: str) -> Any:
return self.graphql_apis[api_id].get_type(type_name, type_format)

def get_api_cache(self, api_id: str) -> APICache:
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
api_cache = self.graphql_apis[api_id].api_cache
if api_cache is None:
raise GraphqlAPICacheNotFound("get")
return api_cache

def delete_api_cache(self, api_id: str) -> None:
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
if self.graphql_apis[api_id].api_cache is None:
raise GraphqlAPICacheNotFound("delete")
self.graphql_apis[api_id].delete_api_cache()
return

def create_api_cache(
self,
api_id: str,
ttl: int,
api_caching_behavior: str,
type: str,
transit_encryption_enabled: Optional[bool] = None,
at_rest_encryption_enabled: Optional[bool] = None,
health_metrics_config: Optional[str] = None,
) -> APICache:
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
graphql_api = self.graphql_apis[api_id]
if graphql_api.api_cache is not None:
raise BadRequestException(message="The API has already enabled caching.")
api_cache = graphql_api.create_api_cache(
ttl,
api_caching_behavior,
type,
transit_encryption_enabled,
at_rest_encryption_enabled,
health_metrics_config,
)
return api_cache

def update_api_cache(
self,
api_id: str,
ttl: int,
api_caching_behavior: str,
type: str,
health_metrics_config: Optional[str] = None,
) -> APICache:
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
graphql_api = self.graphql_apis[api_id]
if graphql_api.api_cache is None:
raise GraphqlAPICacheNotFound("update")
api_cache = graphql_api.update_api_cache(
ttl, api_caching_behavior, type, health_metrics_config
)
return api_cache

def flush_api_cache(self, api_id: str) -> None:
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
if self.graphql_apis[api_id].api_cache is None:
raise GraphqlAPICacheNotFound("flush")
return


appsync_backends = BackendDict(AppSyncBackend, "appsync")
57 changes: 57 additions & 0 deletions moto/appsync/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,60 @@ def get_introspection_schema(self) -> str:
format_=format_, include_directives=include_directives
)
return schema

def get_api_cache(self) -> str:
api_id = self.path.split("/")[-2]
api_cache = self.appsync_backend.get_api_cache(
api_id=api_id,
)
return json.dumps(dict(apiCache=api_cache.to_json()))

def delete_api_cache(self) -> str:
api_id = self.path.split("/")[-2]
self.appsync_backend.delete_api_cache(
api_id=api_id,
)
return "{}"

def create_api_cache(self) -> str:
params = json.loads(self.body)
api_id = self.path.split("/")[-2]
mattheidelbaugh marked this conversation as resolved.
Show resolved Hide resolved
ttl = params.get("ttl")
transit_encryption_enabled = params.get("transitEncryptionEnabled")
at_rest_encryption_enabled = params.get("atRestEncryptionEnabled")
api_caching_behavior = params.get("apiCachingBehavior")
type = params.get("type")
health_metrics_config = params.get("healthMetricsConfig")
api_cache = self.appsync_backend.create_api_cache(
api_id=api_id,
ttl=ttl,
transit_encryption_enabled=transit_encryption_enabled,
at_rest_encryption_enabled=at_rest_encryption_enabled,
api_caching_behavior=api_caching_behavior,
type=type,
health_metrics_config=health_metrics_config,
)
return json.dumps(dict(apiCache=api_cache.to_json()))

def update_api_cache(self) -> str:
api_id = self.path.split("/")[-3]
mattheidelbaugh marked this conversation as resolved.
Show resolved Hide resolved
params = json.loads(self.body)
ttl = params.get("ttl")
api_caching_behavior = params.get("apiCachingBehavior")
type = params.get("type")
health_metrics_config = params.get("healthMetricsConfig")
api_cache = self.appsync_backend.update_api_cache(
api_id=api_id,
ttl=ttl,
api_caching_behavior=api_caching_behavior,
type=type,
health_metrics_config=health_metrics_config,
)
return json.dumps(dict(apiCache=api_cache.to_json()))

def flush_api_cache(self) -> str:
api_id = self.path.split("/")[-2]
mattheidelbaugh marked this conversation as resolved.
Show resolved Hide resolved
self.appsync_backend.flush_api_cache(
api_id=api_id,
)
return "{}"
3 changes: 3 additions & 0 deletions moto/appsync/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@
"{0}/v1/tags/(?P<resource_arn>.+)$": AppSyncResponse.dispatch,
"{0}/v1/tags/(?P<resource_arn_pt1>.+)/(?P<resource_arn_pt2>.+)$": AppSyncResponse.dispatch,
"{0}/v1/apis/(?P<api_id>[^/]+)/types/(?P<type_name>.+)$": AppSyncResponse.dispatch,
"{0}/v1/apis/(?P<apiId>.*)/ApiCaches$": AppSyncResponse.dispatch,
"{0}/v1/apis/(?P<apiId>.*)/ApiCaches/update$": AppSyncResponse.dispatch,
"{0}/v1/apis/(?P<apiId>.*)/FlushCache$": AppSyncResponse.dispatch,
}
Loading
Loading