Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of uploading with trusted publishing authentication #1194

Merged
merged 12 commits into from
Dec 11, 2024
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"rfc3986 >= 1.4.0",
"rich >= 12.0.0",
"packaging",
"id",
]
dynamic = ["version"]

Expand Down
83 changes: 80 additions & 3 deletions twine/auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import functools
import getpass
import json
import logging
from typing import TYPE_CHECKING, Callable, Optional, Type, cast
from urllib.parse import urlparse

from id import AmbientCredentialError # type: ignore
from id import detect_credential

# keyring has an indirect dependency on PyCA cryptography, which has no
# pre-built wheels for ppc64le and s390x, see #1158.
if TYPE_CHECKING:
import keyring
from keyring.errors import NoKeyringError
else:
try:
import keyring
from keyring.errors import NoKeyringError
except ModuleNotFoundError: # pragma: no cover
keyring = None
NoKeyringError = None

from twine import exceptions
from twine import utils
Expand All @@ -28,7 +36,11 @@ def __init__(


class Resolver:
def __init__(self, config: utils.RepositoryConfig, input: CredentialInput) -> None:
def __init__(
self,
config: utils.RepositoryConfig,
input: CredentialInput,
) -> None:
self.config = config
self.input = input

Expand Down Expand Up @@ -57,9 +69,65 @@ def password(self) -> Optional[str]:
self.input.password,
self.config,
key="password",
prompt_strategy=self.password_from_keyring_or_prompt,
prompt_strategy=self.password_from_keyring_or_trusted_publishing_or_prompt,
)

def make_trusted_publishing_token(self) -> Optional[str]:
# Trusted publishing (OpenID Connect): get one token from the CI
# system, and exchange that for a PyPI token.
repository_domain = cast(str, urlparse(self.system).netloc)
session = utils.make_requests_session()

# Indices are expected to support `https://{domain}/_/oidc/audience`,
# which tells OIDC exchange clients which audience to use.
audience_url = f"https://{repository_domain}/_/oidc/audience"
resp = session.get(audience_url, timeout=5)
resp.raise_for_status()
audience = cast(str, resp.json()["audience"])

try:
oidc_token = detect_credential(audience)
except AmbientCredentialError as e:
# If we get here, we're on a supported CI platform for trusted
# publishing, and we have not been given any token, so we can error.
raise exceptions.TrustedPublishingFailure(
"Unable to retrieve an OIDC token from the CI platform for "
f"trusted publishing {e}"
)

if oidc_token is None:
logger.debug("This environment is not supported for trusted publishing")
return None # Fall back to prompting for a token (if possible)

logger.debug("Got OIDC token for audience %s", audience)

token_exchange_url = f"https://{repository_domain}/_/oidc/mint-token"

mint_token_resp = session.post(
token_exchange_url,
json={"token": oidc_token},
timeout=5, # S113 wants a timeout
)
try:
mint_token_payload = mint_token_resp.json()
except json.JSONDecodeError:
raise exceptions.TrustedPublishingFailure(
"The token-minting request returned invalid JSON"
)

if not mint_token_resp.ok:
reasons = "\n".join(
f'* `{error["code"]}`: {error["description"]}'
for error in mint_token_payload["errors"]
)
raise exceptions.TrustedPublishingFailure(
"The token request failed; the index server gave the following "
f"reasons:\n\n{reasons}"
)

logger.debug("Minted upload token for trusted publishing")
return cast(str, mint_token_payload["token"])

@property
def system(self) -> Optional[str]:
return self.config["repository"]
Expand Down Expand Up @@ -90,6 +158,8 @@ def get_password_from_keyring(self) -> Optional[str]:
username = cast(str, self.username)
logger.info("Querying keyring for password")
return cast(str, keyring.get_password(system, username))
except NoKeyringError:
logger.info("No keyring backend found")
except Exception as exc:
logger.warning("Error getting password from keyring", exc_info=exc)
return None
Expand All @@ -102,12 +172,19 @@ def username_from_keyring_or_prompt(self) -> str:

