From 1cad64e928964a2a95b829de606b206577838075 Mon Sep 17 00:00:00 2001 From: johnson2427 Date: Mon, 24 Jun 2024 16:00:42 -0500 Subject: [PATCH] feat: adding import functionality --- ape_aws/client.py | 62 ++++++++++++++++++++++++++++++++++++++++++--- ape_aws/kms/_cli.py | 62 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 118 insertions(+), 6 deletions(-) diff --git a/ape_aws/client.py b/ape_aws/client.py index d40e06b..9869a32 100644 --- a/ape_aws/client.py +++ b/ape_aws/client.py @@ -1,8 +1,11 @@ +from cryptography.hazmat.primitives.asymmetric import ec, padding +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.backends import default_backend from datetime import datetime from typing import ClassVar import boto3 # type: ignore[import] -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict class AliasResponse(BaseModel): @@ -15,6 +18,7 @@ class AliasResponse(BaseModel): class KeyBaseModel(BaseModel): alias: str + model_config = ConfigDict(populate_by_name=True) class CreateKeyModel(KeyBaseModel): @@ -67,10 +71,45 @@ class CreateKey(CreateKeyModel): origin: str = Field(default="AWS_KMS", alias="Origin") -class ImportKey(CreateKeyModel): +class ImportKeyRequest(CreateKeyModel): origin: str = Field(default="EXTERNAL", alias="Origin") +class ImportKey(ImportKeyRequest): + key_id: str = Field(default=None, alias="KeyId") + public_key: bytes = Field(default=None, alias="PublicKey") + private_key: bytes = Field( + default=ec.generate_private_key( + ec.SeCP256K1(), + default_backend() + ).private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ), + alias="PrivateKey", + ) + import_token: bytes = Field(default=None, alias="ImportToken") + + @property + def encrypted_key(self): + if not self.public_key: + raise ValueError("Public key not found") + + serialized_public_key = serialization.load_der_public_key( + self.public_key, + backend=default_backend(), + ) + return serialized_public_key.encrypt( + self.private_key, + padding.OAEP( + mgf=padding.MGF1(hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ) + ) + + class DeleteKey(KeyBaseModel): key_id: str days: int = 30 @@ -103,8 +142,9 @@ def sign(self, key_id, msghash): ) return response.get("Signature") - def create_key(self, key_spec: CreateKey): + def create_key(self, key_spec: CreateKey | ImportKey): response = self.client.create_key(**key_spec.to_aws_dict()) + key_id = response["KeyMetadata"]["KeyId"] self.client.create_alias( AliasName=f"alias/{key_spec.alias}", @@ -131,6 +171,22 @@ def create_key(self, key_spec: CreateKey): ) return key_id + def import_key(self, key_spec: ImportKey): + breakpoint() + return self.client.import_key_material( + KeyId=key_spec.key_id, + ImportToken=key_spec.import_token, + EncryptedKeyMaterial=key_spec.encrypted_key, + ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE", + ) + + def get_parameters(self, key_id: str): + return self.client.get_parameters_for_import( + KeyId=key_id, + WrappingAlgorithm="RSAES_OAEP_SHA_256", + WrappingKeySpec="RSA_2048", + ) + def delete_key(self, key_spec: DeleteKey): self.client.delete_alias(AliasName=key_spec.alias) self.client.schedule_key_deletion(KeyId=key_spec.key_id, PendingWindowInDays=key_spec.days) diff --git a/ape_aws/kms/_cli.py b/ape_aws/kms/_cli.py index c6145e7..e112789 100644 --- a/ape_aws/kms/_cli.py +++ b/ape_aws/kms/_cli.py @@ -1,7 +1,14 @@ +import base64 import click -from ape.cli import ape_cli_context -from ape_aws.client import CreateKey, DeleteKey, kms_client +from ape.cli import ape_cli_context +from ape_aws.client import ( + CreateKey, + DeleteKey, + ImportKeyRequest, + ImportKey, + kms_client, +) @click.group("kms") @@ -54,7 +61,56 @@ def create_key( cli_ctx.logger.success(f"Key created successfully with ID: {key_id}") -# TODO: Add `ape aws kms import` +@kms.command(name="import") +@ape_cli_context() +@click.option( + "-a", + "--admin", + "administrators", + multiple=True, + help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2", + metavar="list[ARN]", +) +@click.option( + "-u", + "--user", + "users", + multiple=True, + help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2", + metavar="list[ARN]", +) +@click.argument("alias_name") +@click.argument("description") +@click.argument("private_key") +def import_key( + cli_ctx, + alias_name: str, + description: str, + private_key: bytes, + administrators: list[str], + users: list[str], +): + key_spec = ImportKeyRequest( + alias=alias_name, + description=description, + admins=administrators, + users=users, + ) + key_id = kms_client.create_key(key_spec) + create_key_response = kms_client.get_parameters(key_id) + public_key = base64.b64encode(create_key_response["PublicKey"]) + import_token = base64.b64encode(create_key_response["ImportToken"]) + import_key_spec = ImportKey( + **key_spec.model_dump(), + key_id=key_id, + public_key=public_key, + private_key=private_key, + import_token=import_token, + ) + key_id = kms_client.import_key(import_key_spec) + cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}") + + # TODO: Add `ape aws kms sign-message [message]` # TODO: Add `ape aws kms verify-message [message] [hex-signature]`