-
Notifications
You must be signed in to change notification settings - Fork 575
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: share kafka client on meta #19058
Changes from 22 commits
b9b2f79
ab46672
3373a58
c5203d6
3b6a6a2
3b3f725
968ed08
a41f3fc
6534ebf
7249e78
0024e1d
ad8b989
ae1b70a
58b5128
f115a0c
d128644
ae9df41
45295bc
a9e34c7
d27ab90
256485c
62cb953
725e23c
832f66f
73f0b7b
ac1d63d
35fb002
ec49096
16d8c42
b3efda6
51eca61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// limitations under the License. | ||
|
||
use std::collections::BTreeMap; | ||
use std::hash::{Hash, Hasher}; | ||
use std::io::Write; | ||
use std::time::Duration; | ||
|
||
|
@@ -61,7 +62,7 @@ use aws_types::SdkConfig; | |
use risingwave_common::util::env_var::env_var_is_true; | ||
|
||
/// A flatten config map for aws auth. | ||
#[derive(Deserialize, Debug, Clone, WithOptions)] | ||
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)] | ||
pub struct AwsAuthProps { | ||
#[serde(rename = "aws.region", alias = "region")] | ||
pub region: Option<String>, | ||
|
@@ -161,21 +162,11 @@ impl AwsAuthProps { | |
} | ||
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions)] | ||
pub struct KafkaCommon { | ||
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] | ||
pub struct KafkaConnection { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we will be able to use this same struct for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. |
||
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] | ||
pub brokers: String, | ||
|
||
#[serde(rename = "topic", alias = "kafka.topic")] | ||
pub topic: String, | ||
|
||
#[serde( | ||
rename = "properties.sync.call.timeout", | ||
deserialize_with = "deserialize_duration_from_string", | ||
default = "default_kafka_sync_call_timeout" | ||
)] | ||
pub sync_call_timeout: Duration, | ||
|
||
/// Security protocol used for RisingWave to communicate with Kafka brokers. Could be | ||
/// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. | ||
#[serde(rename = "properties.security.protocol")] | ||
|
@@ -252,6 +243,20 @@ pub struct KafkaCommon { | |
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions)] | ||
pub struct KafkaCommon { | ||
#[serde(rename = "topic", alias = "kafka.topic")] | ||
pub topic: String, | ||
|
||
#[serde( | ||
rename = "properties.sync.call.timeout", | ||
deserialize_with = "deserialize_duration_from_string", | ||
default = "default_kafka_sync_call_timeout" | ||
)] | ||
pub sync_call_timeout: Duration, | ||
} | ||
|
||
#[serde_as] | ||
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] | ||
pub struct KafkaPrivateLinkCommon { | ||
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. | ||
#[serde(rename = "broker.rewrite.endpoints")] | ||
|
@@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon { | |
/// Maximum Kafka protocol request message size. Due to differing framing overhead between | ||
/// protocol versions the producer is unable to reliably enforce a strict max message limit at | ||
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests, | ||
/// the broker will enforce the the topic's max.message.bytes limit | ||
/// the broker will enforce the topic's max.message.bytes limit | ||
#[serde(rename = "properties.message.max.bytes")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub message_max_bytes: Option<usize>, | ||
|
@@ -316,7 +321,13 @@ impl RdKafkaPropertiesCommon { | |
} | ||
} | ||
|
||
impl KafkaCommon { | ||
impl KafkaConnection { | ||
pub fn get_hash(&self) -> u64 { | ||
let mut hasher = std::collections::hash_map::DefaultHasher::new(); | ||
self.hash(&mut hasher); | ||
hasher.finish() | ||
} | ||
|
||
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { | ||
// AWS_MSK_IAM | ||
if self.is_aws_msk_iam() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we still use
0.12.0
in this PR, so that there's no changes inCargo.lock
and bumping ofquanta
can be reviewed in a separate PR?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid not. We are using
and_try_compute_with
, which is unavailable in v0.12.0.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you try
0.12.3
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may want to checkout the
Cargo.lock
on the main branch to downgrade the locked version ofquanta
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moka 0.12.3 still needs a higher version of
quanta
. I am afraid we have to bump quanta first.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. 😢 In its repo the version
0.12.3
released before bumping dependency onquanta
, but on crates.io it's the other way around.