Skip to content

Commit

Permalink
source-kafka: extend metadata timeout
Browse files Browse the repository at this point in the history
This timeout is used for fetching cluster metadata during the interactive
discover and validation operations.

We need some kind of timeout here since its not uncommon for misconfigured
configurations to end up hanging forever when trying to connect, but it can also
take a little while for all the metadata to be returned from a Kafka cluster if
there is a lot of it.

This extends the timeout from a pretty restrictive 5 seconds to a much more
permissive 60 seconds. 60 seconds is quite a while for a discover/validate
attempt to hang if the configuration is incorrect but that seems like a better
direction to err on than failing correct configurations too soon for
particularly large clusters.
  • Loading branch information
williamhbaker committed Nov 18, 2024
1 parent 650c853 commit 97a0c87
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
4 changes: 2 additions & 2 deletions source-kafka/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
RegisteredSchema::{Avro, Json, Protobuf},
SchemaRegistryClient, TopicSchema,
},
KAFKA_TIMEOUT,
KAFKA_METADATA_TIMEOUT,
};

static KAFKA_INTERNAL_TOPICS: [&str; 3] = ["__consumer_offsets", "__amazon_msk_canary", "_schemas"];
Expand All @@ -28,7 +28,7 @@ pub async fn do_discover(req: Discover) -> Result<Vec<discovered::Binding>> {
let consumer = config.to_consumer().await?;

let meta = consumer
.fetch_metadata(None, KAFKA_TIMEOUT)
.fetch_metadata(None, KAFKA_METADATA_TIMEOUT)
.context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;

let mut all_topics: Vec<String> = meta
Expand Down
4 changes: 2 additions & 2 deletions source-kafka/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod msk_oauthbearer;
pub mod pull;
pub mod schema_registry;

const KAFKA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const KAFKA_METADATA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

pub async fn run_connector(
mut stdin: io::BufReader<io::Stdin>,
Expand Down Expand Up @@ -129,7 +129,7 @@ async fn do_validate(req: Validate) -> Result<Vec<ValidatedBinding>> {
let consumer = config.to_consumer().await?;

consumer
.fetch_metadata(None, KAFKA_TIMEOUT)
.fetch_metadata(None, KAFKA_METADATA_TIMEOUT)
.context("Could not connect to bootstrap server with the provided configuration. This may be due to an incorrect configuration for authentication or bootstrap servers. Double check your configuration and try again.")?;

match config.schema_registry {
Expand Down

0 comments on commit 97a0c87

Please sign in to comment.