Skip to content

Commit

Permalink
Get provenance URL using the simple API
Browse files Browse the repository at this point in the history
Signed-off-by: Facundo Tuesca <facundo.tuesca@trailofbits.com>
  • Loading branch information
facutuesca committed Oct 29, 2024
1 parent f77ffaa commit 88688a8
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 52 deletions.
73 changes: 50 additions & 23 deletions src/pip_plugin_pep740/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,49 @@
PluginType = Literal["dist-inspector"]


def _get_provenance_url(filename: str, index_host: str) -> str | None:
if filename.endswith(".tar.gz"):
name, _ = parse_sdist_filename(filename)
elif filename.endswith(".whl"):
name, _, _, _ = parse_wheel_filename(filename)
else:
# Unexpected file, ignore
return None

simple_index_package_url = (
builder.URIBuilder()
.add_scheme("https")
.add_host(index_host)
.add_path(f"simple/{name}/")
.geturl()
)
try:
r = requests.get(
url=simple_index_package_url,
headers={"Accept": "application/vnd.pypi.simple.v1+json"},
timeout=5,
)
r.raise_for_status()
except requests.RequestException as e:
msg = f"Error accessing PyPI simple API: {e}"
raise ValueError(msg) from e

try:
package_json = r.json()
except JSONDecodeError as e:
msg = f"Invalid PyPI simple index JSON response: {e}"
raise ValueError(msg) from e

matching_artifacts = [f for f in package_json["files"] if f["filename"] == filename]
if len(matching_artifacts) == 0:
msg = f"Could not find file {filename} using the simple API at {index_host}"
raise ValueError(msg)

artifact_info = matching_artifacts[0]
provenance_url: str | None = artifact_info.get("provenance")
return provenance_url


def _get_provenance(filename: str, url: str) -> Provenance | None:
"""Download the provenance for a given distribution."""
url_authority = rfc3986.api.uri_reference(url).authority
Expand All @@ -33,45 +76,29 @@ def _get_provenance(filename: str, url: str) -> Provenance | None:
else:
return None

if filename.endswith(".tar.gz"):
name, version = parse_sdist_filename(filename)
elif filename.endswith(".whl"):
name, version, _, _ = parse_wheel_filename(filename)
else:
# Unexpected file, ignore
provenance_url = _get_provenance_url(filename=filename, index_host=index_host)
if provenance_url is None:
# Can't verify artifacts uploaded without attestations
return None

provenance_url = (
builder.URIBuilder()
.add_scheme("https")
.add_host(index_host)
.add_path(f"integrity/{name}/{version}/{filename}/provenance")
.geturl()
)
try:
r = requests.get(
url=provenance_url,
params={"Accept": "application/vnd.pypi.integrity.v1+json"},
headers={"Accept": "application/vnd.pypi.integrity.v1+json"},
timeout=5,
)
r.raise_for_status()
except requests.HTTPError as e:
# If there is no provenance available, continue
if e.response.status_code == requests.codes.not_found:
return None
raise ValueError(e) from e
except requests.RequestException as e:
msg = f"Error downloading provenance file: {e}"
raise ValueError(msg) from e

try:
return Provenance.model_validate(r.json())
except ValidationError as e:
msg = f"Invalid provenance: {e}"
raise ValueError(msg) from e
except JSONDecodeError as e:
msg = f"Invalid provenance JSON: {e}"
raise ValueError(msg) from e
except ValidationError as e:
msg = f"Invalid provenance: {e}"
raise ValueError(msg) from e


def plugin_type() -> PluginType:
Expand Down
132 changes: 103 additions & 29 deletions test/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import pip_plugin_pep740

PACKAGE_NAME = "abi3info"
PACKAGE_VERSION_1 = "2024.10.8"
DIST_FILE_1 = Path("test/assets/abi3info-2024.10.8-py3-none-any.whl")
PROVENANCE_FILE_1 = Path("test/assets/abi3info-2024.10.8-py3-none-any.whl.provenance")

PACKAGE_VERSION_2 = "2024.10.3"
DIST_FILE_2 = Path("test/assets/abi3info-2024.10.3-py3-none-any.whl")
PROVENANCE_FILE_2 = Path("test/assets/abi3info-2024.10.3-py3-none-any.whl.provenance")

PACKAGE_VERSION_3 = "2024.10.8"
DIST_FILE_3 = Path("test/assets/abi3info-2024.10.8.tar.gz")
PROVENANCE_FILE_3 = Path("test/assets/abi3info-2024.10.8.tar.gz.provenance")

Expand All @@ -38,18 +35,26 @@ def test_plugin_type(self) -> None:
assert pip_plugin_pep740.plugin_type() == "dist-inspector"

