Skip to content

Commit

Permalink
feat: add ability to input file
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed Jun 28, 2024
1 parent 5062fe4 commit a97c8ae
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 79 deletions.
55 changes: 2 additions & 53 deletions ape_aws/accounts.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
from functools import cached_property
from json import dumps
from pathlib import Path
from typing import Any, Iterator, Optional

from ape.api.accounts import AccountAPI, AccountContainerAPI, TransactionAPI
from ape.types import AddressType, MessageSignature, SignableMessage, TransactionSignature
from ape.utils.validators import _validate_account_passphrase
from eth_account import Account as EthAccount
from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict
from eth_account.messages import _hash_eip191_message, encode_defunct
from eth_pydantic_types import HexBytes
from eth_typing import Hash32
from eth_utils import keccak, to_bytes, to_checksum_address
from eth_utils import keccak, to_checksum_address

from .client import kms_client
from .utils import _convert_der_to_rsv


class AwsAccountContainer(AccountContainerAPI):
loaded_accounts: dict[str, "KmsAccount"] = {}

def model_post_init(self, __context: Any):
print("Initializing AWS KMS Account Container")
print([acc.alias for acc in self.accounts])

@property
def _keyfiles(self) -> list[Path]:
return [file for file in self.data_folder.glob("*.json")]

@property
def aliases(self) -> Iterator[str]:
Expand All @@ -37,50 +25,15 @@ def __len__(self) -> int:

@property
def accounts(self) -> Iterator[AccountAPI]:
def _load_account(key_alias, key_id, key_arn) -> AccountAPI:
filename = f"{key_alias}.json"
keyfile = self.data_folder.joinpath(filename)
if filename not in self._keyfiles:
self.loaded_accounts[keyfile.stem] = KmsAccount(
key_alias=key_alias,
key_id=key_id,
key_arn=key_arn,
)
keyfile.write_text(self.loaded_accounts[keyfile.stem].dump_to_json())
return self.loaded_accounts[keyfile.stem]

return map(
lambda x: _load_account(
lambda x: KmsAccount(
key_alias=x.alias.replace("alias/", ""),
key_id=x.key_id,
key_arn=x.arn,
),
kms_client.raw_aliases,
)

def add_private_key(self, alias, passphrase, private_key):
kms_account = self.loaded_accounts[alias]
_validate_account_passphrase(passphrase)
account = EthAccount.from_key(to_bytes(hexstr=private_key))
keyfile = self.data_folder.joinpath(f"{alias}.json")
account = EthAccount.encrypt(account.key, passphrase)
model = kms_account.model_dump()
model["address"] = kms_account.address
del account["address"]
model.update(account)
keyfile.write_text(dumps(model, indent=4))
print("Key cached successfully")
return

def delete_account(self, alias):
alias = alias.replace("alias/", "")
keyfile = self.data_folder.joinpath(f"{alias}.json")
if keyfile.exists():
keyfile.unlink()
print(f"Key {alias} deleted successfully")
else:
print(f"Key {alias} not found")


class KmsAccount(AccountAPI):
key_alias: str
Expand Down Expand Up @@ -155,7 +108,3 @@ def sign_transaction(self, txn: TransactionAPI, **signer_options) -> Optional[Tr

return txn

def dump_to_json(self, indent: int = 4):
model = self.model_dump()
model["address"] = self.address
return dumps(model, indent=indent)
5 changes: 2 additions & 3 deletions ape_aws/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from pathlib import Path
from typing import ClassVar

import boto3 # type: ignore[import]
Expand Down Expand Up @@ -79,13 +80,11 @@ class ImportKeyRequest(CreateKeyModel):
class ImportKey(ImportKeyRequest):
key_id: str = Field(default=None, alias="KeyId")
public_key: bytes = Field(default=None, alias="PublicKey")
private_key: str | bytes | None = Field(default=None, alias="PrivateKey")
private_key: str | bytes = Field(default=None, alias="PrivateKey")
import_token: bytes = Field(default=None, alias="ImportToken")

@field_validator("private_key")
def validate_private_key(cls, value):
if not value:
return ec.generate_private_key(ec.SECP256K1(), default_backend())
if value.startswith("0x"):
value = value[2:]
return value
Expand Down
33 changes: 10 additions & 23 deletions ape_aws/kms/_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import click
from pathlib import Path

from ape.cli import ape_cli_context

from ape_aws.accounts import AwsAccountContainer, KmsAccount
Expand Down Expand Up @@ -62,13 +64,6 @@ def create_key(

@kms.command(name="import")
@ape_cli_context()
@click.option(
"-p",
"--private-key",
"private_key",
multiple=False,
help="The private key to import",
)
@click.option(
"-a",
"--admin",
Expand All @@ -93,22 +88,20 @@ def create_key(
metavar="str",
)
@click.argument("alias_name")
@click.argument("private_key")
def import_key(
cli_ctx,
alias_name: str,
private_key: bytes,
private_key: bytes | str | Path,
administrators: list[str],
users: list[str],
description: str,
):
def ask_for_passphrase():
return click.prompt(
"Create Passphrase to encrypt account",
hide_input=True,
confirmation_prompt=True,
)

passphrase = ask_for_passphrase()
path = Path(private_key)
if path.exists() and path.is_file():
cli_ctx.logger.info(f"Reading private key from {path}")
private_key = path.read_text().strip()

key_spec = ImportKeyRequest(
alias=alias_name,
description=description, # type: ignore
Expand All @@ -130,8 +123,6 @@ def ask_for_passphrase():
if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
cli_ctx.abort("Key failed to import into KMS")
cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}")
aws_account_container = AwsAccountContainer(name="aws", account_type=KmsAccount)
aws_account_container.add_private_key(alias_name, passphrase, import_key_spec.private_key_hex)


# TODO: Add `ape aws kms sign-message [message]`
Expand All @@ -141,9 +132,8 @@ def ask_for_passphrase():
@kms.command(name="delete")
@ape_cli_context()
@click.argument("alias_name")
@click.option("-p", "--purge", is_flag=True, help="Purge the key from the system")
@click.option("-d", "--days", default=30, help="Number of days until key is deactivated")
def schedule_delete_key(cli_ctx, alias_name, purge, days):
def schedule_delete_key(cli_ctx, alias_name, days):
if "alias" not in alias_name:
alias_name = f"alias/{alias_name}"
kms_account = None
Expand All @@ -156,7 +146,4 @@ def schedule_delete_key(cli_ctx, alias_name, purge, days):

delete_key_spec = DeleteKey(alias=alias_name, key_id=kms_account.key_id, days=days)
key_alias = kms_client.delete_key(delete_key_spec)
if purge:
aws_account_container = AwsAccountContainer(name="aws", account_type=KmsAccount)
aws_account_container.delete_account(key_alias)
cli_ctx.logger.success(f"Key {key_alias} scheduled for deletion in {days} days")

0 comments on commit a97c8ae

Please sign in to comment.