Skip to content

Commit

Permalink
feat: adding import functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed Jun 24, 2024
1 parent b367ec6 commit 1cad64e
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 6 deletions.
62 changes: 59 additions & 3 deletions ape_aws/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
from datetime import datetime
from typing import ClassVar

import boto3 # type: ignore[import]
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ConfigDict


class AliasResponse(BaseModel):
Expand All @@ -15,6 +18,7 @@ class AliasResponse(BaseModel):

class KeyBaseModel(BaseModel):
alias: str
model_config = ConfigDict(populate_by_name=True)


class CreateKeyModel(KeyBaseModel):
Expand Down Expand Up @@ -67,10 +71,45 @@ class CreateKey(CreateKeyModel):
origin: str = Field(default="AWS_KMS", alias="Origin")


class ImportKey(CreateKeyModel):
class ImportKeyRequest(CreateKeyModel):
origin: str = Field(default="EXTERNAL", alias="Origin")


class ImportKey(ImportKeyRequest):
key_id: str = Field(default=None, alias="KeyId")
public_key: bytes = Field(default=None, alias="PublicKey")
private_key: bytes = Field(
default=ec.generate_private_key(
ec.SeCP256K1(),
default_backend()
).private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
),
alias="PrivateKey",
)
import_token: bytes = Field(default=None, alias="ImportToken")

@property
def encrypted_key(self):
if not self.public_key:
raise ValueError("Public key not found")

serialized_public_key = serialization.load_der_public_key(
self.public_key,
backend=default_backend(),
)
return serialized_public_key.encrypt(
self.private_key,
padding.OAEP(
mgf=padding.MGF1(hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
)
)


class DeleteKey(KeyBaseModel):
key_id: str
days: int = 30
Expand Down Expand Up @@ -103,8 +142,9 @@ def sign(self, key_id, msghash):
)
return response.get("Signature")

def create_key(self, key_spec: CreateKey):
def create_key(self, key_spec: CreateKey | ImportKey):
response = self.client.create_key(**key_spec.to_aws_dict())

key_id = response["KeyMetadata"]["KeyId"]
self.client.create_alias(
AliasName=f"alias/{key_spec.alias}",
Expand All @@ -131,6 +171,22 @@ def create_key(self, key_spec: CreateKey):
)
return key_id

def import_key(self, key_spec: ImportKey):
breakpoint()
return self.client.import_key_material(
KeyId=key_spec.key_id,
ImportToken=key_spec.import_token,
EncryptedKeyMaterial=key_spec.encrypted_key,
ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE",
)

def get_parameters(self, key_id: str):
return self.client.get_parameters_for_import(
KeyId=key_id,
WrappingAlgorithm="RSAES_OAEP_SHA_256",
WrappingKeySpec="RSA_2048",
)

def delete_key(self, key_spec: DeleteKey):
self.client.delete_alias(AliasName=key_spec.alias)
self.client.schedule_key_deletion(KeyId=key_spec.key_id, PendingWindowInDays=key_spec.days)
Expand Down
62 changes: 59 additions & 3 deletions ape_aws/kms/_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import base64
import click
from ape.cli import ape_cli_context

from ape_aws.client import CreateKey, DeleteKey, kms_client
from ape.cli import ape_cli_context
from ape_aws.client import (
CreateKey,
DeleteKey,
ImportKeyRequest,
ImportKey,
kms_client,
)


@click.group("kms")
Expand Down Expand Up @@ -54,7 +61,56 @@ def create_key(
cli_ctx.logger.success(f"Key created successfully with ID: {key_id}")


# TODO: Add `ape aws kms import`
@kms.command(name="import")
@ape_cli_context()
@click.option(
"-a",
"--admin",
"administrators",
multiple=True,
help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2",
metavar="list[ARN]",
)
@click.option(
"-u",
"--user",
"users",
multiple=True,
help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2",
metavar="list[ARN]",
)
@click.argument("alias_name")
@click.argument("description")
@click.argument("private_key")
def import_key(
cli_ctx,
alias_name: str,
description: str,
private_key: bytes,
administrators: list[str],
users: list[str],
):
key_spec = ImportKeyRequest(
alias=alias_name,
description=description,
admins=administrators,
users=users,
)
key_id = kms_client.create_key(key_spec)
create_key_response = kms_client.get_parameters(key_id)
public_key = base64.b64encode(create_key_response["PublicKey"])
import_token = base64.b64encode(create_key_response["ImportToken"])
import_key_spec = ImportKey(
**key_spec.model_dump(),
key_id=key_id,
public_key=public_key,
private_key=private_key,
import_token=import_token,
)
key_id = kms_client.import_key(import_key_spec)
cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}")


# TODO: Add `ape aws kms sign-message [message]`
# TODO: Add `ape aws kms verify-message [message] [hex-signature]`

Expand Down

0 comments on commit 1cad64e

Please sign in to comment.