From 840f2867eb7121f81f4465286ff49cd590ebb9fe Mon Sep 17 00:00:00 2001 From: Nick <0xb000@gmail.com> Date: Sun, 24 Nov 2024 13:13:01 +0200 Subject: [PATCH] Introduce user-defined wrapdb mirror This consolidates all queries to the wrapdb to go through the open_wrapdburl() function. The function handles a domain-specific url scheme wrapdb. When encountered, it substitutes the scheme with https and an authority(netloc) with either upstream wrapdb address or a user-defined one from the MESON_WRAPDB_MIRROR environment variable. --- mesonbuild/wrap/wrap.py | 53 +++++++++++++++++++++++++------------ mesonbuild/wrap/wraptool.py | 2 +- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/mesonbuild/wrap/wrap.py b/mesonbuild/wrap/wrap.py index 7aae1663fd1f..c163d70eda40 100644 --- a/mesonbuild/wrap/wrap.py +++ b/mesonbuild/wrap/wrap.py @@ -49,18 +49,35 @@ has_ssl = False REQ_TIMEOUT = 30.0 -WHITELIST_SUBDOMAIN = 'wrapdb.mesonbuild.com' + +WRAPDB_UPSTREAM_HOSTNAME = 'wrapdb.mesonbuild.com' +WRAPDB_NETLOC = os.environ.get('MESON_WRAPDB_MIRROR', WRAPDB_UPSTREAM_HOSTNAME) ALL_TYPES = ['file', 'git', 'hg', 'svn', 'redirect'] PATCH = shutil.which('patch') -def whitelist_wrapdb(urlstr: str) -> urllib.parse.ParseResult: +def is_trusted_subdomain(hostname: str) -> bool: + trusted_subdomains = { + WRAPDB_UPSTREAM_HOSTNAME, + urllib.parse.urlparse(f'//{WRAPDB_NETLOC}').hostname, + } + for entry in trusted_subdomains: + if hostname.endswith(entry): + return True + + return False + +def expand_wrapdburl(urlstr: str) -> urllib.parse.ParseResult: """ raises WrapException if not whitelisted subdomain """ url = urllib.parse.urlparse(urlstr) + if url.scheme == 'wrapdb': + if url.netloc: + raise WrapException(f'{urlstr} with wrapdb: scheme should not have a netloc') + url = url._replace(scheme='https', netloc=WRAPDB_NETLOC) if not url.hostname: raise WrapException(f'{urlstr} is not a valid URL') - if not url.hostname.endswith(WHITELIST_SUBDOMAIN): + if not is_trusted_subdomain(url.hostname): raise WrapException(f'{urlstr} is not a whitelisted WrapDB URL') if has_ssl and not url.scheme == 'https': raise WrapException(f'WrapDB did not have expected SSL https url, instead got {urlstr}') @@ -72,12 +89,13 @@ def open_wrapdburl(urlstring: str, allow_insecure: bool = False, have_opt: bool else: insecure_msg = '' - url = whitelist_wrapdb(urlstring) + url = expand_wrapdburl(urlstring) if has_ssl: + urlstring_ = urllib.parse.urlunparse(url) try: - return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(url), timeout=REQ_TIMEOUT)) + return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urlstring_, timeout=REQ_TIMEOUT)) except OSError as excp: - msg = f'WrapDB connection failed to {urlstring} with error {excp}.' + msg = f'WrapDB connection failed to {urlstring_} with error {excp}.' if isinstance(excp, urllib.error.URLError) and isinstance(excp.reason, ssl.SSLCertVerificationError): if allow_insecure: mlog.warning(f'{msg}\n\n Proceeding without authentication.') @@ -93,14 +111,15 @@ def open_wrapdburl(urlstring: str, allow_insecure: bool = False, have_opt: bool # If we got this far, allow_insecure was manually passed nossl_url = url._replace(scheme='http') + urlstring_ = urllib.parse.urlunparse(nossl_url) try: - return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(nossl_url), timeout=REQ_TIMEOUT)) + return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urlstring_, timeout=REQ_TIMEOUT)) except OSError as excp: - raise WrapException(f'WrapDB connection failed to {urlstring} with error {excp}') + raise WrapException(f'WrapDB connection failed to {urlstring_} with error {excp}') def get_releases_data(allow_insecure: bool) -> bytes: - url = open_wrapdburl('https://wrapdb.mesonbuild.com/v2/releases.json', allow_insecure, True) - return url.read() + resp = open_wrapdburl('wrapdb:///v2/releases.json', allow_insecure, True) + return resp.read() @lru_cache(maxsize=None) def get_releases(allow_insecure: bool) -> T.Dict[str, T.Any]: @@ -108,14 +127,14 @@ def get_releases(allow_insecure: bool) -> T.Dict[str, T.Any]: return T.cast('T.Dict[str, T.Any]', json.loads(data.decode())) def update_wrap_file(wrapfile: str, name: str, new_version: str, new_revision: str, allow_insecure: bool) -> None: - url = open_wrapdburl(f'https://wrapdb.mesonbuild.com/v2/{name}_{new_version}-{new_revision}/{name}.wrap', + resp = open_wrapdburl(f'wrapdb:///v2/{name}_{new_version}-{new_revision}/{name}.wrap', allow_insecure, True) with open(wrapfile, 'wb') as f: - f.write(url.read()) + f.write(resp.read()) def parse_patch_url(patch_url: str) -> T.Tuple[str, str]: u = urllib.parse.urlparse(patch_url) - if u.netloc != 'wrapdb.mesonbuild.com': + if not is_trusted_subdomain(u.hostname): raise WrapException(f'URL {patch_url} does not seems to be a wrapdb patch') arr = u.path.strip('/').split('/') if arr[0] == 'v1': @@ -384,10 +403,10 @@ def get_from_wrapdb(self, subp_name: str) -> T.Optional[PackageDefinition]: self.check_can_download() latest_version = info['versions'][0] version, revision = latest_version.rsplit('-', 1) - url = urllib.request.urlopen(f'https://wrapdb.mesonbuild.com/v2/{subp_name}_{version}-{revision}/{subp_name}.wrap') + resp = open_wrapdburl(f'wrapdb:///v2/{subp_name}_{version}-{revision}/{subp_name}.wrap') fname = Path(self.subdir_root, f'{subp_name}.wrap') with fname.open('wb') as f: - f.write(url.read()) + f.write(resp.read()) mlog.log(f'Installed {subp_name} version {version} revision {revision}') wrap = PackageDefinition.from_wrap_file(str(fname)) self.wraps[wrap.name] = wrap @@ -687,9 +706,9 @@ def get_data(self, urlstring: str) -> T.Tuple[str, str]: h = hashlib.sha256() tmpfile = tempfile.NamedTemporaryFile(mode='wb', dir=self.cachedir, delete=False) url = urllib.parse.urlparse(urlstring) - if url.hostname and url.hostname.endswith(WHITELIST_SUBDOMAIN): + if url.hostname and is_trusted_subdomain(url.hostname): resp = open_wrapdburl(urlstring, allow_insecure=self.allow_insecure, have_opt=self.wrap_frontend) - elif WHITELIST_SUBDOMAIN in urlstring: + elif WRAPDB_UPSTREAM_HOSTNAME in urlstring: raise WrapException(f'{urlstring} may be a WrapDB-impersonating URL') else: headers = { diff --git a/mesonbuild/wrap/wraptool.py b/mesonbuild/wrap/wraptool.py index 5486a26a7782..1221d4c24ebb 100644 --- a/mesonbuild/wrap/wraptool.py +++ b/mesonbuild/wrap/wraptool.py @@ -99,7 +99,7 @@ def install(options: 'argparse.Namespace') -> None: if os.path.exists(wrapfile): raise SystemExit('Wrap file already exists.') (version, revision) = get_latest_version(name, options.allow_insecure) - url = open_wrapdburl(f'https://wrapdb.mesonbuild.com/v2/{name}_{version}-{revision}/{name}.wrap', options.allow_insecure, True) + url = open_wrapdburl(f'wrapdb:///v2/{name}_{version}-{revision}/{name}.wrap', options.allow_insecure, True) with open(wrapfile, 'wb') as f: f.write(url.read()) print(f'Installed {name} version {version} revision {revision}')