Skip to content

Commit

Permalink
Move free functions to Attestation (#24)
Browse files Browse the repository at this point in the history
Modified functions:
- sigstore_to_pypi : from_bundle
- pypi_to_sigstore : to_bundle
  • Loading branch information
DarkaMaul authored Jun 27, 2024
1 parent 5d3fc53 commit a2e48d3
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 84 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Use these APIs to create a PEP 740-compliant `Attestation` object by signing a P
```python
from pathlib import Path
from pypi_attestation_models import Attestation, AttestationPayload
from pypi_attestation_models import Attestation
from sigstore.oidc import Issuer
from sigstore.sign import SigningContext
from sigstore.verify import Verifier, policy
Expand All @@ -88,7 +88,7 @@ issuer = Issuer.production()
identity_token = issuer.identity_token()
signing_ctx = SigningContext.production()
with signing_ctx.signer(identity_token, cache=True) as signer:
attestation = AttestationPayload.from_dist(artifact_path).sign(signer)
attestation = Attestation.sign(signer, artifact_path)
print(attestation.model_dump_json())
Expand All @@ -102,25 +102,25 @@ attestation.verify(verifier, policy, attestation_path)
### Low-level model conversions
These conversions assume that any Sigstore Bundle used as an input was created
by signing an `AttestationPayload` object.
by signing a distribution file.
```python
from pathlib import Path
from pypi_attestation_models import pypi_to_sigstore, sigstore_to_pypi, Attestation
from pypi_attestation_models import Attestation
from sigstore.models import Bundle
# Sigstore Bundle -> PEP 740 Attestation object
bundle_path = Path("test_package-0.0.1-py3-none-any.whl.sigstore")
with bundle_path.open("rb") as f:
sigstore_bundle = Bundle.from_json(f.read())
attestation_object = sigstore_to_pypi(sigstore_bundle)
attestation_object = Attestation.from_bundle(sigstore_bundle)
print(attestation_object.model_dump_json())
# PEP 740 Attestation object -> Sigstore Bundle
attestation_path = Path("attestation.json")
with attestation_path.open("rb") as f:
attestation = Attestation.model_validate_json(f.read())
bundle = pypi_to_sigstore(attestation)
bundle = attestation.to_bundle()
print(bundle.to_json())
```
Expand Down
4 changes: 0 additions & 4 deletions src/pypi_attestation_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
TransparencyLogEntry,
VerificationError,
VerificationMaterial,
pypi_to_sigstore,
sigstore_to_pypi,
)

__all__ = [
Expand All @@ -22,6 +20,4 @@
"TransparencyLogEntry",
"VerificationError",
"VerificationMaterial",
"pypi_to_sigstore",
"sigstore_to_pypi",
]
117 changes: 59 additions & 58 deletions src/pypi_attestation_models/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def sign(cls, signer: Signer, dist: Path) -> Attestation:
)
bundle = signer.sign_dsse(stmt)

return sigstore_to_pypi(bundle)
return Attestation.from_bundle(bundle)

def verify(
self, verifier: Verifier, policy: VerificationPolicy, dist: Path
Expand All @@ -129,7 +129,7 @@ def verify(
# our minimum supported Python is >=3.11
expected_digest = _sha256_streaming(io).hex()

bundle = pypi_to_sigstore(self)
bundle = self.to_bundle()
try:
type_, payload = verifier.verify_dsse(bundle, policy)
except sigstore.errors.VerificationError as err:
Expand Down Expand Up @@ -168,6 +168,63 @@ def verify(

return statement.predicate_type, statement.predicate

def to_bundle(self) -> Bundle:
"""Convert a PyPI attestation object as defined in PEP 740 into a Sigstore Bundle."""
cert_bytes = self.verification_material.certificate
statement = self.envelope.statement
signature = self.envelope.signature

evp = DsseEnvelope(
_Envelope(
payload=statement,
payload_type=DsseEnvelope._TYPE, # noqa: SLF001
signatures=[_Signature(sig=signature)],
)
)

tlog_entry = self.verification_material.transparency_entries[0]
try:
certificate = x509.load_der_x509_certificate(cert_bytes)
except ValueError as err:
raise ConversionError("invalid X.509 certificate") from err

try:
log_entry = LogEntry._from_dict_rekor(tlog_entry) # noqa: SLF001
except (ValidationError, sigstore.errors.Error) as err:
raise ConversionError("invalid transparency log entry") from err

return Bundle._from_parts( # noqa: SLF001
cert=certificate,
content=evp,
log_entry=log_entry,
)

@classmethod
def from_bundle(cls, sigstore_bundle: Bundle) -> Attestation:
"""Convert a Sigstore Bundle into a PyPI attestation as defined in PEP 740."""
certificate = sigstore_bundle.signing_certificate.public_bytes(
encoding=serialization.Encoding.DER
)

envelope = sigstore_bundle._inner.dsse_envelope # noqa: SLF001

if len(envelope.signatures) != 1:
raise ConversionError(f"expected exactly one signature, got {len(envelope.signatures)}")

return cls(
version=1,
verification_material=VerificationMaterial(
certificate=base64.b64encode(certificate),
transparency_entries=[
TransparencyLogEntry(sigstore_bundle.log_entry._to_dict_rekor()) # noqa: SLF001
],
),
envelope=Envelope(
statement=base64.b64encode(envelope.payload),
signature=base64.b64encode(envelope.signatures[0].sig),
),
)


class Envelope(BaseModel):
"""The attestation envelope, containing the attested-for payload and its signature."""
Expand All @@ -186,62 +243,6 @@ class Envelope(BaseModel):
"""


def sigstore_to_pypi(sigstore_bundle: Bundle) -> Attestation:
"""Convert a Sigstore Bundle into a PyPI attestation as defined in PEP 740."""
certificate = sigstore_bundle.signing_certificate.public_bytes(
encoding=serialization.Encoding.DER
)

envelope = sigstore_bundle._inner.dsse_envelope # noqa: SLF001

if len(envelope.signatures) != 1:
raise ConversionError(f"expected exactly one signature, got {len(envelope.signatures)}")

return Attestation(
version=1,
verification_material=VerificationMaterial(
certificate=base64.b64encode(certificate),
transparency_entries=[TransparencyLogEntry(sigstore_bundle.log_entry._to_dict_rekor())], # noqa: SLF001
),
envelope=Envelope(
statement=base64.b64encode(envelope.payload),
signature=base64.b64encode(envelope.signatures[0].sig),
),
)


def pypi_to_sigstore(pypi_attestation: Attestation) -> Bundle:
"""Convert a PyPI attestation object as defined in PEP 740 into a Sigstore Bundle."""
cert_bytes = pypi_attestation.verification_material.certificate
statement = pypi_attestation.envelope.statement
signature = pypi_attestation.envelope.signature

evp = DsseEnvelope(
_Envelope(
payload=statement,
payload_type=DsseEnvelope._TYPE, # noqa: SLF001
signatures=[_Signature(sig=signature)],
)
)

tlog_entry = pypi_attestation.verification_material.transparency_entries[0]
try:
certificate = x509.load_der_x509_certificate(cert_bytes)
except ValueError as err:
raise ConversionError("invalid X.509 certificate") from err

try:
log_entry = LogEntry._from_dict_rekor(tlog_entry) # noqa: SLF001
except (ValidationError, sigstore.errors.Error) as err:
raise ConversionError("invalid transparency log entry") from err

return Bundle._from_parts( # noqa: SLF001
cert=certificate,
content=evp,
log_entry=log_entry,
)


def _ultranormalize_dist_filename(dist: str) -> str:
"""Return an "ultranormalized" form of the given distribution filename.
Expand Down
43 changes: 28 additions & 15 deletions test/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def test_roundtrip(self, id_token: IdentityToken) -> None:
attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path)

# converting to a bundle and verifying as a bundle also works
bundle = impl.pypi_to_sigstore(attestation)
bundle = attestation.to_bundle()
verifier.verify_dsse(bundle, policy.UnsafeNoOp())

# converting back also works
roundtripped_attestation = impl.sigstore_to_pypi(bundle)
roundtripped_attestation = impl.Attestation.from_bundle(bundle)
roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path)

def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None:
Expand All @@ -69,7 +69,7 @@ def test_verify_github_attested(self) -> None:
)

bundle = Bundle.from_json(gh_signed_bundle_path.read_bytes())
attestation = impl.sigstore_to_pypi(bundle)
attestation = impl.Attestation.from_bundle(bundle)

predicate_type, predicate = attestation.verify(verifier, pol, gh_signed_artifact_path)
assert predicate_type == "https://docs.pypi.org/attestations/publish/v1"
Expand All @@ -89,7 +89,7 @@ def test_verify(self) -> None:
assert predicate is None

# convert the attestation to a bundle and verify it that way too
bundle = impl.pypi_to_sigstore(attestation)
bundle = attestation.to_bundle()
verifier.verify_dsse(bundle, policy.UnsafeNoOp())

def test_verify_digest_mismatch(self, tmp_path: Path) -> None:
Expand Down Expand Up @@ -178,7 +178,10 @@ def test_verify_too_many_subjects(self) -> None:

verifier = pretend.stub(
verify_dsse=pretend.call_recorder(
lambda bundle, policy: ("application/vnd.in-toto+json", statement.encode())
lambda bundle, policy: (
"application/vnd.in-toto+json",
statement.encode(),
)
)
)
pol = pretend.stub()
Expand All @@ -203,7 +206,10 @@ def test_verify_subject_missing_name(self) -> None:

verifier = pretend.stub(
verify_dsse=pretend.call_recorder(
lambda bundle, policy: ("application/vnd.in-toto+json", statement.encode())
lambda bundle, policy: (
"application/vnd.in-toto+json",
statement.encode(),
)
)
)
pol = pretend.stub()
Expand All @@ -219,7 +225,8 @@ def test_verify_subject_invalid_name(self) -> None:
.subjects(
[
_Subject(
name="foo-bar-invalid-wheel.whl", digest=_DigestSet(root={"sha256": "abcd"})
name="foo-bar-invalid-wheel.whl",
digest=_DigestSet(root={"sha256": "abcd"}),
),
]
)
Expand All @@ -230,7 +237,10 @@ def test_verify_subject_invalid_name(self) -> None:

verifier = pretend.stub(
verify_dsse=pretend.call_recorder(
lambda bundle, policy: ("application/vnd.in-toto+json", statement.encode())
lambda bundle, policy: (
"application/vnd.in-toto+json",
statement.encode(),
)
)
)
pol = pretend.stub()
Expand All @@ -241,28 +251,28 @@ def test_verify_subject_invalid_name(self) -> None:
attestation.verify(verifier, pol, artifact_path)


def test_sigstore_to_pypi_missing_signatures() -> None:
def test_from_bundle_missing_signatures() -> None:
bundle = Bundle.from_json(bundle_path.read_bytes())
bundle._inner.dsse_envelope.signatures = [] # noqa: SLF001

with pytest.raises(impl.ConversionError, match="expected exactly one signature, got 0"):
impl.sigstore_to_pypi(bundle)
impl.Attestation.from_bundle(bundle)


def test_pypi_to_sigstore_invalid_cert() -> None:
def test_to_bundle_invalid_cert() -> None:
attestation = impl.Attestation.model_validate_json(attestation_path.read_bytes())
attestation.verification_material.certificate = b"foo"

with pytest.raises(impl.ConversionError, match="invalid X.509 certificate"):
impl.pypi_to_sigstore(attestation)
attestation.to_bundle()


def test_pypi_to_sigstore_invalid_tlog_entry() -> None:
def test_to_bundle_invalid_tlog_entry() -> None:
attestation = impl.Attestation.model_validate_json(attestation_path.read_bytes())
attestation.verification_material.transparency_entries[0].clear()

with pytest.raises(impl.ConversionError, match="invalid transparency log entry"):
impl.pypi_to_sigstore(attestation)
attestation.to_bundle()


class TestPackaging:
Expand Down Expand Up @@ -295,7 +305,10 @@ def test_exception_types(self) -> None:
("foo-1.0-1whatever-py3-none-any.whl", "foo-1.0-1whatever-py3-none-any.whl"),
# wheel: compressed tag sets are sorted, even when conflicting or nonsense
("foo-1.0-py3.py2-none-any.whl", "foo-1.0-py2.py3-none-any.whl"),
("foo-1.0-py3.py2-none.abi3.cp37-any.whl", "foo-1.0-py2.py3-abi3.cp37.none-any.whl"),
(
"foo-1.0-py3.py2-none.abi3.cp37-any.whl",
"foo-1.0-py2.py3-abi3.cp37.none-any.whl",
),
(
"foo-1.0-py3.py2-none.abi3.cp37-linux_x86_64.any.whl",
"foo-1.0-py2.py3-abi3.cp37.none-any.linux_x86_64.whl",
Expand Down

0 comments on commit a2e48d3

Please sign in to comment.