feat: share kafka client on meta #19058

Merged
merged 31 commits into main from tab/share-kafka-client-enum on Nov 2, 2024
Changes from 29 commits

31 commits
b9b2f79
share kafka client on meta
tabVersion Oct 22, 2024
ab46672
check kafka connection identical
tabVersion Oct 22, 2024
3373a58
fix
tabVersion Oct 22, 2024
c5203d6
fix with props
tabVersion Oct 22, 2024
3b6a6a2
fix
tabVersion Oct 22, 2024
3b3f725
use connection hash as hashmap entry
tabVersion Oct 22, 2024
968ed08
fix
tabVersion Oct 22, 2024
a41f3fc
fix
Oct 23, 2024
6534ebf
rerun
tabVersion Oct 23, 2024
7249e78
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
tabVersion Oct 23, 2024
0024e1d
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 23, 2024
ad8b989
fix
tabVersion Oct 23, 2024
ae1b70a
fix
tabVersion Oct 23, 2024
58b5128
better with options
Oct 25, 2024
f115a0c
use kafka connection as hashkey
Oct 25, 2024
d128644
use moka
Oct 25, 2024
ae9df41
fix lint
Oct 25, 2024
45295bc
fix
Oct 25, 2024
a9e34c7
fix
Oct 25, 2024
d27ab90
Merge branch 'main' into tab/share-kafka-client-enum
tabVersion Oct 25, 2024
256485c
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
Oct 28, 2024
62cb953
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
Oct 28, 2024
725e23c
remove get hash func
Oct 28, 2024
832f66f
migrate to Weak
Oct 28, 2024
73f0b7b
minor
Oct 28, 2024
ac1d63d
fix
Oct 28, 2024
35fb002
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 29, 2024
ec49096
test bump quanta to 0.12.3
tabVersion Oct 29, 2024
16d8c42
update patch
tabVersion Oct 29, 2024
b3efda6
moka 0.12.3
Oct 30, 2024
51eca61
switch to madsim repo
Nov 2, 2024
110 changes: 13 additions & 97 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 2 additions & 6 deletions Cargo.toml
@@ -153,11 +153,7 @@ arrow-udf-flight = "0.4"
clap = { version = "4", features = ["cargo", "derive", "env"] }
# Use a forked version which removes the dependencies on dynamo db to reduce
# compile time and binary size.
deltalake = { version = "0.20.1", features = [
"s3",
"gcs",
"datafusion",
] }
deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] }
itertools = "0.13.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
@@ -343,7 +339,7 @@ opt-level = 2

[patch.crates-io]
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" }
Member

We may contribute to madsim-rs/quanta.

Contributor Author

let's merge madsim-rs/quanta#2 first and we can switch back to madsim.

getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
2 changes: 1 addition & 1 deletion e2e_test/source_legacy/basic/ddl.slt
@@ -30,7 +30,7 @@ Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
3: failed to parse json
4: missing field `properties.bootstrap.server`
4: missing field `topic`


statement error
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
maplit = "1.0.2"
moka = { version = "0.12.0", features = ["future"] }
moka = { version = "0.12.8", features = ["future"] }
Member

Can we still use 0.12.0 in this PR, so that there are no changes in Cargo.lock and the bump of quanta can be reviewed in a separate PR?

Contributor Author

I am afraid not. We are using and_try_compute_with, which is unavailable in v0.12.0.
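
For context, here is a minimal, hypothetical sketch of what that entry API enables; `build_client`, `SharedClient`, and the `String` stand-in for the rdkafka client are illustrative assumptions, not the PR's actual code. The point is that `and_try_compute_with` lets the first caller for a given connection build the client while concurrent callers on the same key wait and then reuse the shared handle.

```rust
use std::sync::Arc;

use moka::future::Cache;
use moka::ops::compute::{CompResult, Op};

type SharedClient = Arc<String>; // stand-in for the real shared client type

async fn build_client(brokers: &str) -> anyhow::Result<String> {
    // Real code would construct an rdkafka client here.
    Ok(format!("client connected to {brokers}"))
}

async fn get_or_create(
    cache: &Cache<String, SharedClient>,
    brokers: String,
) -> anyhow::Result<SharedClient> {
    let result = cache
        .entry(brokers.clone())
        .and_try_compute_with(|maybe_entry| async move {
            match maybe_entry {
                // An identical connection already has a client: reuse it.
                Some(_) => Ok::<_, anyhow::Error>(Op::Nop),
                // First caller builds the client; later callers wait on this
                // computation instead of each dialing the brokers themselves.
                None => Ok(Op::Put(Arc::new(build_client(&brokers).await?))),
            }
        })
        .await?;
    match result {
        CompResult::Inserted(e) | CompResult::Unchanged(e) => Ok(e.into_value()),
        _ => unreachable!("this closure never removes or replaces entries"),
    }
}
```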

Member

Would you try 0.12.3?

Contributor Author

I think it works

Member
@BugenZhao Oct 30, 2024

You may want to check out the Cargo.lock on the main branch to downgrade the locked version of quanta.

Contributor Author

moka 0.12.3 still needs a higher version of quanta. I am afraid we have to bump quanta first.

Member

You're right. 😢 In its repo, version 0.12.3 was released before the quanta dependency was bumped, but on crates.io it's the other way around.

mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
35 changes: 20 additions & 15 deletions src/connector/src/connector_common/common.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::hash::Hash;
use std::io::Write;
use std::time::Duration;

@@ -61,7 +62,7 @@ use aws_types::SdkConfig;
use risingwave_common::util::env_var::env_var_is_true;

/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
#[serde(rename = "aws.region", alias = "region")]
pub region: Option<String>,
@@ -161,21 +162,11 @@ impl AwsAuthProps {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnection {
Member

It seems we will be able to use this same struct for CREATE CONNECTION later? 🤔

Contributor Author

yes.

#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,

/// Security protocol used for RisingWave to communicate with Kafka brokers. Could be
/// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
#[serde(rename = "properties.security.protocol")]
@@ -252,6 +243,20 @@ pub struct KafkaCommon {

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
@@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions the producer is unable to reliably enforce a strict max message limit at
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests,
/// the broker will enforce the the topic's max.message.bytes limit
/// the broker will enforce the topic's max.message.bytes limit
#[serde(rename = "properties.message.max.bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub message_max_bytes: Option<usize>,
@@ -316,7 +321,7 @@ impl RdKafkaPropertiesCommon {
}
}

impl KafkaCommon {
impl KafkaConnection {
pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
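Aside: a small, self-contained illustration of what the new `PartialEq + Eq + Hash` derives on `KafkaConnection` buy us, in line with the "use kafka connection as hashkey" commit — identical connection properties produce equal keys, so they can map to one shared client entry. `DemoConnection` below is a made-up stand-in, not the real struct.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `KafkaConnection`: deriving Eq + Hash means two
// sources configured with identical connection properties hash to the same
// key and can therefore share a single client entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DemoConnection {
    brokers: String,
    security_protocol: Option<String>,
}

fn main() {
    let a = DemoConnection {
        brokers: "broker1:9092,broker2:9092".into(),
        security_protocol: Some("SASL_SSL".into()),
    };
    let b = a.clone(); // a second source with identical properties

    let mut clients: HashMap<DemoConnection, &'static str> = HashMap::new();
    clients.entry(a).or_insert("shared kafka client");
    // The identical connection resolves to the same entry instead of
    // creating another client.
    assert_eq!(clients.get(&b).copied(), Some("shared kafka client"));
}
```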
6 changes: 3 additions & 3 deletions src/connector/src/connector_common/mod.rs
@@ -19,9 +19,9 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};

mod common;
pub use common::{
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon,
KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon,
RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};

mod iceberg;
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::array::ArrayError;
use risingwave_common::error::def_anyhow_newtype;
use risingwave_pb::PbFieldNotFound;
@@ -29,6 +31,7 @@ def_anyhow_newtype! {

// Common errors
std::io::Error => transparent,
Arc<ConnectorError> => transparent,

// Fine-grained connector errors
AccessError => transparent,
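Presumably the new `Arc<ConnectorError> => transparent` arm exists because a shared or cached initialization hands the same failure to every concurrent waiter behind an `Arc`. A small, hypothetical example of that pattern using moka's `try_get_with`; `DemoError` is a stand-in, not RisingWave's `ConnectorError`.

```rust
use std::sync::Arc;

use moka::future::Cache;

// Hypothetical illustration of why an `Arc<Error> -> Error` conversion is
// convenient: moka's `try_get_with` shares one failed initialization among
// all concurrent waiters as `Arc<E>`, and `?` needs a way back to the plain
// error type.
#[derive(Debug, thiserror::Error)]
#[error("demo connector error: {0}")]
struct DemoError(String);

impl From<Arc<DemoError>> for DemoError {
    fn from(e: Arc<DemoError>) -> Self {
        // Mirror the "transparent" idea: recover the plain error from the
        // shared handle.
        DemoError(e.0.clone())
    }
}

async fn shared_init(cache: &Cache<String, u32>) -> Result<u32, DemoError> {
    let value = cache
        .try_get_with("some-connection".to_owned(), async {
            Err::<u32, DemoError>(DemoError("broker unreachable".into()))
        })
        .await?; // `?` converts the shared Arc<DemoError> back into DemoError
    Ok(value)
}
```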