From 510b1fe4024f696617b165f640ed84e780361841 Mon Sep 17 00:00:00 2001 From: Carlos Herrero Date: Fri, 15 Nov 2024 12:16:25 +0100 Subject: [PATCH] LITE-31369: Fix AsyncConnectClient ignoring environment proxies --- connect/client/fluent.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/connect/client/fluent.py b/connect/client/fluent.py index 75a80f8..359c610 100644 --- a/connect/client/fluent.py +++ b/connect/client/fluent.py @@ -5,11 +5,14 @@ # import contextvars import threading +from functools import cache from json.decoder import JSONDecodeError from typing import Union import httpx import requests +from httpx._config import Proxy +from httpx._utils import get_environment_proxies from requests.adapters import HTTPAdapter from connect.client.constants import CONNECT_ENDPOINT_URL, CONNECT_SPECS_URL @@ -237,6 +240,20 @@ def _get_namespace_class(self): _SSL_CONTEXT = httpx.create_ssl_context() +@cache +def _get_async_mounts(): + """ + This code based on how httpx.Client mounts proxies from environment. + This is cached to allow reusing the created transport objects. + """ + return { + key: None + if url is None + else httpx.AsyncHTTPTransport(verify=_SSL_CONTEXT, proxy=Proxy(url=url)) + for key, url in get_environment_proxies().items() + } + + class AsyncConnectClient(_ConnectClientBase, AsyncClientMixin): """ Create a new instance of the AsyncConnectClient. @@ -274,12 +291,14 @@ def __init__(self, *args, **kwargs): def session(self): value = self._session.get() if not value: - value = httpx.AsyncClient( - transport=_ASYNC_TRANSPORTS.setdefault( - self.endpoint, - httpx.AsyncHTTPTransport(verify=_SSL_CONTEXT), - ), - ) + transport = _ASYNC_TRANSPORTS.get(self.endpoint) + if not transport: + transport = _ASYNC_TRANSPORTS[self.endpoint] = httpx.AsyncHTTPTransport( + verify=_SSL_CONTEXT, + ) + # When passing a transport to httpx a Client/AsyncClient, proxies defined in environment + # (like HTTP_PROXY) are ignored, so let's pass them using mounts parameter. + value = httpx.AsyncClient(transport=transport, mounts=_get_async_mounts()) self._session.set(value) return value