Skip to content

Commit

Permalink
remove Verifier param from verify() API (#62)
Browse files Browse the repository at this point in the history
Co-authored-by: Facundo Tuesca <facundo.tuesca@trailofbits.com>
  • Loading branch information
woodruffw and facutuesca authored Oct 22, 2024
1 parent a68278c commit b172958
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 64 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ classifiers = [
dependencies = [
"cryptography",
"packaging",
"pyasn1 ~= 0.6",
"pydantic",
"sigstore~=3.3",
"sigstore~=3.4",
"sigstore-protobuf-specs",
]
requires-python = ">=3.11"
Expand Down
5 changes: 2 additions & 3 deletions src/pypi_attestations/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic import ValidationError
from sigstore.oidc import IdentityError, IdentityToken, Issuer
from sigstore.sign import SigningContext
from sigstore.verify import Verifier, policy
from sigstore.verify import policy

from pypi_attestations import Attestation, AttestationError, VerificationError, __version__
from pypi_attestations._impl import Distribution
Expand Down Expand Up @@ -256,7 +256,6 @@ def _inspect(args: argparse.Namespace) -> None:

def _verify(args: argparse.Namespace) -> None:
"""Verify the files passed as argument."""
verifier: Verifier = Verifier.staging() if args.staging else Verifier.production()
pol = policy.Identity(identity=args.identity)

# Validate that both the attestations and files exists
Expand Down Expand Up @@ -291,7 +290,7 @@ def _verify(args: argparse.Namespace) -> None:
_die(f"Invalid Python package distribution: {e}")

try:
attestation.verify(verifier, pol, dist)
attestation.verify(pol, dist, staging=args.staging)
except VerificationError as verification_error:
_die(f"Verification failed for {input}: {verification_error}")

Expand Down
141 changes: 133 additions & 8 deletions src/pypi_attestations/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from packaging.utils import parse_sdist_filename, parse_wheel_filename
from pyasn1.codec.der.decoder import decode as der_decode
from pyasn1.type.char import UTF8String
from pydantic import Base64Encoder, BaseModel, ConfigDict, EncodedBytes, Field, field_validator
from pydantic.alias_generators import to_snake
from pydantic_core import ValidationError
Expand All @@ -23,15 +25,16 @@
from sigstore.dsse import Error as DsseError
from sigstore.models import Bundle, LogEntry
from sigstore.sign import ExpiredCertificate, ExpiredIdentity
from sigstore.verify import Verifier, policy
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature as _Signature

if TYPE_CHECKING:
from pathlib import Path # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from pathlib import Path

from sigstore.sign import Signer # pragma: no cover
from sigstore.verify import Verifier # pragma: no cover
from sigstore.verify.policy import VerificationPolicy # pragma: no cover
from cryptography.x509 import Certificate
from sigstore.sign import Signer
from sigstore.verify.policy import VerificationPolicy


class Base64EncoderSansNewline(Base64Encoder):
Expand Down Expand Up @@ -180,14 +183,36 @@ def sign(cls, signer: Signer, dist: Distribution) -> Attestation:

def verify(
self,
verifier: Verifier,
policy: VerificationPolicy,
identity: VerificationPolicy | Publisher,
dist: Distribution,
*,
staging: bool = False,
) -> tuple[str, dict[str, Any] | None]:
"""Verify against an existing Python distribution.
The `identity` can be an object confirming to
`sigstore.policy.VerificationPolicy` or a `Publisher`, which will be
transformed into an appropriate verification policy.
By default, Sigstore's production verifier will be used. The
`staging` parameter can be toggled to enable the staging verifier
instead.
On failure, raises an appropriate subclass of `AttestationError`.
"""
# NOTE: Can't do `isinstance` with `Publisher` since it's
# a `_GenericAlias`; instead we punch through to the inner
# `_Publisher` union.
if isinstance(identity, _Publisher):
policy = identity._as_policy() # noqa: SLF001
else:
policy = identity

if staging:
verifier = Verifier.staging()
else:
verifier = Verifier.production()

bundle = self.to_bundle()
try:
type_, payload = verifier.verify_dsse(bundle, policy)
Expand Down Expand Up @@ -364,6 +389,82 @@ class _PublisherBase(BaseModel):
kind: str
claims: dict[str, Any] | None = None

def _as_policy(self) -> VerificationPolicy:
"""Return an appropriate `sigstore.policy.VerificationPolicy` for this publisher."""
raise NotImplementedError # pragma: no cover


class _GitHubTrustedPublisherPolicy:
"""A custom sigstore-python policy for verifying against a GitHub-based Trusted Publisher."""

def __init__(self, repository: str, workflow: str) -> None:
self._repository = repository
self._workflow = workflow
# This policy must also satisfy some baseline underlying policies:
# the issuer must be GitHub Actions, and the repo must be the one
# we expect.
self._subpolicy = policy.AllOf(
[
policy.OIDCIssuerV2("https://token.actions.githubusercontent.com"),
policy.OIDCSourceRepositoryURI(f"https://github.com/{self._repository}"),
]
)

@classmethod
def _der_decode_utf8string(cls, der: bytes) -> str:
"""Decode a DER-encoded UTF8String."""
return der_decode(der, UTF8String)[0].decode() # type: ignore[no-any-return]

def verify(self, cert: Certificate) -> None:
"""Verify the certificate against the Trusted Publisher identity."""
self._subpolicy.verify(cert)

# This process has a few annoying steps, since a Trusted Publisher
# isn't aware of the commit or ref it runs on, while Sigstore's
# leaf certificate claims (like GitHub Actions' OIDC claims) only
# ever encode the workflow filename (which we need to check) next
# to the ref/sha (which we can't check).
#
# To get around this, we:
# (1) extract the `Build Config URI` extension;
# (2) extract the `Source Repository Digest` and
# `Source Repository Ref` extensions;
# (3) build the *expected* URI with the user-controlled
# Trusted Publisher identity *with* (2)
# (4) compare (1) with (3)

# (1) Extract the build config URI, which looks like this:
# https://github.com/OWNER/REPO/.github/workflows/WORKFLOW@REF
# where OWNER/REPO and WORKFLOW are controlled by the TP identity,
# and REF is controlled by the certificate's own claims.
build_config_uri = cert.extensions.get_extension_for_oid(policy._OIDC_BUILD_CONFIG_URI_OID) # noqa: SLF001
raw_build_config_uri = self._der_decode_utf8string(build_config_uri.value.public_bytes())

# (2) Extract the source repo digest and ref.
source_repo_digest = cert.extensions.get_extension_for_oid(
policy._OIDC_SOURCE_REPOSITORY_DIGEST_OID # noqa: SLF001
)
sha = self._der_decode_utf8string(source_repo_digest.value.public_bytes())

source_repo_ref = cert.extensions.get_extension_for_oid(
policy._OIDC_SOURCE_REPOSITORY_REF_OID # noqa: SLF001
)
ref = self._der_decode_utf8string(source_repo_ref.value.public_bytes())

# (3)-(4): Build the expected URIs and compare them
for suffix in [sha, ref]:
expected = (
f"https://github.com/{self._repository}/.github/workflows/{self._workflow}@{suffix}"
)
if raw_build_config_uri == expected:
return

# If none of the expected URIs matched, the policy fails.
raise sigstore.errors.VerificationError(
f"Certificate's Build Config URI ({build_config_uri}) does not match expected "
f"Trusted Publisher ({self._workflow} @ {self._repository})"
)


class GitHubPublisher(_PublisherBase):
"""A GitHub-based Trusted Publisher."""
Expand All @@ -388,6 +489,9 @@ class GitHubPublisher(_PublisherBase):
action was performed from.
"""

def _as_policy(self) -> VerificationPolicy:
return _GitHubTrustedPublisherPolicy(self.repository, self.workflow)


class GitLabPublisher(_PublisherBase):
"""A GitLab-based Trusted Publisher."""
Expand All @@ -406,8 +510,29 @@ class GitLabPublisher(_PublisherBase):
The optional environment that the publishing action was performed from.
"""

def _as_policy(self) -> VerificationPolicy:
policies: list[VerificationPolicy] = [
policy.OIDCIssuerV2("https://gitlab.com"),
policy.OIDCSourceRepositoryURI(f"https://gitlab.com/{self.repository}"),
]

if not self.claims:
raise VerificationError("refusing to build a policy without claims")

if ref := self.claims.get("ref"):
policies.append(
policy.OIDCBuildConfigURI(
f"https://gitlab.com/{self.repository}//.gitlab-ci.yml@{ref}"
)
)
else:
raise VerificationError("refusing to build a policy without a ref claim")

return policy.AllOf(policies)


Publisher = Annotated[GitHubPublisher | GitLabPublisher, Field(discriminator="kind")]
_Publisher = GitHubPublisher | GitLabPublisher
Publisher = Annotated[_Publisher, Field(discriminator="kind")]


class AttestationBundle(BaseModel):
Expand Down
Loading

0 comments on commit b172958

Please sign in to comment.