@pytest.mark.parametrize(
("version", "filename", "provenance_file", "digest"),
("filename", "provenance_file", "digest"),
[
(PACKAGE_VERSION_1, DIST_FILE_1.name, PROVENANCE_FILE_1, DIST_DIGEST_1),
(PACKAGE_VERSION_3, DIST_FILE_3.name, PROVENANCE_FILE_3, DIST_DIGEST_3),
(DIST_FILE_1.name, PROVENANCE_FILE_1, DIST_DIGEST_1),
(DIST_FILE_3.name, PROVENANCE_FILE_3, DIST_DIGEST_3),
],
)
def test_pre_download_valid_provenance(
self, version: str, filename: str, provenance_file: Path, digest: str
self, filename: str, provenance_file: Path, digest: str
) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{version}/{filename}/provenance",
f"https://test.pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{filename}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{filename}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=provenance_file.read_text(),
)
pip_plugin_pep740.pre_download(
Expand Down Expand Up @@ -77,8 +82,8 @@ def test_pre_download_invalid_filename(self) -> None:
def test_pre_download_no_provenance_found(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
status_code=404,
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}"}}]}}',
)
assert (
pip_plugin_pep740.pre_download(
Expand All @@ -89,20 +94,47 @@ def test_pre_download_no_provenance_found(self) -> None:
is None
)

def test_pre_download_index_http_error(self) -> None:
    """An HTTP 403 from the simple index endpoint surfaces as a ValueError."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    with requests_mock.Mocker(real_http=True) as mocker:
        # The index lookup itself is rejected.
        mocker.get(index_url, status_code=403)
        with pytest.raises(ValueError, match="403 Client Error"):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_index_timeout(self) -> None:
    """A connection timeout on the simple index endpoint surfaces as a ValueError."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    with requests_mock.Mocker(real_http=True) as mocker:
        # Simulate the index never answering.
        mocker.get(index_url, exc=requests.exceptions.ConnectTimeout)
        with pytest.raises(ValueError, match="Error accessing PyPI simple API"):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_provenance_download_error(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
status_code=403,
)
with pytest.raises(ValueError, match="403 Client Error"):
assert (
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)
is None
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)

def test_pre_download_not_pypi_url(self) -> None:
Expand All @@ -118,23 +150,28 @@ def test_pre_download_not_pypi_url(self) -> None:
def test_pre_download_provenance_timeout(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
exc=requests.exceptions.ConnectTimeout,
)
with pytest.raises(ValueError, match="Error downloading provenance file"):
assert (
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)
is None
pip_plugin_pep740.pre_download(
url="https://files.pythonhosted.org/some_path",
filename=DIST_FILE_1.name,
digest=DIST_DIGEST_1,
)

def test_pre_download_invalid_provenance(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=PROVENANCE_FILE_2.read_text(),
)
with pytest.raises(
Expand All @@ -147,10 +184,43 @@ def test_pre_download_invalid_provenance(self) -> None:
digest=DIST_DIGEST_1,
)

def test_pre_download_invalid_index_json(self) -> None:
    """A non-JSON body from the simple index endpoint surfaces as a ValueError."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    expected_error = "Invalid PyPI simple index JSON response"
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, text="invalidjson")
        with pytest.raises(ValueError, match=expected_error):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_missing_package_from_index_json(self) -> None:
    """An index response that lacks the requested file surfaces as a ValueError."""
    index_url = f"https://pypi.org/simple/{PACKAGE_NAME}/"
    # The index only lists a different file (DIST_FILE_2) than the one requested.
    index_body = f'{{"files": [{{"filename": "{DIST_FILE_2.name}", "provenance": "https://provenance_url"}}]}}'
    expected_error = f"Could not find file {DIST_FILE_1.name} using the simple API at pypi.org"
    with requests_mock.Mocker(real_http=True) as mocker:
        mocker.get(index_url, text=index_body)
        with pytest.raises(ValueError, match=expected_error):
            pip_plugin_pep740.pre_download(
                url="https://files.pythonhosted.org/some_path",
                filename=DIST_FILE_1.name,
                digest=DIST_DIGEST_1,
            )

def test_pre_download_invalid_provenance_json(self) -> None:
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text="invalidjson",
)
with pytest.raises(
Expand All @@ -168,7 +238,11 @@ def test_pre_download_malformed_provenance_valid_json(self) -> None:
provenance["attestation_bundles"] = "invalid"
with requests_mock.Mocker(real_http=True) as m:
m.get(
f"https://pypi.org/integrity/{PACKAGE_NAME}/{PACKAGE_VERSION_1}/{DIST_FILE_1.name}/provenance",
f"https://pypi.org/simple/{PACKAGE_NAME}/",
text=f'{{"files": [{{"filename": "{DIST_FILE_1.name}", "provenance": "https://provenance_url"}}]}}',
)
m.get(
"https://provenance_url",
text=json.dumps(provenance),
)
with pytest.raises(
Expand Down

0 comments on commit 88688a8

Please sign in to comment.