feat: share kafka client on meta #19058

Merged
merged 31 commits on Nov 2, 2024
Changes from 13 commits
31 commits
b9b2f79
share kafka client on meta
tabVersion Oct 22, 2024
ab46672
check kafka connection identical
tabVersion Oct 22, 2024
3373a58
fix
tabVersion Oct 22, 2024
c5203d6
fix with props
tabVersion Oct 22, 2024
3b6a6a2
fix
tabVersion Oct 22, 2024
3b3f725
use connection hash as hashmap entry
tabVersion Oct 22, 2024
968ed08
fix
tabVersion Oct 22, 2024
a41f3fc
fix
Oct 23, 2024
6534ebf
rerun
tabVersion Oct 23, 2024
7249e78
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
tabVersion Oct 23, 2024
0024e1d
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 23, 2024
ad8b989
fix
tabVersion Oct 23, 2024
ae1b70a
fix
tabVersion Oct 23, 2024
58b5128
better with options
Oct 25, 2024
f115a0c
use kafka connection as hashkey
Oct 25, 2024
d128644
use moka
Oct 25, 2024
ae9df41
fix lint
Oct 25, 2024
45295bc
fix
Oct 25, 2024
a9e34c7
fix
Oct 25, 2024
d27ab90
Merge branch 'main' into tab/share-kafka-client-enum
tabVersion Oct 25, 2024
256485c
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
Oct 28, 2024
62cb953
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
Oct 28, 2024
725e23c
remove get hash func
Oct 28, 2024
832f66f
migrate to Weak
Oct 28, 2024
73f0b7b
minor
Oct 28, 2024
ac1d63d
fix
Oct 28, 2024
35fb002
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 29, 2024
ec49096
test bump quanta to 0.12.3
tabVersion Oct 29, 2024
16d8c42
update patch
tabVersion Oct 29, 2024
b3efda6
moka 0.12.3
Oct 30, 2024
51eca61
switch to madsim repo
Nov 2, 2024
2 changes: 1 addition & 1 deletion e2e_test/source_legacy/basic/ddl.slt
@@ -30,7 +30,7 @@ Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
3: failed to parse json
4: missing field `properties.bootstrap.server`
4: missing field `topic`


statement error
41 changes: 26 additions & 15 deletions src/connector/src/connector_common/common.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use std::io::Write;
use std::time::Duration;

@@ -61,7 +62,7 @@ use aws_types::SdkConfig;
use risingwave_common::util::env_var::env_var_is_true;

/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
#[serde(rename = "aws.region", alias = "region")]
pub region: Option<String>,
@@ -161,21 +162,11 @@ impl AwsAuthProps {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash)]
pub struct KafkaConnection {
Member:
It seems we will be able to use this same struct for CREATE CONNECTION later? 🤔

Contributor Author:
yes.

#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,

/// Security protocol used for RisingWave to communicate with Kafka brokers. Could be
/// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
#[serde(rename = "properties.security.protocol")]
@@ -252,6 +243,20 @@ pub struct KafkaCommon {

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
@@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions the producer is unable to reliably enforce a strict max message limit at
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests,
/// the broker will enforce the the topic's max.message.bytes limit
/// the broker will enforce the topic's max.message.bytes limit
#[serde(rename = "properties.message.max.bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub message_max_bytes: Option<usize>,
@@ -316,7 +321,13 @@ impl RdKafkaPropertiesCommon {
}
}

impl KafkaCommon {
impl KafkaConnection {
pub fn get_hash(&self) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.hash(&mut hasher);
hasher.finish()
}

pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
6 changes: 3 additions & 3 deletions src/connector/src/connector_common/mod.rs
@@ -19,9 +19,9 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};

mod common;
pub use common::{
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon,
KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon,
RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};

mod iceberg;
14 changes: 9 additions & 5 deletions src/connector/src/sink/kafka.rs
@@ -214,6 +214,9 @@ pub struct KafkaConfig {
#[serde(flatten)]
pub common: KafkaCommon,

#[serde(flatten)]
pub connection: crate::connector_common::KafkaConnection,

#[serde(
rename = "properties.retry.max",
default = "_default_max_retries",
@@ -269,6 +272,7 @@ impl From<KafkaConfig> for KafkaProperties {
time_offset: None,
upsert: None,
common: val.common,
connection: val.connection,
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
@@ -368,7 +372,7 @@ impl Sink for KafkaSink {
if !check.check_reachability().await {
return Err(SinkError::Config(anyhow!(
"cannot connect to kafka broker ({})",
self.config.common.brokers
self.config.connection.brokers
)));
}
Ok(())
@@ -413,11 +417,11 @@ impl KafkaSinkWriter {
let mut c = ClientConfig::new();

// KafkaConfig configuration
config.common.set_security_properties(&mut c);
config.connection.set_security_properties(&mut c);
config.set_client(&mut c);

// ClientConfig configuration
c.set("bootstrap.servers", &config.common.brokers);
c.set("bootstrap.servers", &config.connection.brokers);

// Create the producer context, will be used to create the producer
let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
@@ -426,7 +430,7 @@
None,
None,
config.aws_auth_props.clone(),
config.common.is_aws_msk_iam(),
config.connection.is_aws_msk_iam(),
)
.await?;
let producer_ctx = RwProducerContext::new(ctx_common);
@@ -685,7 +689,7 @@ mod test {
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};
let config = KafkaConfig::from_btreemap(properties).unwrap();
assert_eq!(config.common.brokers, "localhost:9092");
assert_eq!(config.connection.brokers, "localhost:9092");
assert_eq!(config.common.topic, "test");
assert_eq!(config.max_retry_num, 20);
assert_eq!(config.retry_interval, Duration::from_millis(500));
98 changes: 70 additions & 28 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use anyhow::{anyhow, Context};
@@ -32,6 +33,14 @@ use crate::source::kafka::{
};
use crate::source::SourceEnumeratorContextRef;

pub static SHARED_KAFKA_CLIENT: LazyLock<tokio::sync::Mutex<HashMap<u64, SharedKafkaItem>>> =
LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new()));

pub struct SharedKafkaItem {
pub client: Arc<BaseConsumer<RwConsumerContext>>,
pub ref_count: i32,
}
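
For illustration only, a hypothetical sketch of the release path that the ref_count field implies; it is not part of this hunk, and later commits in this PR replace manual reference counting with moka and Weak references:

// Hypothetical helper (illustrative): called when an enumerator that borrowed a
// shared client goes away.
async fn release_shared_client(connection_hash: u64) {
    let mut guard = SHARED_KAFKA_CLIENT.lock().await;
    let remove_entry = match guard.get_mut(&connection_hash) {
        Some(item) => {
            item.ref_count -= 1;
            item.ref_count <= 0
        }
        None => false,
    };
    if remove_entry {
        // Last user is gone; dropping the entry lets the consumer (and its broker
        // connections) be released.
        guard.remove(&connection_hash);
    }
}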

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
Earliest,
@@ -44,7 +53,7 @@ pub struct KafkaSplitEnumerator {
context: SourceEnumeratorContextRef,
broker_address: String,
topic: String,
client: BaseConsumer<RwConsumerContext>,
client: Arc<BaseConsumer<RwConsumerContext>>,
start_offset: KafkaEnumeratorOffset,

// maybe used in the future for batch processing
@@ -68,12 +77,14 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let mut config = rdkafka::ClientConfig::new();
let common_props = &properties.common;

let broker_address = common_props.brokers.clone();
let broker_address = properties.connection.brokers.clone();

let connection_hash = properties.connection.get_hash();
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
common_props.set_security_properties(&mut config);
properties.connection.set_security_properties(&mut config);
properties.set_client(&mut config);
let mut scan_start_offset = match properties
.scan_startup_mode
@@ -94,36 +105,67 @@
scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
}

// don't need kafka metrics from enumerator
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
properties.aws_auth_props,
common_props.is_aws_msk_iam(),
)
.await?;
let client_ctx = RwConsumerContext::new(ctx_common);
let client: BaseConsumer<RwConsumerContext> =
config.create_with_context(client_ctx).await?;

// Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
// rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
// rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
// of an initial token to occur.
// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
if common_props.is_aws_msk_iam() {
#[cfg(not(madsim))]
client.poll(Duration::from_secs(10)); // note: this is a blocking call
#[cfg(madsim)]
client.poll(Duration::from_secs(10)).await;
let kafka_client: Arc<BaseConsumer<RwConsumerContext>>;
let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await;
if let Some(item) = shared_client_guard.get_mut(&connection_hash) {
tracing::info!(
"reusing kafka client for connection hash {}, to broker {}",
connection_hash,
broker_address
);
kafka_client = item.client.clone();
item.ref_count += 1;
drop(shared_client_guard);
} else {
// drop the guard and acquire a new one to avoid a 10s blocking call
drop(shared_client_guard);
Member:
Will this happen?

1. Caller A tries to get a client for a connection, but the cache misses, so it takes some time for caller A to build the client.
2. Meanwhile, caller B tries to get a client for the same connection; the cache misses again, so it also builds a client.
3. Caller A finishes building, inserts its client into the map, and sets ref_count = 1.
4. One second later, caller B also inserts its client into the map and sets ref_count = 1, so caller A's client leaks and is never dropped.

Member (@fuyufjh, Oct 24, 2024):
I would recommend using moka to replace the HashMap:

use moka::future::Cache;

In particular, a caching structure should handle these concurrent gets correctly by letting caller B block until caller A completes its operation and inserts the cached item (i.e. the Kafka client) back.

// don't need kafka metrics from enumerator
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
properties.aws_auth_props,
properties.connection.is_aws_msk_iam(),
)
.await?;
let client_ctx = RwConsumerContext::new(ctx_common);
let client: BaseConsumer<RwConsumerContext> =
config.create_with_context(client_ctx).await?;

// Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
// rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
// rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
// of an initial token to occur.
// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
if properties.connection.is_aws_msk_iam() {
#[cfg(not(madsim))]
client.poll(Duration::from_secs(10)); // note: this is a blocking call
#[cfg(madsim)]
client.poll(Duration::from_secs(10)).await;
}

kafka_client = Arc::new(client);
tracing::debug!(
"created kafka client for connection hash {} to broker {}",
connection_hash,
broker_address
);
let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await;
shared_client_guard.insert(
connection_hash,
SharedKafkaItem {
client: kafka_client.clone(),
ref_count: 1,
},
);
}

Ok(Self {
context,
broker_address,
topic,
client,
client: kafka_client,
start_offset: scan_start_offset,
stop_offset: KafkaEnumeratorOffset::None,
sync_call_timeout: properties.common.sync_call_timeout,
@@ -148,7 +190,7 @@
.fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
.await?;

let ret = topic_partitions
let ret: Vec<_> = topic_partitions
.into_iter()
.map(|partition| KafkaSplit {
topic: self.topic.clone(),
5 changes: 4 additions & 1 deletion src/connector/src/source/kafka/mod.rs
@@ -17,7 +17,7 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use crate::connector_common::{AwsAuthProps, KafkaPrivateLinkCommon};
use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon};

mod client_context;
pub mod enumerator;
@@ -143,6 +143,9 @@ pub struct KafkaProperties {
#[serde(flatten)]
pub common: KafkaCommon,

#[serde(flatten)]
pub connection: KafkaConnection,

#[serde(flatten)]
pub rdkafka_properties_common: RdKafkaPropertiesCommon,

6 changes: 3 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -64,7 +64,7 @@ impl SplitReader for KafkaSplitReader {
) -> Result<Self> {
let mut config = ClientConfig::new();

let bootstrap_servers = &properties.common.brokers;
let bootstrap_servers = &properties.connection.brokers;
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

// disable partition eof
@@ -73,7 +73,7 @@
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
config.set("bootstrap.servers", bootstrap_servers);

properties.common.set_security_properties(&mut config);
properties.connection.set_security_properties(&mut config);
properties.set_client(&mut config);

let group_id_prefix = properties
Expand All @@ -95,7 +95,7 @@ impl SplitReader for KafkaSplitReader {
// explicitly
Some(source_ctx.metrics.rdkafka_native_metric.clone()),
properties.aws_auth_props,
properties.common.is_aws_msk_iam(),
properties.connection.is_aws_msk_iam(),
)
.await?;
