diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 8eb7504a7..62d487543 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -147,6 +147,7 @@ class KafkaAdminClient(object):
         sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token
             provider instance. (See kafka.oauth.abstract). Default: None
         kafka_client (callable): Custom class / callable for creating KafkaClient instances
+        socks5_proxy (str): Socks5 proxy URL. Default: None
 
     """
     DEFAULT_CONFIG = {
@@ -182,6 +183,7 @@ class KafkaAdminClient(object):
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
+        'socks5_proxy': None,
 
         # metrics configs
         'metric_reporters': [],
@@ -272,7 +274,7 @@ def _refresh_controller_id(self):
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
             request = MetadataRequest[version]()
-            future = self._send_request_to_node(self._client.least_loaded_node(), request)
+            future = self._send_request_to_least_loaded_node(request)
 
             self._wait_for_futures([future])
 
@@ -310,7 +312,7 @@ def _find_coordinator_id_send_request(self, group_id):
             raise NotImplementedError(
                 "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
                 .format(version))
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        return self._send_request_to_least_loaded_node(request)
 
     def _find_coordinator_id_process_response(self, response):
         """Process a FindCoordinatorResponse.
@@ -355,9 +357,36 @@ def _find_coordinator_ids(self, group_ids):
         }
         return groups_coordinators
 
-    def _send_request_to_node(self, node_id, request, wakeup=True):
+    def _send_request_to_least_loaded_node(self, request):
+        """Send a Kafka protocol message to the least loaded broker.
+
+        Returns a future that may be polled for status and results.
+
+        :param request: The message to send.
+        :return: A future object that may be polled for status and results.
+        :exception: The exception if the message could not be sent.
+        """
+        node_id = self._client.least_loaded_node()
+        while not self._client.ready(node_id):
+            # poll until the connection to broker is ready, otherwise send()
+            # will fail with NodeNotReadyError
+            self._client.poll()
+
+            # node_id is not part of the cluster anymore, choose a new broker
+            # to connect to
+            if self._client.cluster.broker_metadata(node_id) is None:
+                node_id = self._client.least_loaded_node()
+
+        return self._client.send(node_id, request)
+
+    def _send_request_to_node(self, node_id, request):
         """Send a Kafka protocol message to a specific broker.
 
+        .. note::
+
+            This function will enter an infinite loop if `node_id` is
+            removed from the cluster.
+
         Returns a future that may be polled for status and results.
 
         :param node_id: The broker id to which to send the message.
@@ -383,10 +412,23 @@ def _send_request_to_controller(self, request):
         tries = 2  # in case our cached self._controller_id is outdated
         while tries:
             tries -= 1
-            future = self._send_request_to_node(self._controller_id, request)
+            future = self._client.send(self._controller_id, request)
 
             self._wait_for_futures([future])
 
+            if future.exception is not None:
+                log.error(
+                    "Sending request to controller_id %s failed with %s",
+                    self._controller_id,
+                    future.exception,
+                )
+                is_outdated_controller = (
+                    self._client.cluster.broker_metadata(self._controller_id) is None
+                )
+                if is_outdated_controller:
+                    self._refresh_controller_id()
+                continue
+
             response = future.value
             # In Java, the error field name is inconsistent:
             #  - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
@@ -507,10 +549,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
             allow_auto_topic_creation=auto_topic_creation
         )
 
-        future = self._send_request_to_node(
-            self._client.least_loaded_node(),
-            request
-        )
+        future = self._send_request_to_least_loaded_node(request)
 
         self._wait_for_futures([future])
         return future.value
@@ -602,7 +641,7 @@ def describe_acls(self, acl_filter):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
         response = future.value
 
@@ -693,7 +732,7 @@ def create_acls(self, acls):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
         response = future.value
 
@@ -787,7 +826,7 @@ def delete_acls(self, acl_filters):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
         response = future.value
 
@@ -847,8 +886,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
                 ))
 
             if len(topic_resources) > 0:
-                futures.append(self._send_request_to_node(
-                    self._client.least_loaded_node(),
+                futures.append(self._send_request_to_least_loaded_node(
                     DescribeConfigsRequest[version](resources=topic_resources)
                 ))
 
@@ -868,8 +906,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
                 ))
 
             if len(topic_resources) > 0:
-                futures.append(self._send_request_to_node(
-                    self._client.least_loaded_node(),
+                futures.append(self._send_request_to_least_loaded_node(
                     DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
                 ))
         else:
@@ -916,7 +953,7 @@ def alter_configs(self, config_resources):
         #   // a single request that may be sent to any broker.
         #
         # So this is currently broken as it always sends to the least_loaded_node()
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
 
         self._wait_for_futures([future])
         response = future.value
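From the caller's side, the new option is just another constructor kwarg. A minimal usage sketch (the broker address, proxy endpoint, and credentials below are hypothetical placeholders, not part of this patch):

```python
from kafka import KafkaAdminClient

# Hypothetical endpoints for illustration only. Username/password, when
# present in the URL's user-info part, drive the SOCKS5 auth negotiation.
admin = KafkaAdminClient(
    bootstrap_servers="broker.internal:9092",
    socks5_proxy="socks5://proxyuser:proxypass@proxy.internal:1080",
)
print(admin.list_topics())
```
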
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 58f22d4ec..2e3a680ff 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -154,6 +154,7 @@ class KafkaClient(object):
             sasl mechanism handshake. Default: one of bootstrap servers
         sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token
             provider instance. (See kafka.oauth.abstract). Default: None
+        socks5_proxy (str): Socks5 proxy URL. Default: None
     """
 
     DEFAULT_CONFIG = {
@@ -192,7 +193,8 @@ class KafkaClient(object):
         'sasl_plain_password': None,
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
-        'sasl_oauth_token_provider': None
+        'sasl_oauth_token_provider': None,
+        'socks5_proxy': None,
     }
 
     def __init__(self, **configs):
@@ -368,18 +370,26 @@ def _maybe_connect(self, node_id):
         conn = self._conns.get(node_id)
 
         if conn is None:
-            broker = self.cluster.broker_metadata(node_id)
-            assert broker, 'Broker id %s not in current metadata' % (node_id,)
-
-            log.debug("Initiating connection to node %s at %s:%s",
-                      node_id, broker.host, broker.port)
-            host, port, afi = get_ip_port_afi(broker.host)
-            cb = WeakMethod(self._conn_state_change)
-            conn = BrokerConnection(host, broker.port, afi,
-                                    state_change_callback=cb,
-                                    node_id=node_id,
-                                    **self.config)
-            self._conns[node_id] = conn
+            broker_metadata = self.cluster.broker_metadata(node_id)
+
+            # The broker may have been removed from the cluster after the
+            # call to `maybe_connect`. At this point there is no way to
+            # recover, so just ignore the connection
+            if broker_metadata is None:
+                log.debug("Node %s is not available anymore, discarding connection", node_id)
+                if node_id in self._connecting:
+                    self._connecting.remove(node_id)
+                return False
+            else:
+                log.debug("Initiating connection to node %s at %s:%s",
+                          node_id, broker_metadata.host, broker_metadata.port)
+                host, port, afi = get_ip_port_afi(broker_metadata.host)
+                cb = WeakMethod(self._conn_state_change)
+                conn = BrokerConnection(host, broker_metadata.port, afi,
+                                        state_change_callback=cb,
+                                        node_id=node_id,
+                                        **self.config)
+                self._conns[node_id] = conn
 
         # Check if existing connection should be recreated because host/port changed
         elif self._should_recycle_connection(conn):
@@ -637,6 +647,9 @@ def _poll(self, timeout):
             self._sensors.select_time.record((end_select - start_select) * 1000000000)
 
         for key, events in ready:
+            if key.fileobj.fileno() < 0:
+                time.sleep(0.1)
+
             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()
                 continue
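The `_poll` guard above addresses selector keys whose socket was closed between `select()` returning and the event being processed: a closed socket reports a negative file descriptor, and touching the stale key would raise. A self-contained sketch of the failure mode, using only the standard library (this is an illustration, not code from the patch; the patch throttles with a short sleep rather than skipping the key):

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
sel.register(b, selectors.EVENT_READ)
a.sendall(b"x")

ready = sel.select(timeout=1.0)
b.close()  # e.g. another callback tore the connection down first

for key, events in ready:
    # After close(), fileno() returns -1; using the stale key would raise.
    if key.fileobj.fileno() < 0:
        continue
    key.fileobj.recv(1)
```
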
diff --git a/kafka/conn.py b/kafka/conn.py
index 1efb8a0a1..df903f264 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -33,6 +33,7 @@
 from kafka.protocol.parser import KafkaProtocol
 from kafka.protocol.types import Int32, Int8
 from kafka.scram import ScramClient
+from kafka.socks5_wrapper import Socks5Wrapper
 from kafka.version import __version__
 
@@ -191,6 +192,7 @@ class BrokerConnection(object):
             sasl mechanism handshake. Default: one of bootstrap servers
         sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token
             provider instance. (See kafka.oauth.abstract). Default: None
+        socks5_proxy (str): Socks5 proxy URL. Default: None
     """
 
     DEFAULT_CONFIG = {
@@ -224,7 +226,8 @@ class BrokerConnection(object):
         'sasl_plain_password': None,
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
-        'sasl_oauth_token_provider': None
+        'sasl_oauth_token_provider': None,
+        'socks5_proxy': None,
     }
     SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
     SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512")
@@ -236,6 +239,7 @@ def __init__(self, host, port, afi, **configs):
         self._sock_afi = afi
         self._sock_addr = None
         self._api_versions = None
+        self._socks5_proxy = None
 
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -368,7 +372,11 @@ def connect(self):
             log.debug('%s: creating new socket', self)
             assert self._sock is None
             self._sock_afi, self._sock_addr = next_lookup
-            self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
+            if self.config["socks5_proxy"] is not None:
+                self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
+                self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
+            else:
+                self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
 
             for option in self.config['socket_options']:
                 log.debug('%s: setting socket option %s', self, option)
@@ -385,7 +393,10 @@ def connect(self):
             # to check connection status
             ret = None
             try:
-                ret = self._sock.connect_ex(self._sock_addr)
+                if self._socks5_proxy:
+                    ret = self._socks5_proxy.connect_ex(self._sock_addr)
+                else:
+                    ret = self._sock.connect_ex(self._sock_addr)
             except socket.error as err:
                 ret = err.errno
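The reason `Socks5Wrapper.connect_ex` can slot in for `socket.connect_ex` here is that both follow the same non-blocking contract: the first call typically returns `EINPROGRESS`, and subsequent calls return `0` or `EISCONN` once established, so `connect()` just re-checks `ret` on every pass through its state machine. A rough illustration with a plain socket (the address is from the TEST-NET range, for illustration only):

```python
import errno
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)

ret = sock.connect_ex(("192.0.2.10", 9092))  # hypothetical broker address
if ret in (0, errno.EISCONN):
    print("connected")
elif ret in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EALREADY):
    print("still connecting; poll and call connect_ex again")
else:
    print("connection failed:", errno.errorcode.get(ret, ret))
```
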
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a1d1dfa37..969969932 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -245,6 +245,7 @@ class KafkaConsumer(six.Iterator):
         sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token
             provider instance. (See kafka.oauth.abstract). Default: None
         kafka_client (callable): Custom class / callable for creating KafkaClient instances
+        socks5_proxy (str): Socks5 proxy URL. Default: None
 
     Note:
         Configuration parameters are described in more detail at
@@ -308,6 +309,7 @@ class KafkaConsumer(six.Iterator):
         'sasl_oauth_token_provider': None,
         'legacy_iterator': False,  # enable to revert to < 1.4.7 iterator
         'kafka_client': KafkaClient,
+        'socks5_proxy': None,
    }
    DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
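As with the admin client, the consumer simply forwards the option to the underlying `KafkaClient`. A minimal sketch (topic, broker, and proxy names are hypothetical):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="broker.internal:9092",
    socks5_proxy="socks5://proxy.internal:1080",  # no-auth proxy, illustration only
)
for message in consumer:
    print(message.topic, message.offset, message.value)
```
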
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index dd1cc508c..431642776 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -281,6 +281,7 @@ class KafkaProducer(object):
         sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token
             provider instance. (See kafka.oauth.abstract). Default: None
         kafka_client (callable): Custom class / callable for creating KafkaClient instances
+        socks5_proxy (str): Socks5 proxy URL. Default: None
 
     Note:
         Configuration parameters are described in more detail at
@@ -335,6 +336,7 @@ class KafkaProducer(object):
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
         'kafka_client': KafkaClient,
+        'socks5_proxy': None,
     }
 
     _COMPRESSORS = {
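The producer side is symmetric. A minimal sketch (endpoints again hypothetical):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="broker.internal:9092",
    socks5_proxy="socks5://proxy.internal:1080",
)
producer.send("my-topic", b"hello through the proxy")
producer.flush()
```
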
diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py
new file mode 100644
index 000000000..18bea7c8d
--- /dev/null
+++ b/kafka/socks5_wrapper.py
@@ -0,0 +1,248 @@
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+import errno
+import logging
+import random
+import socket
+import struct
+
+log = logging.getLogger(__name__)
+
+
+class ProxyConnectionStates:
+    DISCONNECTED = '<disconnected>'
+    CONNECTING = '<connecting>'
+    NEGOTIATE_PROPOSE = '<negotiate_propose>'
+    NEGOTIATING = '<negotiating>'
+    AUTHENTICATING = '<authenticating>'
+    REQUEST_SUBMIT = '<request_submit>'
+    REQUESTING = '<requesting>'
+    READ_ADDRESS = '<read_address>'
+    COMPLETE = '<complete>'
+
+
+class Socks5Wrapper:
+    """Socks5 proxy wrapper
+
+    Manages connection through socks5 proxy with support for username/password
+    authentication.
+    """
+
+    def __init__(self, proxy_url, afi):
+        self._buffer_in = b''
+        self._buffer_out = b''
+        self._proxy_url = urlparse(proxy_url)
+        self._sock = None
+        self._state = ProxyConnectionStates.DISCONNECTED
+        self._target_afi = socket.AF_UNSPEC
+
+        proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi)
+        # TODO raise error on lookup failure
+        self._proxy_addr = random.choice(proxy_addrs)
+
+    @classmethod
+    def is_inet_4_or_6(cls, gai):
+        """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
+        return gai[0] in (socket.AF_INET, socket.AF_INET6)
+
+    @classmethod
+    def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
+        """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
+        # XXX: all DNS functions in Python are blocking. If we really
+        # want to be non-blocking here, we need to use a 3rd-party
+        # library like python-adns, or move resolution onto its
+        # own thread. This will be subject to the default libc
+        # name resolution timeout (5s on most Linux boxes)
+        try:
+            return list(filter(cls.is_inet_4_or_6,
+                               socket.getaddrinfo(host, port, afi,
+                                                  socket.SOCK_STREAM)))
+        except socket.gaierror as ex:
+            log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
+            return []
+
+    def socket(self, family, sock_type):
+        """Open and record a socket.
+
+        Returns the actual underlying socket object to ensure e.g. selects and
+        ssl wrapping works as expected.
+        """
+        self._target_afi = family  # Store the address family of the target
+        afi, _, _, _, _ = self._proxy_addr
+        self._sock = socket.socket(afi, sock_type)
+        return self._sock
+
+    def _flush_buf(self):
+        """Send out all data that is stored in the outgoing buffer.
+
+        It is expected that the caller handles error handling, including non-blocking
+        as well as connection failure exceptions.
+        """
+        while self._buffer_out:
+            sent_bytes = self._sock.send(self._buffer_out)
+            self._buffer_out = self._buffer_out[sent_bytes:]
+
+    def _peek_buf(self, datalen):
+        """Ensure local inbound buffer has enough data, and return that data without
+        consuming the local buffer
+
+        It's expected that the caller handles e.g. blocking exceptions"""
+        while True:
+            bytes_remaining = datalen - len(self._buffer_in)
+            if bytes_remaining <= 0:
+                break
+            data = self._sock.recv(bytes_remaining)
+            if not data:
+                break
+            self._buffer_in = self._buffer_in + data
+
+        return self._buffer_in[:datalen]
+
+    def _read_buf(self, datalen):
+        """Read and consume bytes from socket connection
+
+        It's expected that the caller handles e.g. blocking exceptions"""
+        buf = self._peek_buf(datalen)
+        if buf:
+            self._buffer_in = self._buffer_in[len(buf):]
+        return buf
+
+    def connect_ex(self, addr):
+        """Runs a state machine through connection to authentication to
+        proxy connection request.
+
+        The somewhat strange setup is to facilitate non-intrusive use from
+        BrokerConnection state machine.
+
+        This function is called with a socket in non-blocking mode. Both
+        send and receive calls can return in EWOULDBLOCK/EAGAIN which we
+        specifically avoid handling here. These are handled in main
+        BrokerConnection connection loop, which then would retry calls
+        to this function."""
+
+        if self._state == ProxyConnectionStates.DISCONNECTED:
+            self._state = ProxyConnectionStates.CONNECTING
+
+        if self._state == ProxyConnectionStates.CONNECTING:
+            _, _, _, _, sockaddr = self._proxy_addr
+            ret = self._sock.connect_ex(sockaddr)
+            if not ret or ret == errno.EISCONN:
+                self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE
+            else:
+                return ret
+
+        if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE:
+            if self._proxy_url.username and self._proxy_url.password:
+                # Propose username/password
+                self._buffer_out = b"\x05\x01\x02"
+            else:
+                # Propose no auth
+                self._buffer_out = b"\x05\x01\x00"
+            self._state = ProxyConnectionStates.NEGOTIATING
+
+        if self._state == ProxyConnectionStates.NEGOTIATING:
+            self._flush_buf()
+            buf = self._read_buf(2)
+            if buf[0:1] != b"\x05":
+                log.error("Unrecognized SOCKS version")
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+
+            if buf[1:2] == b"\x00":
+                # No authentication required
+                self._state = ProxyConnectionStates.REQUEST_SUBMIT
+            elif buf[1:2] == b"\x02":
+                # Username/password authentication selected
+                userlen = len(self._proxy_url.username)
+                passlen = len(self._proxy_url.password)
+                self._buffer_out = struct.pack(
+                    "!bb{}sb{}s".format(userlen, passlen),
+                    1,  # version
+                    userlen,
+                    self._proxy_url.username.encode(),
+                    passlen,
+                    self._proxy_url.password.encode(),
+                )
+                self._state = ProxyConnectionStates.AUTHENTICATING
+            else:
+                log.error("Unrecognized SOCKS authentication method")
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+
+        if self._state == ProxyConnectionStates.AUTHENTICATING:
+            self._flush_buf()
+            buf = self._read_buf(2)
+            if buf == b"\x01\x00":
+                # Authentication successful
+                self._state = ProxyConnectionStates.REQUEST_SUBMIT
+            else:
+                log.error("Socks5 proxy authentication failure")
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+
+        if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
+            if self._target_afi == socket.AF_INET:
+                addr_type = 1
+                addr_len = 4
+            elif self._target_afi == socket.AF_INET6:
+                addr_type = 4
+                addr_len = 16
+            else:
+                log.error("Unknown address family, %r", self._target_afi)
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+
+            self._buffer_out = struct.pack(
+                "!bbbb{}sh".format(addr_len),
+                5,  # version
+                1,  # command: connect
+                0,  # reserved
+                addr_type,  # 1 for ipv4, 4 for ipv6 address
+                socket.inet_pton(self._target_afi, addr[0]),  # either 4 or 16 bytes of actual address
+                addr[1],  # port
+            )
+            self._state = ProxyConnectionStates.REQUESTING
+
+        if self._state == ProxyConnectionStates.REQUESTING:
+            self._flush_buf()
+            buf = self._read_buf(2)
+            if buf[0:2] == b"\x05\x00":
+                self._state = ProxyConnectionStates.READ_ADDRESS
+            else:
+                log.error("Proxy request failed: %r", buf[1:2])
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+
+        if self._state == ProxyConnectionStates.READ_ADDRESS:
+            # we don't really care about the remote endpoint address, but need to clear the stream
+            buf = self._peek_buf(2)
+            if buf[0:2] == b"\x00\x01":
+                _ = self._read_buf(2 + 4 + 2)  # ipv4 address + port
+            elif buf[0:2] == b"\x00\x04":
+                _ = self._read_buf(2 + 16 + 2)  # ipv6 address + port
+            else:
+                log.error("Unrecognized remote address type %r", buf[1:2])
+                self._state = ProxyConnectionStates.DISCONNECTED
+                self._sock.close()
+                return errno.ECONNREFUSED
+            self._state = ProxyConnectionStates.COMPLETE
+
+        if self._state == ProxyConnectionStates.COMPLETE:
+            return 0
+
+        # not reached;
+        # Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is assumed to be handled by
+        # the caller. The caller re-enters this state machine from retry logic with timer or via select & family
+        log.error("Internal error, state %r not handled correctly", self._state)
+        self._state = ProxyConnectionStates.DISCONNECTED
+        if self._sock:
+            self._sock.close()
+        return errno.ECONNREFUSED
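For reference, these are the RFC 1928 messages the state machine walks through, written out as the literal byte strings the wrapper buffers (no-auth case, IPv4 target; note that ATYP is 0x01 for IPv4 and 0x04 for IPv6, which is why the READ_ADDRESS branch above matches `b"\x00\x04"` for an IPv6 reply). The target address here is an example value:

```python
import socket
import struct

# Client greeting: version 5, one method offered, method 0x00 (no auth).
greeting = b"\x05\x01\x00"

# Server method selection the wrapper accepts: version 5, method 0x00.
method_selection = b"\x05\x00"

# CONNECT request for 93.184.216.34:9092 (example address):
# VER=5, CMD=1 (connect), RSV=0, ATYP=1 (ipv4), 4-byte address, 2-byte port.
request = struct.pack(
    "!bbbb4sh",
    5, 1, 0, 1,
    socket.inet_pton(socket.AF_INET, "93.184.216.34"),
    9092,
)
assert request[:4] == b"\x05\x01\x00\x01"

# A successful reply starts with VER=5, REP=0, followed by RSV, ATYP,
# BND.ADDR and BND.PORT, which READ_ADDRESS consumes just to clear the stream.
```
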
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 66b227aa9..b0592086a 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -71,13 +71,8 @@ def test_can_connect(cli, conn):
 
 def test_maybe_connect(cli, conn):
-    try:
-        # Node not in metadata, raises AssertionError
-        cli._maybe_connect(2)
-    except AssertionError:
-        pass
-    else:
-        assert False, 'Exception not raised'
+    # Node not in metadata should be ignored
+    cli._maybe_connect(2)
 
     # New node_id creates a conn object
     assert 0 not in cli._conns
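Beyond the unit-test change above, the wrapper's intended call pattern can be sketched as follows. This is a hedged illustration, not code from the patch: the proxy and broker endpoints are hypothetical, a reachable SOCKS5 proxy is required, and in the real client the retries are driven by the `BrokerConnection` poll loop rather than a busy loop:

```python
import errno
import socket

from kafka.socks5_wrapper import Socks5Wrapper

wrapper = Socks5Wrapper("socks5://proxy.internal:1080", socket.AF_INET)
sock = wrapper.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)

while True:
    try:
        ret = wrapper.connect_ex(("192.0.2.10", 9092))  # hypothetical broker
    except (BlockingIOError, socket.error):
        continue  # non-blocking send/recv not ready yet; caller retries
    if ret == 0:
        break  # tunnel established; sock now talks to the broker
    if ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
        raise OSError(ret, "proxy connection failed")

sock.sendall(b"...")  # Kafka protocol bytes flow through the tunnel
```
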