Skip to content

Commit

Permalink
materialize-kafka: remove MSK connection option
Browse files Browse the repository at this point in the history
Connecting to AWK MSK with the rust Kafka library we are using requires the
client to call "poll" so that an auth token is generated. But we need to use an
admin client to create topics during the materialization `Apply`, and there is
no way that I could find to call "poll" with the admin client to generate a
token.

It's possible that I'm missing something here, but I tried pretty hard to make
it work. If it it turns out that anybody wants to use this connector with MSK we
can revisit this later. For now I think it's best to remove MSK as an option,
since it isn't going to work well for most users unless they somehow pre-create
all of the topics the connector needs, which is the only other option I can
think of.
  • Loading branch information
williamhbaker committed Dec 11, 2024
1 parent c16d094 commit a728c2d
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 249 deletions.
125 changes: 11 additions & 114 deletions materialize-kafka/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::Result;
use rdkafka::{
admin::AdminClient,
client::{ClientContext, OAuthToken},
producer::{ProducerContext, ThreadedProducer},
ClientConfig, Message,
client::DefaultClientContext,
producer::{DefaultProducerContext, ThreadedProducer},
ClientConfig,
};
use schemars::{schema::RootSchema, JsonSchema};
use serde::{Deserialize, Serialize};
Expand All @@ -28,12 +28,6 @@ pub enum Credentials {
username: String,
password: String,
},
#[serde(rename = "AWS")]
AWS {
aws_access_key_id: String,
aws_secret_access_key: String,
region: String,
},
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -142,38 +136,6 @@ impl JsonSchema for EndpointConfig {
"password",
"username"
]
}, {
"title": "AWS MSK IAM",
"properties": {
"auth_type": {
"type": "string",
"default": "AWS",
"const": "AWS",
"order": 0
},
"aws_access_key_id": {
"title": "AWS Access Key ID",
"type": "string",
"order": 1
},
"aws_secret_access_key": {
"order": 2,
"secret": true,
"title": "AWS Secret Access Key",
"type": "string"
},
"region": {
"order": 3,
"title": "AWS Region",
"type": "string"
}
},
"required": [
"auth_type",
"aws_access_key_id",
"aws_secret_access_key",
"region"
]
}]
},
"tls": {
Expand Down Expand Up @@ -248,59 +210,6 @@ impl JsonSchema for EndpointConfig {
}
}

pub struct FlowClientContext {
auth: Option<Credentials>,
}

impl ClientContext for FlowClientContext {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;

fn generate_oauth_token(
&self,
_oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn std::error::Error>> {
match &self.auth {
Some(Credentials::AWS {
aws_access_key_id,
aws_secret_access_key,
region,
}) => {
let (token, lifetime_ms) = crate::msk_oauthbearer::token(
region,
aws_access_key_id,
aws_secret_access_key,
)?;
Ok(OAuthToken {
// This is just a descriptive name of the principal which is
// accessing the resource, not a specific constant
principal_name: "flow-kafka-materialize".to_string(),
token,
lifetime_ms,
})
}
_ => Err(anyhow::anyhow!("generate_oauth_token called without AWS credentials").into()),
}
}
}

impl ProducerContext for FlowClientContext {
type DeliveryOpaque = ();

fn delivery(
&self,
delivery_result: &rdkafka::message::DeliveryResult<'_>,
_: Self::DeliveryOpaque,
) {
if let Err((err, msg)) = delivery_result {
tracing::warn!(
"failed to deliver message for topic {}: {}",
msg.topic(),
err.to_string()
);
}
}
}

impl EndpointConfig {
pub fn validate(&self) -> Result<()> {
if matches!(self.message_format, MessageFormat::Avro) && self.schema_registry.is_none() {
Expand All @@ -310,20 +219,18 @@ impl EndpointConfig {
Ok(())
}

pub fn to_producer(&self) -> Result<ThreadedProducer<FlowClientContext>> {
let (mut config, ctx) = self.common_config()?;
pub fn to_producer(&self) -> Result<ThreadedProducer<DefaultProducerContext>> {
let mut config = self.common_config()?;
config.set("compression.type", "lz4");
Ok(config.create_with_context(ctx)?)
Ok(config.create()?)
}

pub fn to_admin(&self) -> Result<AdminClient<FlowClientContext>> {
// TODO(whb): Will this generate an OAuth token without calling `poll`?
// May not work with MSK etc.
let (config, ctx) = self.common_config()?;
Ok(config.create_with_context(ctx)?)
pub fn to_admin(&self) -> Result<AdminClient<DefaultClientContext>> {
let config = self.common_config()?;
Ok(config.create()?)
}

fn common_config(&self) -> Result<(ClientConfig, FlowClientContext)> {
fn common_config(&self) -> Result<ClientConfig> {
let mut config = ClientConfig::new();

config.set("bootstrap.servers", self.bootstrap_servers.clone());
Expand All @@ -339,20 +246,10 @@ impl EndpointConfig {
config.set("sasl.username", username);
config.set("sasl.password", password);
}
Some(Credentials::AWS { .. }) => {
if self.security_protocol() != "SASL_SSL" {
anyhow::bail!("must use tls=system_certificates for AWS")
}
config.set("sasl.mechanism", "OAUTHBEARER");
}
None => (),
}

let ctx = FlowClientContext {
auth: self.credentials.clone(),
};

Ok((config, ctx))
Ok(config)
}

fn security_protocol(&self) -> &'static str {
Expand Down
1 change: 0 additions & 1 deletion materialize-kafka/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use validate::do_validate;
pub mod apply;
pub mod binding_info;
pub mod configuration;
pub mod msk_oauthbearer;
pub mod transactor;
pub mod validate;

Expand Down
101 changes: 0 additions & 101 deletions materialize-kafka/src/msk_oauthbearer.rs

This file was deleted.

33 changes: 0 additions & 33 deletions materialize-kafka/tests/snapshots/test__spec.snap
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,6 @@ expression: "serde_json::to_string_pretty(&got).unwrap()"
"username"
],
"title": "SASL (User & Password)"
},
{
"properties": {
"auth_type": {
"const": "AWS",
"default": "AWS",
"order": 0,
"type": "string"
},
"aws_access_key_id": {
"order": 1,
"title": "AWS Access Key ID",
"type": "string"
},
"aws_secret_access_key": {
"order": 2,
"secret": true,
"title": "AWS Secret Access Key",
"type": "string"
},
"region": {
"order": 3,
"title": "AWS Region",
"type": "string"
}
},
"required": [
"auth_type",
"aws_access_key_id",
"aws_secret_access_key",
"region"
],
"title": "AWS MSK IAM"
}
],
"order": 1,
Expand Down

0 comments on commit a728c2d

Please sign in to comment.