return self.prompt("username", input)

def password_from_keyring_or_prompt(self) -> str:
def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str:
password = self.get_password_from_keyring()
if password:
logger.info("password set from keyring")
return password

if self.is_pypi() and self.username == "__token__":
di marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(
"Trying to use trusted publishing (no token was explicitly provided)"
)
if (token := self.make_trusted_publishing_token()) is not None:
return token

# Prompt for API token when required.
what = "API token" if self.is_pypi() else "password"

Expand Down
1 change: 1 addition & 0 deletions twine/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def list_dependencies_and_versions() -> List[Tuple[str, str]]:
"requests",
"requests-toolbelt",
"urllib3",
"id",
takluyver marked this conversation as resolved.
Show resolved Hide resolved
]
if sys.version_info < (3, 10):
deps.append("importlib-metadata")
Expand Down
6 changes: 6 additions & 0 deletions twine/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ class NonInteractive(TwineException):
pass


class TrustedPublishingFailure(TwineException):
"""Raised if we expected to use trusted publishing but couldn't."""

pass


class InvalidPyPIUploadURL(TwineException):
"""Repository configuration tries to use PyPI with an incorrect URL.

Expand Down
34 changes: 3 additions & 31 deletions twine/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Set, Tuple

import requests
import requests_toolbelt
import rich.progress
import urllib3
from requests import adapters
from requests_toolbelt.utils import user_agent
from rich import print

import twine
from twine import package as package_file
from twine.utils import make_requests_session

KEYWORDS_TO_NOT_FLATTEN = {"gpg_signature", "attestations", "content"}

Expand All @@ -47,7 +44,7 @@ def __init__(
) -> None:
self.url = repository_url

self.session = requests.session()
self.session = make_requests_session()
# requests.Session.auth should be Union[None, Tuple[str, str], ...]
# But username or password could be None
# See TODO for utils.RepositoryConfig
Expand All @@ -57,35 +54,10 @@ def __init__(
logger.info(f"username: {username if username else '<empty>'}")
logger.info(f"password: <{'hidden' if password else 'empty'}>")

self.session.headers["User-Agent"] = self._make_user_agent_string()
for scheme in ("http://", "https://"):
self.session.mount(scheme, self._make_adapter_with_retries())

# Working around https://github.com/python/typing/issues/182
self._releases_json_data: Dict[str, Dict[str, Any]] = {}
self.disable_progress_bar = disable_progress_bar

@staticmethod
def _make_adapter_with_retries() -> adapters.HTTPAdapter:
retry = urllib3.Retry(
allowed_methods=["GET"],
connect=5,
total=10,
status_forcelist=[500, 501, 502, 503],
)

return adapters.HTTPAdapter(max_retries=retry)

@staticmethod
def _make_user_agent_string() -> str:
user_agent_string = (
user_agent.UserAgentBuilder("twine", twine.__version__)
.include_implementation()
.build()
)

return cast(str, user_agent_string)

def close(self) -> None:
self.session.close()

Expand Down
26 changes: 26 additions & 0 deletions twine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@

import requests
import rfc3986
import urllib3
from requests.adapters import HTTPAdapter
from requests_toolbelt.utils import user_agent

import twine
from twine import exceptions

# Shim for input to allow testing.
Expand Down Expand Up @@ -304,6 +308,28 @@ def get_userpass_value(
get_clientcert = functools.partial(get_userpass_value, key="client_cert")


def make_requests_session() -> requests.Session:
"""Prepare a requests Session with retries & twine's user-agent string."""
s = requests.Session()

retry = urllib3.Retry(
allowed_methods=["GET"],
connect=5,
total=10,
status_forcelist=[500, 501, 502, 503],
)

for scheme in ("http://", "https://"):
s.mount(scheme, HTTPAdapter(max_retries=retry))

s.headers["User-Agent"] = (
user_agent.UserAgentBuilder("twine", twine.__version__)
.include_implementation()
.build()
)
return s


class EnvironmentDefault(argparse.Action):
"""Get values from environment variable."""

Expand Down
Loading