From a97c8aecce58836473b8118b3c454fd302f09fbc Mon Sep 17 00:00:00 2001 From: johnson2427 Date: Fri, 28 Jun 2024 08:53:26 -0500 Subject: [PATCH] feat: add ability to input file --- ape_aws/accounts.py | 55 ++------------------------------------------- ape_aws/client.py | 5 ++--- ape_aws/kms/_cli.py | 33 +++++++++------------------ 3 files changed, 14 insertions(+), 79 deletions(-) diff --git a/ape_aws/accounts.py b/ape_aws/accounts.py index ae01933..13c58be 100644 --- a/ape_aws/accounts.py +++ b/ape_aws/accounts.py @@ -1,32 +1,20 @@ from functools import cached_property from json import dumps -from pathlib import Path from typing import Any, Iterator, Optional from ape.api.accounts import AccountAPI, AccountContainerAPI, TransactionAPI from ape.types import AddressType, MessageSignature, SignableMessage, TransactionSignature -from ape.utils.validators import _validate_account_passphrase -from eth_account import Account as EthAccount from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict from eth_account.messages import _hash_eip191_message, encode_defunct from eth_pydantic_types import HexBytes from eth_typing import Hash32 -from eth_utils import keccak, to_bytes, to_checksum_address +from eth_utils import keccak, to_checksum_address from .client import kms_client from .utils import _convert_der_to_rsv class AwsAccountContainer(AccountContainerAPI): - loaded_accounts: dict[str, "KmsAccount"] = {} - - def model_post_init(self, __context: Any): - print("Initializing AWS KMS Account Container") - print([acc.alias for acc in self.accounts]) - - @property - def _keyfiles(self) -> list[Path]: - return [file for file in self.data_folder.glob("*.json")] @property def aliases(self) -> Iterator[str]: @@ -37,20 +25,8 @@ def __len__(self) -> int: @property def accounts(self) -> Iterator[AccountAPI]: - def _load_account(key_alias, key_id, key_arn) -> AccountAPI: - filename = f"{key_alias}.json" - keyfile = self.data_folder.joinpath(filename) - if filename not in self._keyfiles: - self.loaded_accounts[keyfile.stem] = KmsAccount( - key_alias=key_alias, - key_id=key_id, - key_arn=key_arn, - ) - keyfile.write_text(self.loaded_accounts[keyfile.stem].dump_to_json()) - return self.loaded_accounts[keyfile.stem] - return map( - lambda x: _load_account( + lambda x: KmsAccount( key_alias=x.alias.replace("alias/", ""), key_id=x.key_id, key_arn=x.arn, @@ -58,29 +34,6 @@ def _load_account(key_alias, key_id, key_arn) -> AccountAPI: kms_client.raw_aliases, ) - def add_private_key(self, alias, passphrase, private_key): - kms_account = self.loaded_accounts[alias] - _validate_account_passphrase(passphrase) - account = EthAccount.from_key(to_bytes(hexstr=private_key)) - keyfile = self.data_folder.joinpath(f"{alias}.json") - account = EthAccount.encrypt(account.key, passphrase) - model = kms_account.model_dump() - model["address"] = kms_account.address - del account["address"] - model.update(account) - keyfile.write_text(dumps(model, indent=4)) - print("Key cached successfully") - return - - def delete_account(self, alias): - alias = alias.replace("alias/", "") - keyfile = self.data_folder.joinpath(f"{alias}.json") - if keyfile.exists(): - keyfile.unlink() - print(f"Key {alias} deleted successfully") - else: - print(f"Key {alias} not found") - class KmsAccount(AccountAPI): key_alias: str @@ -155,7 +108,3 @@ def sign_transaction(self, txn: TransactionAPI, **signer_options) -> Optional[Tr return txn - def dump_to_json(self, indent: int = 4): - model = self.model_dump() - model["address"] = self.address - return dumps(model, indent=indent) diff --git a/ape_aws/client.py b/ape_aws/client.py index 30154db..b90f218 100644 --- a/ape_aws/client.py +++ b/ape_aws/client.py @@ -1,4 +1,5 @@ from datetime import datetime +from pathlib import Path from typing import ClassVar import boto3 # type: ignore[import] @@ -79,13 +80,11 @@ class ImportKeyRequest(CreateKeyModel): class ImportKey(ImportKeyRequest): key_id: str = Field(default=None, alias="KeyId") public_key: bytes = Field(default=None, alias="PublicKey") - private_key: str | bytes | None = Field(default=None, alias="PrivateKey") + private_key: str | bytes = Field(default=None, alias="PrivateKey") import_token: bytes = Field(default=None, alias="ImportToken") @field_validator("private_key") def validate_private_key(cls, value): - if not value: - return ec.generate_private_key(ec.SECP256K1(), default_backend()) if value.startswith("0x"): value = value[2:] return value diff --git a/ape_aws/kms/_cli.py b/ape_aws/kms/_cli.py index 86c152f..331f45b 100644 --- a/ape_aws/kms/_cli.py +++ b/ape_aws/kms/_cli.py @@ -1,4 +1,6 @@ import click +from pathlib import Path + from ape.cli import ape_cli_context from ape_aws.accounts import AwsAccountContainer, KmsAccount @@ -62,13 +64,6 @@ def create_key( @kms.command(name="import") @ape_cli_context() -@click.option( - "-p", - "--private-key", - "private_key", - multiple=False, - help="The private key to import", -) @click.option( "-a", "--admin", @@ -93,22 +88,20 @@ def create_key( metavar="str", ) @click.argument("alias_name") +@click.argument("private_key") def import_key( cli_ctx, alias_name: str, - private_key: bytes, + private_key: bytes | str | Path, administrators: list[str], users: list[str], description: str, ): - def ask_for_passphrase(): - return click.prompt( - "Create Passphrase to encrypt account", - hide_input=True, - confirmation_prompt=True, - ) - - passphrase = ask_for_passphrase() + path = Path(private_key) + if path.exists() and path.is_file(): + cli_ctx.logger.info(f"Reading private key from {path}") + private_key = path.read_text().strip() + key_spec = ImportKeyRequest( alias=alias_name, description=description, # type: ignore @@ -130,8 +123,6 @@ def ask_for_passphrase(): if response["ResponseMetadata"]["HTTPStatusCode"] != 200: cli_ctx.abort("Key failed to import into KMS") cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}") - aws_account_container = AwsAccountContainer(name="aws", account_type=KmsAccount) - aws_account_container.add_private_key(alias_name, passphrase, import_key_spec.private_key_hex) # TODO: Add `ape aws kms sign-message [message]` @@ -141,9 +132,8 @@ def ask_for_passphrase(): @kms.command(name="delete") @ape_cli_context() @click.argument("alias_name") -@click.option("-p", "--purge", is_flag=True, help="Purge the key from the system") @click.option("-d", "--days", default=30, help="Number of days until key is deactivated") -def schedule_delete_key(cli_ctx, alias_name, purge, days): +def schedule_delete_key(cli_ctx, alias_name, days): if "alias" not in alias_name: alias_name = f"alias/{alias_name}" kms_account = None @@ -156,7 +146,4 @@ def schedule_delete_key(cli_ctx, alias_name, purge, days): delete_key_spec = DeleteKey(alias=alias_name, key_id=kms_account.key_id, days=days) key_alias = kms_client.delete_key(delete_key_spec) - if purge: - aws_account_container = AwsAccountContainer(name="aws", account_type=KmsAccount) - aws_account_container.delete_account(key_alias) cli_ctx.logger.success(f"Key {key_alias} scheduled for deletion in {days} days")