feat: share kafka client on meta #19058

Merged · 31 commits merged into main · Nov 2, 2024

Conversation

@tabVersion (Contributor) commented Oct 22, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

As the title says: reuse the Kafka client when the broker address is the same, to reduce the number of connections to the broker.

The key changes in this PR revolve around optimizing Kafka client management by introducing connection pooling. Here are the main changes:

Introduction of SHARED_KAFKA_CLIENT:

client.rs
// Added a shared cache for Kafka clients
pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnection, Weak<KafkaClientType>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());

The main motivations appear to be:

  1. Resource Optimization: Instead of creating new Kafka client connections for each enumerator, connections are now reused when possible through a shared cache.
  2. Memory Management: Uses Weak references to prevent memory leaks (when all related resources are dropped, the client is dropped as well):
// Uses Arc and Weak to manage references
type KafkaClientType = BaseConsumer<RwConsumerContext>;
// Cache stores weak references to allow cleanup when clients are no longer needed
MokaCache<KafkaConnection, Weak<KafkaClientType>>
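
For illustration, here is a minimal sketch of how an enumerator could fetch or build a shared client from this cache. This is not the PR's exact code; build_kafka_client is a placeholder for the real construction logic:

use std::sync::{Arc, Weak};

async fn get_or_build_client(
    connection: KafkaConnection,
) -> anyhow::Result<Arc<KafkaClientType>> {
    // Fast path: a client for this connection is still alive somewhere, reuse it.
    if let Some(weak) = SHARED_KAFKA_CLIENT.get(&connection).await {
        if let Some(client) = weak.upgrade() {
            return Ok(client);
        }
    }
    // Slow path: build a new client and publish only a weak handle to the cache.
    let client = Arc::new(build_kafka_client(&connection).await?);
    SHARED_KAFKA_CLIENT
        .insert(connection, Arc::downgrade(&client))
        .await;
    Ok(client)
}

Note that this naive check-then-insert flow still has the concurrent-build race raised in the review below; moka's entry API is what closes that gap.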

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Signed-off-by: tabVersion <tabvision@bupt.icu>
@hzxa21 (Collaborator) commented Oct 23, 2024

#18949

@tabVersion tabVersion marked this pull request as ready for review October 23, 2024 03:42
tabversion and others added 6 commits October 23, 2024 13:45
fix
@graphite-app graphite-app bot requested a review from a team October 23, 2024 13:08
src/connector/src/source/kafka/enumerator/client.rs (outdated review thread)
Comment on lines 120 to 121
// drop the guard and acquire a new one to avoid a 10s blocking call
drop(shared_client_guard);
Member

Will this happen?

  1. Caller A tries to get a client for a connection, but the cache misses, so it takes some time for caller A to build the client.
  2. Meanwhile, caller B tries to get a client for the same connection; the cache misses again, so it also builds a client.
  3. Caller A builds and inserts its client into the map and sets ref_count = 1.
  4. One second later, caller B also inserts its client into the map and sets ref_count = 1, causing caller A's client to leak and never be dropped.

@fuyufjh (Member) commented Oct 24, 2024

I would recommend using moka to replace the HashMap:

use moka::future::Cache;

In particular, a caching structure should handle these concurrent gets correctly by making caller B block until caller A completes its operation and inserts the cached item (i.e. the Kafka client) back.
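
For reference, a minimal sketch (with placeholder types and functions, not the PR's code) of how moka::future::Cache::get_with deduplicates concurrent initialization, so a second caller for the same key waits for the first caller's init future instead of building its own value:

use std::sync::Arc;

use moka::future::Cache;

async fn get_client(cache: &Cache<String, Arc<String>>, broker_addr: String) -> Arc<String> {
    cache
        .get_with(broker_addr, async {
            // Only one caller runs this init block per key; concurrent callers
            // for the same key wait here and receive the same resulting value.
            Arc::new(expensive_build().await)
        })
        .await
}

async fn expensive_build() -> String {
    // Placeholder for the real (slow) Kafka client construction.
    "kafka client".to_string()
}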

@xxchan (Member) left a comment

Could you please also run some tests and describe the improvements in the PR description?

@tabVersion tabVersion requested a review from a team as a code owner October 25, 2024 09:38
@xxchan (Member) commented Oct 28, 2024

I’m not entirely sure what specific tests you’re looking for.

At least sth like "Manually tested that num of threads is reduced from xxx to yyy for zzz Kafka sources."

Although the idea might be clear, the implementation is not that trivial, so we should verify it works.

Besides, the background of the problem should also be mentioned.

Cargo.lock (outdated review thread)
Comment on lines 16533 to 16537

[[patch.unused]]
name = "quanta"
version = "0.11.0"
source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770"
Member

I guess this might break madsim

@tabVersion (Contributor, Author)

Is there a way to fix it? Directly removing the part does not help.

@tabVersion (Contributor, Author)

After removing

# from Cargo.toml, in `[patch.crates-io]`
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }

the [[patch.unused]] section disappears, but the tests are still stuck:

  Running [ 00:01:43] [=============================================================================>                                                                                            ]  533/1167: 12 running, 533 passed (4 slow), 22 skipped     
   Canceling due to interrupt: 12 tests still running
   Canceling [ 00:01:43] [=============================================================================>                                                                                            ]  533/1167: 12 running, 533 passed (4 slow), 22 skipped     
      SIGINT [ 103.327s] risingwave_batch executor::generic_exchange::tests::test_exchange_multiple_sources
      SIGINT [ 102.891s] risingwave_batch executor::merge_sort_exchange::tests::test_exchange_multiple_sources
      SIGINT [ 102.802s] risingwave_batch executor::tests::test_clone_for_plan
      SIGINT [ 102.726s] risingwave_batch task::task_manager::tests::test_task_id_conflict
      SIGINT [  83.848s] risingwave_frontend binder::bind_param::test::subquery
      SIGINT [  83.893s] risingwave_frontend binder::bind_param::test::cast_after_specific
      SIGINT [  83.899s] risingwave_frontend binder::bind_param::test::basic_value
      SIGINT [  83.887s] risingwave_frontend binder::bind_param::test::default_type
      SIGINT [  83.903s] risingwave_frontend binder::bind_param::test::basic_select
      SIGINT [  83.042s] risingwave_frontend binder::expr::value::tests::test_bind_radix
      SIGINT [  83.886s] risingwave_frontend binder::bind_param::test::infer_case
      SIGINT [  83.498s] risingwave_frontend binder::expr::value::tests::test_bind_interval
------------

The dependency on quanta is introduced by moka.

Member

Patching quanta is required; this is not unexpected.

moka 0.12.0 depends on quanta 0.11, but you upgraded moka to 0.12.8, which depends on quanta 0.12. Therefore, the original quanta is used instead of the patched one. That's why it breaks.


Member

You should either revert to moka 0.12.0, or upgrade the madsim quanta to 0.12

@tabVersion (Contributor, Author)

#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnection {
Member

It seems we will be able to use this same struct for CREATE CONNECTION later? 🤔

@tabVersion (Contributor, Author)

yes.

Ok(item_val) => {
let share_item = SharedKafkaItem {
client: item_val.client.clone(),
ref_count: item_val.ref_count - 1,
Member

I don't get why we need ref_count. Can we just use something like get_with?

@tabVersion (Contributor, Author)

Then how do we manage the rdkafka client instance once all related sources are dropped? IIUC, if we remove the ref_count, the client instance will be kept in memory until the meta node restarts, since there is no eviction policy enabled for the cache.
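
To make the trade-off concrete, here is a conceptual sketch of the ref-counting idea under discussion, written against a plain mutex-guarded map rather than the PR's actual cache: every source holding the client bumps ref_count, and the last release removes the entry so the rdkafka client is dropped.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in mirroring the PR's SharedKafkaItem, for illustration only.
struct SharedKafkaItem {
    client: Arc<KafkaClientType>,
    ref_count: usize,
}

fn release(
    map: &Mutex<HashMap<KafkaConnection, SharedKafkaItem>>,
    key: &KafkaConnection,
) {
    let mut guard = map.lock().unwrap();
    if let Some(item) = guard.get_mut(key) {
        item.ref_count -= 1;
        if item.ref_count == 0 {
            // Last holder is gone: removing the entry drops the client.
            guard.remove(key);
        }
    }
}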

Member

As mentioned by Bugen, I think we can store Weak in the cache.

Member

Not sure if Weak fits well with moka. We may also try dashmap or weak-table.

Member

@BugenZhao Why do you think Weak might not work with moka? Actually, I'm also wondering what the difference is between moka (cache) and dashmap (concurrent hashmap).

@tabVersion (Contributor, Author)

Actually, I'm also wondering what the difference is between moka (cache) and dashmap (concurrent hashmap).

I think moka is essentially a dashmap with an eviction policy, plus a guarantee that updates are applied atomically.

Member

I think dashmap behaves similarly to moka::sync::Cache with unlimited capacity. 🤔

Why do you think Weak might not work with moka?

Because the interface does not seem that compatible with storing a Weak: there is no auto-eviction, and it requires the caller to keep the strong reference:

 let mut client_arc: Option<Arc<KafkaClientType>> = None;

Member

requires the caller to keep the strong reference

Isn't this the expected usage? i.e., store a Weak in the map while the caller stores the Arc. Otherwise, who keeps the Arc? 👀

Member

I mean when inserting. 🤣 For example, weak-table allows you to pass a closure returning an Arc to insert_with, while it actually stores a Weak and returns the Arc back to the caller. With moka we need to temporarily hold the strong reference to prevent it from being deallocated.
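
A tiny std-only illustration of the point being made here: a Weak stored in the cache does not keep the client alive by itself, so whichever caller builds the client must hold the Arc somewhere.

use std::sync::{Arc, Weak};

fn main() {
    let strong = Arc::new(String::from("kafka client"));
    let weak: Weak<String> = Arc::downgrade(&strong);

    assert!(weak.upgrade().is_some()); // alive while at least one Arc exists
    drop(strong);
    assert!(weak.upgrade().is_none()); // last Arc dropped: the value is gone
}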

@graphite-app graphite-app bot requested a review from a team October 28, 2024 08:59
@tabVersion (Contributor, Author) commented Oct 28, 2024

I’m not entirely sure what specific tests you’re looking for.

At least sth like "Manually tested that num of threads is reduced from xxx to yyy for zzz Kafka sources."

Although the idea might be clear, the implementation is not that trivial, so we should verify it works.

Besides, the background of the problem should also be mentioned.

Tested locally

Kafka env

Metadata for test (from broker 0: 127.0.0.1:9092/0):
 1 brokers:
  broker 0 at 127.0.0.1:9092 (controller)
 1 topics:
  topic "test" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

command:

for i in {0..100}; do  psql -h localhost -p 4566 -d dev -U root -c "create source s_$i (a int, b varchar) with (connector = 'kafka', topic = 'test', properties.bootstrap.server = '127.0.0.1:9092') format plain encode json ;" ; done

on main (a176ace): 1573 threads
on this pr (ac1d63d): 1272 threads
when system is idle: 767 threads

@xxchan (Member) commented Oct 28, 2024

Would you mind testing Kafka with multiple brokers, where we might see a larger difference?

@tabVersion's earlier comment was marked as outdated.

@tabVersion (Contributor, Author)

Would you mind testing Kafka with multiple brokers, where we might see a larger difference?

Tested with Confluent Cloud across multiple AZs:

Metadata for all topics (from broker -1: sasl_ssl://pkc-p11xm.us-east-1.aws.confluent.cloud:9092/bootstrap):
 18 brokers:
  broker 0 at b0-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 1 at b1-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 2 at b2-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 3 at b3-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 4 at b4-pkc-p11xm.us-east-1.aws.confluent.cloud:9092 (controller)
  broker 5 at b5-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 6 at b6-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 7 at b7-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 8 at b8-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 9 at b9-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 10 at b10-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 11 at b11-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 12 at b12-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 13 at b13-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 14 at b14-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 15 at b15-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 16 at b16-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
  broker 17 at b17-pkc-p11xm.us-east-1.aws.confluent.cloud:9092
 1 topics:
  topic "topic_0" with 6 partitions:
    partition 0, leader 12, replicas: 12,17,13, isrs: 12,17,13
    partition 1, leader 17, replicas: 17,13,15, isrs: 17,13,15
    partition 2, leader 13, replicas: 13,15,14, isrs: 13,15,14
    partition 3, leader 15, replicas: 15,14,16, isrs: 15,14,16
    partition 4, leader 14, replicas: 14,16,12, isrs: 14,16,12
    partition 5, leader 16, replicas: 16,12,17, isrs: 16,12,17

testing script

for i in {0..4}; do 
	psql  -h 127.0.0.1 -p 4566 -d dev -U root -c "create source s_$i(a int, b varchar) with (connector = 'kafka', topic = 'topic_0', properties.bootstrap.server = 'xxx.us-east-1.aws.confluent.cloud:9092', properties.security.protocol = 'SASL_SSL', properties.sasl.mechanism = 'PLAIN', properties.sasl.username = '[...]', properties.sasl.password = '[...]') format plain encode json;" ; 
done

EC2 idle: 71 threads
EC2 with risingwave running: 195 threads
on main (a176ace): 744 threads
on this pr (ac1d63d): 654 threads


Why not test with more sources?

Creating 5 sources at once seems to be the SDK's maximum; I got the following error afterward:

Caused by these errors (recent errors listed first):         
  1: gRPC request to meta service failed: Internal error
  2: The cluster is recovering                               
  3: get error from control stream, in worker node 1
  4: gRPC request to stream service failed: Internal error
  5: recv actor failure                                                                                                                                                                                                                              
  6: Actor 175 exited unexpectedly                 
  7: Executor error                            
  8: Connector error                                                                                                      
  9: Kafka error                                                                                                          
 10: Meta data fetch error                                                                                                                                                                                                                           
 11: Resolve (Local: Host resolution failure)

A little weird, but irrelevant to this issue.

@BugenZhao (Member) left a comment

Rest LGTM

src/meta/Cargo.toml (outdated review thread)
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
maplit = "1.0.2"
moka = { version = "0.12.0", features = ["future"] }
moka = { version = "0.12.8", features = ["future"] }
Member

Can we still use 0.12.0 in this PR, so that there are no changes in Cargo.lock and the bump of quanta can be reviewed in a separate PR?

@tabVersion (Contributor, Author)

I am afraid not. We are using and_try_compute_with, which is unavailable in v0.12.0.
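
For context, here is a rough sketch (illustrative types only, not the PR's code) of the kind of atomic read-modify-write that and_try_compute_with enables with moka >= 0.12.3, e.g. decrementing a reference count and removing the entry once it reaches zero:

use std::sync::Arc;

use moka::future::Cache;
use moka::ops::compute::Op;

#[derive(Clone)]
struct SharedItem {
    client: Arc<String>, // stand-in for the shared Kafka client
    ref_count: u32,
}

async fn release(cache: &Cache<String, SharedItem>, key: String) -> anyhow::Result<()> {
    cache
        .entry(key)
        .and_try_compute_with(|maybe_entry| async move {
            match maybe_entry {
                Some(entry) => {
                    let mut item = entry.into_value();
                    item.ref_count -= 1;
                    if item.ref_count == 0 {
                        // Last holder released: remove the entry, dropping the client.
                        Ok::<_, anyhow::Error>(Op::Remove)
                    } else {
                        Ok(Op::Put(item))
                    }
                }
                // Nothing cached under this key; leave the cache unchanged.
                None => Ok(Op::Nop),
            }
        })
        .await?;
    Ok(())
}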

Member

Would you try 0.12.3?

@tabVersion (Contributor, Author)

I think it works

@BugenZhao (Member) commented Oct 30, 2024

You may want to check out the Cargo.lock on the main branch to downgrade the locked version of quanta.

@tabVersion (Contributor, Author)


moka 0.12.3 still needs a higher version of quanta. I am afraid we have to bump quanta first.

Member

You're right. 😢 In its repo, version 0.12.3 was released before the quanta dependency was bumped, but on crates.io it's the other way around.

Cargo.toml (outdated review thread)
@@ -343,7 +339,7 @@ opt-level = 2

[patch.crates-io]
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" }
Member

We may contribute to madsim-rs/quanta.

@tabVersion (Contributor, Author)

Let's merge madsim-rs/quanta#2 first, then we can switch back to madsim.

@tabVersion tabVersion added this pull request to the merge queue Nov 2, 2024
Merged via the queue into main with commit e7e4a2c Nov 2, 2024
30 of 31 checks passed
@tabVersion tabVersion deleted the tab/share-kafka-client-enum branch November 2, 2024 15:16