diff --git a/README.md b/README.md index 1c0f746..4064f5c 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,57 @@ A library to convert between Sigstore Bundles and [PEP 740] Attestation objects python -m pip install pypi-attestation-models ``` -## Usage +## Usage as a command line tool + +````bash +python -m pypi_attestation_models --help +usage: pypi-attestation-models [-h] [-v] [-V] COMMAND ... + +Sign, inspect or verify PEP 740 attestations + +positional arguments: + COMMAND The operation to perform + sign Sign one or more inputs + verify Verify one or more inputs + inspect Inspect one or more inputs + +options: + -h, --help show this help message and exit + -v, --verbose run with additional debug logging; supply multiple times to + increase verbosity (default: 0) + -V, --version show program's version number and exit +```` + +### Signing a package + +```bash +# Generate a whl file +make package +python -m pypi_attestation_models sign dist/pypi_attestation_models-*.whl +``` + +_Note_: This will open a browser window to authenticate with the Sigstore +OAuth flow. + +### Inspecting a PEP 740 Attestation + +```bash +python -m pypi_attestation_models inspect dist/pypi_attestation_models-*.whl.publish.attestation +``` +_Warning_: Inspecting does not mean verifying. It only prints the structure of +the attestation. + +### Verifying a PEP 740 Attestation + +```bash +python -m pypi_attestation_models verify --staging \ + --identity william@yossarian.net \ + test/assets/rfc8785-0.1.2-py3-none-any.whl +``` +The attestation present in the test has been generated using the staging +environment of Sigstore and signed by William. + +## Usage as a library See the full API documentation [here]. diff --git a/pyproject.toml b/pyproject.toml index c62e1bb..ef154f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,10 @@ name = "pypi_attestation_models" [tool.coverage.run] # don't attempt code coverage for the CLI entrypoints -omit = ["src/pypi_attestation_models/_cli.py"] +omit = [ + "src/pypi_attestation_models/_cli.py", + "src/pypi_attestation_models/__main__.py" +] [tool.mypy] mypy_path = "src" @@ -93,6 +96,11 @@ ignore = ["ANN101", "ANN102", "D203", "D213", "COM812", "ISC001"] [tool.interrogate] # don't enforce documentation coverage for packaging, testing, the virtual # environment, or the CLI (which is documented separately). -exclude = ["env", "test", "src/pypi_attestation_models/_cli.py"] +exclude = [ + "env", + "test", + "src/pypi_attestation_models/_cli.py", + "src/pypi_attestation_models/__main__.py" +] ignore-semiprivate = true fail-under = 100 diff --git a/src/pypi_attestation_models/__main__.py b/src/pypi_attestation_models/__main__.py new file mode 100644 index 0000000..fc89c0e --- /dev/null +++ b/src/pypi_attestation_models/__main__.py @@ -0,0 +1,6 @@ +"""The pypi-attestation-models entrypoint.""" + +if __name__ == "__main__": + from pypi_attestation_models._cli import main + + main() diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py new file mode 100644 index 0000000..2216176 --- /dev/null +++ b/src/pypi_attestation_models/_cli.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import argparse +import json +import logging +import typing +from pathlib import Path + +import sigstore.oidc +from cryptography import x509 +from pydantic import ValidationError +from sigstore.oidc import IdentityError, IdentityToken, Issuer +from sigstore.sign import SigningContext +from sigstore.verify import Verifier, policy + +from pypi_attestation_models import Attestation, AttestationError, VerificationError, __version__ + +if typing.TYPE_CHECKING: + from collections.abc import Iterable + from typing import NoReturn + +logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.INFO) + + +def _parser() -> argparse.ArgumentParser: + parent_parser = argparse.ArgumentParser(add_help=False) + + parent_parser.add_argument( + "-v", + "--verbose", + action="count", + default=0, + help="Run with additional debug logging; supply multiple times to increase verbosity", + ) + + parser = argparse.ArgumentParser( + prog="pypi-attestation-models", + description="Sign, inspect or verify PEP 740 attestations", + parents=[parent_parser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "-V", + "--version", + action="version", + version=f"pypi-attestation-models {__version__}", + ) + + subcommands = parser.add_subparsers( + required=True, + dest="subcommand", + metavar="COMMAND", + help="The operation to perform", + ) + + sign_command = subcommands.add_parser( + name="sign", help="Sign one or more inputs", parents=[parent_parser] + ) + + sign_command.add_argument( + "--staging", + action="store_true", + default=False, + help="Use the staging environment", + ) + + sign_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + verify_command = subcommands.add_parser( + name="verify", + help="Verify one or more inputs", + parents=[parent_parser], + ) + + verify_command.add_argument( + "--identity", + type=str, + required=True, + help="Signer identity", + ) + + verify_command.add_argument( + "--staging", + action="store_true", + default=False, + help="Use the staging environment", + ) + + verify_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + inspect_command = subcommands.add_parser( + name="inspect", + help="Inspect one or more inputs", + parents=[parent_parser], + ) + + inspect_command.add_argument( + "--dump-bytes", + action="store_true", + default=False, + help="Dump the bytes of the signature", + ) + + inspect_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + return parser + + +def _die(message: str) -> NoReturn: + """Handle errors and terminate the program with an error code.""" + _logger.error(message) + raise SystemExit(1) + + +def _validate_files(files: Iterable[Path], should_exist: bool = True) -> None: + """Validate that the list of files exists or not. + + This function exits the program if the condition is not met. + """ + for file_path in files: + if file_path.is_file() != should_exist: + if should_exist: + _die(f"{file_path} is not a file.") + else: + _die(f"{file_path} already exists.") + + +def get_identity_token(args: argparse.Namespace) -> IdentityToken: + """Generate an Identity Token. + + This method uses the following order of precedence: + - An ambient credential + - An OAuth-2 flow + """ + # Ambient credential detection + oidc_token = sigstore.oidc.detect_credential() + if oidc_token is not None: + return IdentityToken(oidc_token) + + # Fallback to interactive OAuth-2 Flow + issuer: Issuer = Issuer.staging() if args.staging else Issuer.production() + return issuer.identity_token() + + +def _sign(args: argparse.Namespace) -> None: + """Sign the files passed as argument.""" + try: + identity = get_identity_token(args) + except IdentityError as identity_error: + _die(f"Failed to detect identity: {identity_error}") + + signing_ctx = SigningContext.staging() if args.staging else SigningContext.production() + + # Validates that every file we want to sign exist but none of their attestations + _validate_files(args.files, should_exist=True) + _validate_files( + (Path(f"{file_path}.publish.attestation") for file_path in args.files), + should_exist=False, + ) + + with signing_ctx.signer(identity, cache=True) as signer: + for file_path in args.files: + _logger.debug(f"Signing {file_path}") + + signature_path = Path(f"{file_path}.publish.attestation") + try: + attestation = Attestation.sign(signer, file_path) + except AttestationError as e: + _die(f"Failed to sign: {e}") + + _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) + + signature_path.write_text(attestation.model_dump_json()) + + +def _inspect(args: argparse.Namespace) -> None: + """Inspect attestations. + + Warning: The information displayed from the attestations are not verified. + """ + _validate_files(args.files, should_exist=True) + for file_path in args.files: + try: + attestation = Attestation.model_validate_json(file_path.read_text()) + except ValidationError as validation_error: + _die(f"Invalid attestation ({file_path}): {validation_error}") + + _logger.info( + "Warning: The information displayed below are not verified, they are only " + "displayed. Use the verify command to verify them." + ) + + _logger.info(f"File: {file_path}") + _logger.info(f"Version: {attestation.version}") + + decoded_statement = json.loads(attestation.envelope.statement.decode()) + + _logger.info("Statement:") + _logger.info(f"\tType: {decoded_statement['_type']}") + _logger.info("\tSubject:") + for subject in decoded_statement["subject"]: + _logger.info(f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})") + + _logger.info(f"\tPredicate type: {decoded_statement['predicateType']}") + _logger.info(f"\tPredicate: {decoded_statement['predicate']}") + + if args.dump_bytes: + _logger.info(f"Signature: {attestation.envelope.signature!r}") + + # Verification Material + verification_material = attestation.verification_material + + # Certificate + certificate = x509.load_der_x509_certificate(verification_material.certificate) + _logger.info("Certificate:") + san = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName) + _logger.info(f"\tSubjects: {[name.value for name in san.value]}") + _logger.info(f"\tIssuer: {certificate.issuer.rfc4514_string()}") + _logger.info(f"\tValidity: {certificate.not_valid_after_utc}") + + # Transparency Log + _logger.info( + f"Transparency Log ({len(verification_material.transparency_entries)} entries):" + ) + for idx, entry in enumerate(verification_material.transparency_entries): + _logger.info(f"\tLog Index: {entry['logIndex']}") + + +def _verify(args: argparse.Namespace) -> None: + """Verify the files passed as argument.""" + verifier: Verifier = Verifier.staging() if args.staging else Verifier.production() + pol = policy.Identity(identity=args.identity) + + # Validate that both the attestations and files exists + _validate_files(args.files, should_exist=True) + _validate_files( + (Path(f"{file_path}.publish.attestation") for file_path in args.files), + should_exist=True, + ) + + for file_path in args.files: + attestation_path = Path(f"{file_path}.publish.attestation") + try: + attestation = Attestation.model_validate_json(attestation_path.read_text()) + except ValidationError as validation_error: + _die(f"Invalid attestation ({file_path}): {validation_error}") + + try: + attestation.verify(verifier, pol, file_path) + except VerificationError as verification_error: + _logger.error("Verification failed for %s: %s", file_path, verification_error) + continue + + _logger.info(f"OK: {attestation_path}") + + +def main() -> None: + parser = _parser() + args: argparse.Namespace = parser.parse_args() + + if args.verbose >= 1: + _logger.setLevel("DEBUG") + if args.verbose >= 2: + logging.getLogger().setLevel("DEBUG") + + _logger.debug(args) + + args._parser = parser # noqa: SLF001. + + if args.subcommand == "sign": + _sign(args) + elif args.subcommand == "verify": + _verify(args) + elif args.subcommand == "inspect": + _inspect(args) diff --git a/src/pypi_attestation_models/_impl.py b/src/pypi_attestation_models/_impl.py index 290f291..9d89dca 100644 --- a/src/pypi_attestation_models/_impl.py +++ b/src/pypi_attestation_models/_impl.py @@ -84,18 +84,26 @@ class Attestation(BaseModel): @classmethod def sign(cls, signer: Signer, dist: Path) -> Attestation: - """Create an envelope, with signature, from a distribution file.""" + """Create an envelope, with signature, from a distribution file. + + On failure, raises `AttestationError` or an appropriate subclass. + """ with dist.open(mode="rb", buffering=0) as io: # Replace this with `hashlib.file_digest()` once # our minimum supported Python is >=3.11 digest = _sha256_streaming(io).hex() + try: + name = _ultranormalize_dist_filename(dist.name) + except ValueError as e: + raise AttestationError(str(e)) + stmt = ( _StatementBuilder() .subjects( [ _Subject( - name=_ultranormalize_dist_filename(dist.name), + name=name, digest=_DigestSet(root={"sha256": digest}), ) ] diff --git a/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation b/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation new file mode 100644 index 0000000..fff256e --- /dev/null +++ b/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation @@ -0,0 +1,51 @@ +{ + "version": 1, + "verification_material": { + "certificate": "MIIC0zCCAlmgAwIBAgIUNa1+nVgkOX1xlssDyRyt0DZ6M5UwCgYIKoZIzj0EAwMwNzEVMBMGA1UE\nChMMc2lnc3RvcmUuZGV2MR4wHAYDVQQDExVzaWdzdG9yZS1pbnRlcm1lZGlhdGUwHhcNMjQwNjA2\nMTgzOTA1WhcNMjQwNjA2MTg0OTA1WjAAMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyrm8stLQ\nwPX/MdVS50NZ4gmXEPEh6kYlvhEo079Yk1lMMmMobwFvINC8Lc02kg+03BMscXbM/OKv3Fl1qH9P\nCKOCAXgwggF0MA4GA1UdDwEB/wQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAzAdBgNVHQ4EFgQU\nn98gJQymjI+dFUDEea6CKbQngj4wHwYDVR0jBBgwFoAUcYYwphR8Ym/599b0BRp/X//rb6wwIwYD\nVR0RAQH/BBkwF4EVd2lsbGlhbUB5b3NzYXJpYW4ubmV0MCwGCisGAQQBg78wAQEEHmh0dHBzOi8v\nZ2l0aHViLmNvbS9sb2dpbi9vYXV0aDAuBgorBgEEAYO/MAEIBCAMHmh0dHBzOi8vZ2l0aHViLmNv\nbS9sb2dpbi9vYXV0aDCBiQYKKwYBBAHWeQIEAgR7BHkAdwB1ACswvNxoiMni4dgmKV50H0g5MZYC\n8pwzy15DQP6yrIZ6AAABj+7Y7/YAAAQDAEYwRAIgTWyPyS2CKRm5ZUaTwngfBtrOJozwlIfOOfXH\nyyej0BQCIGCwmYVKhNS7JbUTFeDe90SWNlpwl5YAVDb/2GGFxGNCMAoGCCqGSM49BAMDA2gAMGUC\nMQCxIekmLNdhAS7HVo6CRgqVRht8RiFO6lbyGK4fDuEQOk/MPaBlRhsaUxwejf7jI2kCMCw5AOij\nMvqsXHjZYk7TfRH/079Zy0qEWjD9lurfPiTX9qSQKSiXORvxpk/DQsfTsg==\n", + "transparency_entries": [ + { + "logIndex": "28175749", + "logId": { + "keyId": "0y8wo8MtY5wrdiIFohx7sHeI5oKDpK5vQhGHI6G+pJY=" + }, + "kindVersion": { + "kind": "dsse", + "version": "0.0.1" + }, + "integratedTime": "1717699145", + "inclusionPromise": { + "signedEntryTimestamp": "MEYCIQDx9J86FXVVe/PIoY5jHvlQJ85A6oZ2BiZ6/3ZYe3EeAQIhALl97dZebI/Smm0qQMdVVkbVznthHZCaSClN4djajx3G" + }, + "inclusionProof": { + "logIndex": "28160930", + "rootHash": "zWVcqCxxaF+b1WWfb+xZZlQYK4MdEr81Dd0KzOFu0Ko=", + "treeSize": "28160931", + "hashes": [ + "qDMDpEGtUE3c8CnnlguBb24eYIGo+nv0wGjN2Wdq1V8=", + "r3g45oVhy3zCnIK7lkTsH8Sg1Qdy0kH/CqfaBUE0yok=", + "XAv5fJtrNK1YPZwvB0JIVOOwWiLHk/oWoqzN1xzF9t4=", + "14fYRBMB/6rTWV5Qpei46FU+7rHmaqqLFV/K22kI6sg=", + "KhgfVnUZkrYVk1Je+xSJ3iT5wZMgut38srFhH/iVsWQ=", + "C9LjSdxA96yalX4DOGX/fV0kuhx9LLU1BERodtxE+No=", + "NwfjLTWUBnDymaU+Ca/ykaXOiGNRvIt5/5ZZDzEyTyA=", + "jKHh3ZbaWLoBLn5qZTUpiw9oPlStl/ZSfPmdsHte+AQ=", + "ekhZZrQ/riDDmsvqy3I4gAcbUBcoyoNMChiDAXsTu3Y=", + "oMHAlypWw/lk5Q9JHd9O5UJZ7bdcH6Gzs+zCES7YUKo=", + "Kn3gkyUwY86Ut3fWtexgSLtxteycn2p6k7Kj7qJFEDw=", + "IfPx7HUTjLRrRAy6mhkYP/7aq48i6G+Mk/NQidZPJk8=", + "Edul4W41O3EfxKEEMlX2nW0+GTgCv00nGmcpwhALgVA=", + "rBWB37+HwkTZgDv0rMtGBUoDI0UZqcgDZp48M6CaUlA=" + ], + "checkpoint": { + "envelope": "rekor.sigstage.dev - 8050909264565447525\n28160931\nzWVcqCxxaF+b1WWfb+xZZlQYK4MdEr81Dd0KzOFu0Ko=\n\n— rekor.sigstage.dev 0y8wozBFAiBOHi+eUTSSX6mrNLjQwoKJLum7cpnVpvAb8QwK+DnLngIhAO2170Q0xfbOMwrbF2sM80z1wkYhnlVRidI+/j4/k4JJ\n" + } + }, + "canonicalizedBody": "eyJhcGlWZXJzaW9uIjoiMC4wLjEiLCJraW5kIjoiZHNzZSIsInNwZWMiOnsiZW52ZWxvcGVIYXNoIjp7ImFsZ29yaXRobSI6InNoYTI1NiIsInZhbHVlIjoiZGY1MDk2Njg2NzNkMmY4MjAxOTQ2ZTBmNTliNmFiNzhiZWY0NmYyMTc5NTc5N2EzYjJkMTUyZjc3NmFmYzEyZSJ9LCJwYXlsb2FkSGFzaCI6eyJhbGdvcml0aG0iOiJzaGEyNTYiLCJ2YWx1ZSI6IjcyOTM0Yjc1YzgxODk3ZWE4Yjg4NTk0N2ExOWRjODE4ZWUzNjIwYzUwMzJhZmIzYjc4ODc3ZmJjYmI3MjMwYzEifSwic2lnbmF0dXJlcyI6W3sic2lnbmF0dXJlIjoiTUVVQ0lBdmtSSEZ1K24yenMvNGorVjNjTTIyRFZaSTF6cUs0TmpmbHphdEVRTWZnQWlFQW82VjNaN3RpaE9Ha1lpeXNGMTh4dFpWcWVPdDNyZHdWVmI3Nm1XcDhETWM9IiwidmVyaWZpZXIiOiJMUzB0TFMxQ1JVZEpUaUJEUlZKVVNVWkpRMEZVUlMwdExTMHRDazFKU1VNd2VrTkRRV3h0WjBGM1NVSkJaMGxWVG1FeEsyNVdaMnRQV0RGNGJITnpSSGxTZVhRd1JGbzJUVFZWZDBObldVbExiMXBKZW1vd1JVRjNUWGNLVG5wRlZrMUNUVWRCTVZWRlEyaE5UV015Ykc1ak0xSjJZMjFWZFZwSFZqSk5ValIzU0VGWlJGWlJVVVJGZUZaNllWZGtlbVJIT1hsYVV6RndZbTVTYkFwamJURnNXa2RzYUdSSFZYZElhR05PVFdwUmQwNXFRVEpOVkdkNlQxUkJNVmRvWTA1TmFsRjNUbXBCTWsxVVp6QlBWRUV4VjJwQlFVMUdhM2RGZDFsSUNrdHZXa2w2YWpCRFFWRlpTVXR2V2tsNmFqQkVRVkZqUkZGblFVVjVjbTA0YzNSTVVYZFFXQzlOWkZaVE5UQk9XalJuYlZoRlVFVm9ObXRaYkhab1JXOEtNRGM1V1dzeGJFMU5iVTF2WW5kR2RrbE9RemhNWXpBeWEyY3JNRE5DVFhOaldHSk5MMDlMZGpOR2JERnhTRGxRUTB0UFEwRllaM2RuWjBZd1RVRTBSd3BCTVZWa1JIZEZRaTkzVVVWQmQwbElaMFJCVkVKblRsWklVMVZGUkVSQlMwSm5aM0pDWjBWR1FsRmpSRUY2UVdSQ1owNVdTRkUwUlVablVWVnVPVGhuQ2twUmVXMXFTU3RrUmxWRVJXVmhOa05MWWxGdVoybzBkMGgzV1VSV1VqQnFRa0puZDBadlFWVmpXVmwzY0doU09GbHRMelU1T1dJd1FsSndMMWd2TDNJS1lqWjNkMGwzV1VSV1VqQlNRVkZJTDBKQ2EzZEdORVZXWkRKc2MySkhiR2hpVlVJMVlqTk9lbGxZU25CWlZ6UjFZbTFXTUUxRGQwZERhWE5IUVZGUlFncG5OemgzUVZGRlJVaHRhREJrU0VKNlQyazRkbG95YkRCaFNGWnBURzFPZG1KVE9YTmlNbVJ3WW1rNWRsbFlWakJoUkVGMVFtZHZja0puUlVWQldVOHZDazFCUlVsQ1EwRk5TRzFvTUdSSVFucFBhVGgyV2pKc01HRklWbWxNYlU1MllsTTVjMkl5WkhCaWFUbDJXVmhXTUdGRVEwSnBVVmxMUzNkWlFrSkJTRmNLWlZGSlJVRm5VamRDU0d0QlpIZENNVUZEYzNkMlRuaHZhVTF1YVRSa1oyMUxWalV3U0RCbk5VMWFXVU00Y0hkNmVURTFSRkZRTm5seVNWbzJRVUZCUWdwcUt6ZFpOeTlaUVVGQlVVUkJSVmwzVWtGSloxUlhlVkI1VXpKRFMxSnROVnBWWVZSM2JtZG1RblJ5VDBwdmVuZHNTV1pQVDJaWVNIbDVaV293UWxGRENrbEhRM2R0V1ZaTGFFNVROMHBpVlZSR1pVUmxPVEJUVjA1c2NIZHNOVmxCVmtSaUx6SkhSMFo0UjA1RFRVRnZSME5EY1VkVFRUUTVRa0ZOUkVFeVowRUtUVWRWUTAxUlEzaEpaV3R0VEU1a2FFRlROMGhXYnpaRFVtZHhWbEpvZERoU2FVWlBObXhpZVVkTE5HWkVkVVZSVDJzdlRWQmhRbXhTYUhOaFZYaDNaUXBxWmpkcVNUSnJRMDFEZHpWQlQybHFUWFp4YzFoSWFscFphemRVWmxKSUx6QTNPVnA1TUhGRlYycEVPV3gxY21aUWFWUllPWEZUVVV0VGFWaFBVblo0Q25CckwwUlJjMlpVYzJjOVBRb3RMUzB0TFVWT1JDQkRSVkpVU1VaSlEwRlVSUzB0TFMwdENnPT0ifV19fQ==" + } + ] + }, + "envelope": { + "statement": "eyJfdHlwZSI6Imh0dHBzOi8vaW4tdG90by5pby9TdGF0ZW1lbnQvdjEiLCJzdWJqZWN0IjpbeyJu\nYW1lIjoicmZjODc4NS0wLjEuMi1weTMtbm9uZS1hbnkud2hsIiwiZGlnZXN0Ijp7InNoYTI1NiI6\nImM0ZTkyZTllY2M4MjhiZWYyYWE3ZGJhMWRlOGFjOTgzNTExZjc1MzJhMGRmMTFjNzcwZDM5MDk5\nYTI1Y2YyMDEifX1dLCJwcmVkaWNhdGVUeXBlIjoiaHR0cHM6Ly9kb2NzLnB5cGkub3JnL2F0dGVz\ndGF0aW9ucy9wdWJsaXNoL3YxIiwicHJlZGljYXRlIjpudWxsfQ==\n", + "signature": "MEUCIAvkRHFu+n2zs/4j+V3cM22DVZI1zqK4NjflzatEQMfgAiEAo6V3Z7tihOGkYiysF18xtZVq\neOt3rdwVVb76mWp8DMc=\n" + } +} diff --git a/test/test_cli.py b/test/test_cli.py new file mode 100644 index 0000000..a7c145c --- /dev/null +++ b/test/test_cli.py @@ -0,0 +1,285 @@ +from __future__ import annotations + +import argparse +import logging +import os +import shutil +import sys +import tempfile +from pathlib import Path + +import pypi_attestation_models._cli +import pytest +import sigstore.oidc +from pypi_attestation_models._cli import ( + _logger, + _validate_files, + get_identity_token, + main, +) +from pypi_attestation_models._impl import Attestation +from sigstore.oidc import IdentityError + +ONLINE_TESTS = "CI" in os.environ or "TEST_INTERACTIVE" in os.environ +online = pytest.mark.skipif(not ONLINE_TESTS, reason="online tests not enabled") + + +_HERE = Path(__file__).parent +_ASSETS = _HERE / "assets" + +artifact_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl" +attestation_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.publish.attestation" + + +def run_main_with_command(cmd: list[str]) -> None: + """Helper method to run the main function with a given command.""" + sys.argv[1:] = cmd + main() + + +def test_main_verbose_level(monkeypatch: pytest.MonkeyPatch) -> None: + def default_sign(_: argparse.Namespace) -> None: + return + + monkeypatch.setattr(pypi_attestation_models._cli, "_sign", default_sign) + + run_main_with_command(["sign", "-v", ""]) + assert _logger.level == logging.DEBUG + + run_main_with_command(["sign", "-v", "-v", ""]) + assert logging.getLogger().level == logging.DEBUG + + with pytest.raises(SystemExit) as exc_info: + run_main_with_command(["not-a-command"]) + + assert exc_info.value.code == 2 + + +@online +def test_get_identity_token(monkeypatch: pytest.MonkeyPatch) -> None: + # Happy paths + identity_token = get_identity_token(argparse.Namespace(staging=True)) + assert identity_token.in_validity_period() + + # Failure path + def return_invalid_token() -> str: + return "invalid-token" + + monkeypatch.setattr(sigstore.oidc, "detect_credential", return_invalid_token) + + # Invalid token + with pytest.raises(IdentityError, match="Identity token is malformed"): + get_identity_token(argparse.Namespace(staging=True)) + + +@online +def test_sign_command(tmp_path: Path) -> None: + # Happy path + copied_artifact = tmp_path / artifact_path.with_suffix(".copy.whl").name + shutil.copy(artifact_path, copied_artifact) + + run_main_with_command( + [ + "sign", + "--staging", + copied_artifact.as_posix(), + ] + ) + copied_artifact_attestation = Path(f"{copied_artifact}.publish.attestation") + assert copied_artifact_attestation.is_file() + + attestation = Attestation.model_validate_json(copied_artifact_attestation.read_text()) + assert attestation.version + + +@online +def test_sign_command_failures( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture +) -> None: + # Missing file + with pytest.raises(SystemExit): + run_main_with_command( + [ + "sign", + "--staging", + "not_exist.txt", + ] + ) + + assert "not_exist.txt is not a file" in caplog.text + caplog.clear() + + # Signature already exists + artifact = tmp_path / artifact_path.with_suffix(".copy2.whl").name + artifact.touch(exist_ok=False) + + artifact_attestation = Path(f"{artifact}.publish.attestation") + artifact_attestation.touch(exist_ok=False) + with pytest.raises(SystemExit): + run_main_with_command( + [ + "sign", + "--staging", + artifact.as_posix(), + ] + ) + + assert "already exists" in caplog.text + caplog.clear() + + # Invalid token + def return_invalid_token() -> str: + return "invalid-token" + + monkeypatch.setattr(sigstore.oidc, "detect_credential", return_invalid_token) + + with pytest.raises(SystemExit): + run_main_with_command( + [ + "sign", + "--staging", + artifact.as_posix(), + ] + ) + + assert "Failed to detect identity" in caplog.text + + +def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: + # Happy path + run_main_with_command(["inspect", attestation_path.as_posix()]) + assert attestation_path.as_posix() in caplog.text + assert "CN=sigstore-intermediate,O=sigstore.dev" in caplog.text + + run_main_with_command(["inspect", "--dump-bytes", attestation_path.as_posix()]) + assert "Signature:" in caplog.text + + # Failure paths + caplog.clear() + + # Failure because not an attestation + with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: + fake_package_name = Path(f.name.removesuffix(".publish.attestation")) + fake_package_name.touch() + + with pytest.raises(SystemExit): + run_main_with_command(["inspect", fake_package_name.as_posix()]) + + assert "Invalid attestation" in caplog.text + + # Failure because file is missing + caplog.clear() + with pytest.raises(SystemExit): + run_main_with_command(["inspect", "not_a_file.txt"]) + + assert "not_a_file.txt is not a file." in caplog.text + + +def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: + # Happy path + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + artifact_path.as_posix(), + ] + ) + assert f"OK: {attestation_path.as_posix()}" in caplog.text + + caplog.clear() + + # Failure from the Sigstore environment + run_main_with_command( + [ + "verify", + "--identity", + "william@yossarian.net", + artifact_path.as_posix(), + ] + ) + assert ( + "Verification failed: failed to build chain: unable to get local issuer certificate" + in caplog.text + ) + assert "OK:" not in caplog.text + + +def test_verify_command_failures(caplog: pytest.LogCaptureFixture) -> None: + # Failure because not an attestation + with pytest.raises(SystemExit): + with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: + fake_package_name = Path(f.name.removesuffix(".publish.attestation")) + fake_package_name.touch() + + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + fake_package_name.as_posix(), + ] + ) + assert "Invalid attestation" in caplog.text + + # Failure because missing package file + caplog.clear() + with pytest.raises(SystemExit): + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + "not_a_file.txt", + ] + ) + + assert "not_a_file.txt is not a file." in caplog.text + + # Failure because missing attestation file + caplog.clear() + with pytest.raises(SystemExit): + with tempfile.NamedTemporaryFile() as f: + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + f.name, + ] + ) + + assert "is not a file." in caplog.text + + +def test_validate_files(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: + # Happy path + file_1_exist = tmp_path / "file1" + file_1_exist.touch() + + file_2_exist = tmp_path / "file2" + file_2_exist.touch() + + _validate_files([file_1_exist, file_2_exist], should_exist=True) + assert True # No exception raised + + file_1_missing = tmp_path / "file3" + file_2_missing = tmp_path / "file4" + _validate_files([file_1_missing, file_2_missing], should_exist=False) + assert True + + # Failure paths + with pytest.raises(SystemExit): + _validate_files([file_1_missing, file_2_exist], should_exist=True) + + assert f"{file_1_missing} is not a file." in caplog.text + + caplog.clear() + with pytest.raises(SystemExit): + _validate_files([file_1_missing, file_2_exist], should_exist=False) + + assert f"{file_2_exist} already exists." in caplog.text diff --git a/test/test_impl.py b/test/test_impl.py index 3f4b357..efb2c87 100644 --- a/test/test_impl.py +++ b/test/test_impl.py @@ -47,6 +47,16 @@ def test_roundtrip(self, id_token: IdentityToken) -> None: roundtripped_attestation = impl.sigstore_to_pypi(bundle) roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path) + def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None: + bad_dist = tmp_path / "invalid-name.tar.gz" + bad_dist.write_bytes(b"junk") + + with pytest.raises( + impl.AttestationError, + match=r"Invalid sdist filename \(invalid version\): invalid-name\.tar\.gz", + ): + impl.Attestation.sign(pretend.stub(), bad_dist) + def test_verify_github_attested(self) -> None: verifier = Verifier.production() pol = policy.AllOf(