
cannot connect to aws kafka from on-prem server with SASL_SSL enabled #139

Open
gaganyaan2 opened this issue Jul 28, 2023 · 2 comments

gaganyaan2 commented Jul 28, 2023

I'm trying to use kafka-proxy to connect to AWS Kafka from my on-prem local machine with SASL_SSL auth enabled.

What's working:

  • The plaintext method works.
  • The SASL method works if I use the bootstrap server name provided by AWS (b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096) from within the internal VPC network, using the Python client below.

What's NOT working:

When I run the proxy with the options below and point the client at it, the connection fails:

[root@ip-10-20-109-135 ~]# kafka-proxy server --bootstrap-server-mapping "b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096,10.20.109.135:3001" \
>                      --bootstrap-server-mapping "b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096,10.20.109.135:3002" \
>                      --sasl-enable \
>                      --sasl-username "alice" \
>                      --sasl-password "alice-secret" \
>                      --sasl-method "SCRAM-SHA-512" \
>                      --tls-enable \
>                      --tls-insecure-skip-verify \
>                      --log-level debug
INFO[2023-07-28T06:42:11Z] Starting kafka-proxy version 0.3.6
INFO[2023-07-28T06:42:11Z] Bootstrap server b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096 advertised as 10.20.109.135:3001
INFO[2023-07-28T06:42:11Z] Bootstrap server b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096 advertised as 10.20.109.135:3002
INFO[2023-07-28T06:42:11Z] Listening on 10.20.109.135:3001 (10.20.109.135:3001) for remote b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
INFO[2023-07-28T06:42:11Z] Listening on 10.20.109.135:3002 (10.20.109.135:3002) for remote b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
INFO[2023-07-28T06:42:11Z] Ready for new connections
INFO[2023-07-28T06:42:19Z] New connection for b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
DEBU[2023-07-28T06:42:19Z] SASLSCRAM: Doing handshake. Mechanism: SCRAM-SHA-512
DEBU[2023-07-28T06:42:19Z] Successful SASL handshake. Available mechanisms: [SCRAM-SHA-512]
DEBU[2023-07-28T06:42:19Z] Commencing scram loop
DEBU[2023-07-28T06:42:19Z] SASL SCRAM authentication succeeded
DEBU[2023-07-28T06:42:19Z] Kafka request key 9217, version 1, length 369295617
INFO[2023-07-28T06:42:19Z] Reading data from local connection on 10.20.109.135:3001 from 100.MASK.MASK.MASK:62486 (b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096) had error: api key 9217 is invalid
INFO[2023-07-28T06:42:20Z] New connection for b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
DEBU[2023-07-28T06:42:20Z] SASLSCRAM: Doing handshake. Mechanism: SCRAM-SHA-512
DEBU[2023-07-28T06:42:20Z] Successful SASL handshake. Available mechanisms: [SCRAM-SHA-512]
DEBU[2023-07-28T06:42:20Z] Commencing scram loop
DEBU[2023-07-28T06:42:20Z] SASL SCRAM authentication succeeded
DEBU[2023-07-28T06:42:20Z] Kafka request key 9217, version 1, length 369295617
INFO[2023-07-28T06:42:20Z] Reading data from local connection on 10.20.109.135:3001 from 100.MASK.MASK.MASK:62488 (b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096) had error: api key 9217 is invalid
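
A note on the debug line above: the numbers in "Kafka request key 9217, version 1, length 369295617" look like the first bytes of a TLS ClientHello rather than a real Kafka request. Here is a minimal sketch that re-packs the three logged values the way a Kafka request header is laid out (int32 length, int16 API key, int16 API version, big-endian) and then reads the same bytes back as a TLS record header. The variable names are illustrative only, and this is an interpretation of the log, not something the proxy reports itself.

```python
import struct

# Values copied from the proxy debug line:
# "Kafka request key 9217, version 1, length 369295617"
length, api_key, api_version = 369295617, 9217, 1

# A Kafka request begins with int32 length, int16 api_key, int16 api_version,
# all big-endian. Re-pack them to recover the bytes the proxy actually read.
raw = struct.pack(">ihh", length, api_key, api_version)
print(raw.hex(" "))

# Reinterpret the same bytes as the start of a TLS record.
content_type = raw[0]                             # 0x16 means "handshake" record
record_version = raw[1:3]                         # 03 01 is the legacy record version
record_length = int.from_bytes(raw[3:5], "big")   # length of the TLS record body
handshake_type = raw[5]                           # 0x01 means ClientHello

print(f"content_type=0x{content_type:02x}, "
      f"record_version={record_version.hex()}, "
      f"record_length={record_length}, "
      f"handshake_type=0x{handshake_type:02x}")
```

If that reading is right, the client is sending a TLS handshake to a proxy listener that expects plaintext Kafka, which would be consistent with the client-side "connecting to a PLAINTEXT broker listener?" error.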

Then I run the Python client to connect to Kafka.

consumer.py

from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import time
import argparse
parser = argparse.ArgumentParser(description='kafka consumer parameters')
parser.add_argument('--server', type=str,
                    help='kafka server with port eg. kafka:9092.')

parser.add_argument('--consumer_group', type=str,
                    help='consumer_group name.')

parser.add_argument('--topic', type=str,
                    help='kafka topic name.')

parser.add_argument('--messages', type=int,
                    help='number of messages needs to be consumed.')

parser.add_argument('--sleep', type=float,
                    help='sleep between messages consumed.')

args = parser.parse_args()

kafka_server = args.server
kafka_consumer_group = args.consumer_group
kafka_topic = args.topic
kafka_messages= args.messages
kafka_sleep = args.sleep

conf = {'bootstrap.servers': kafka_server,
        'group.id': kafka_consumer_group,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': 'alice',
        'sasl.password': 'alice-secret',
        'ssl.endpoint.identification.algorithm': " ",
        'enable.ssl.certificate.verification': "false",
        # 'enable.auto.commit': True,
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

def msg_process(msg):
    print(f"key: {msg.key()}, value: {msg.value()}")

running = True

def kafka_consumer(consumer, kafka_topic):
    try:
        print("Consuming from kafka_topic: ",str(kafka_topic))
        consumer.subscribe([kafka_topic])
        count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                print("message processing:")
                msg_process(msg)
                time.sleep(kafka_sleep)
                
                count = count + 1
                print("count :",count)
                if count == kafka_messages:
                    while True:
                        print("infinite sleep...")
                        time.sleep(60)

    finally:
        print("finally block")
        consumer.close()

#call kafka_consumer function
kafka_consumer(consumer,kafka_topic)

This is what I'm running on my on-prem machine.

on-prem@user:/$ python3 consumer.py --server X.X.X.X:3001  --consumer_group group1 --topic test --messages 100 --sleep 1
Consuming from kafka_topic:  test
%3|1690526538.799|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://X.X.X.X::3001/bootstrap]: sasl_ssl://X.X.X.X::3001/bootstrap: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 174ms in state SSL_HANDSHAKE)
%3|1690526539.776|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://X.X.X.X::3001/bootstrap]: sasl_ssl://X.X.X.X::3001/bootstrap: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 151ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
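
One thing that may be worth trying, as an assumption rather than a confirmed fix: in the proxy command above, --tls-enable and --sasl-enable appear to apply to the proxy-to-broker connection, so the listener on port 3001 would still be plaintext and the proxy would already be doing the SCRAM login on the client's behalf. Under that assumption the client would talk plain Kafka to the proxy. A sketch of such a client config follows; build_proxy_client_conf is a hypothetical helper, not part of confluent_kafka.

```python
def build_proxy_client_conf(proxy_address, group_id):
    """Build a confluent_kafka-style config dict for a plaintext proxy listener.

    Assumption: the proxy terminates TLS and SASL towards the broker itself,
    so the client needs neither SASL_SSL nor credentials on this hop.
    """
    return {
        'bootstrap.servers': proxy_address,
        'group.id': group_id,
        'security.protocol': 'PLAINTEXT',
        'auto.offset.reset': 'earliest',
    }


# Placeholder address and group taken from the command line in this report.
conf = build_proxy_client_conf('X.X.X.X:3001', 'group1')
print(conf)

# In consumer.py this dict would replace the existing `conf`:
#   consumer = Consumer(conf)
```

The alternative would be to keep SASL_SSL on the client and enable TLS on the proxy listener itself; `kafka-proxy server --help` should list the listener TLS options available in your version. Note that with a plaintext listener the traffic between the on-prem client and the proxy is unencrypted.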

I'm using Kafka version 2.8.1 and kafka-proxy version 0.3.6.

The issue seems similar to #28. I tried adding the options below, but no luck.

'ssl.endpoint.identification.algorithm': " ",
'enable.ssl.certificate.verification': "false",

Please help.


dnltech2020 commented Oct 24, 2023

Are you sure you enabled SCRAM auth on your MSK cluster?

SCRAM can only be enabled on new MSK builds; you cannot enable SCRAM after the cluster has already been built.

dnltech2020 commented

I also noticed that your Python script shows your Kafka broker on port 9092. If your broker is on 9096 for SCRAM, you would need to change the line below in your script to 9096:

help='kafka server with port eg. kafka:9092.')
