diff --git a/materialize-kafka/src/configuration.rs b/materialize-kafka/src/configuration.rs index 179a36c7a..73f19f10a 100644 --- a/materialize-kafka/src/configuration.rs +++ b/materialize-kafka/src/configuration.rs @@ -1,9 +1,9 @@ use anyhow::Result; use rdkafka::{ admin::AdminClient, - client::{ClientContext, OAuthToken}, - producer::{ProducerContext, ThreadedProducer}, - ClientConfig, Message, + client::DefaultClientContext, + producer::{DefaultProducerContext, ThreadedProducer}, + ClientConfig, }; use schemars::{schema::RootSchema, JsonSchema}; use serde::{Deserialize, Serialize}; @@ -28,12 +28,6 @@ pub enum Credentials { username: String, password: String, }, - #[serde(rename = "AWS")] - AWS { - aws_access_key_id: String, - aws_secret_access_key: String, - region: String, - }, } #[derive(Serialize, Deserialize, Clone)] @@ -142,38 +136,6 @@ impl JsonSchema for EndpointConfig { "password", "username" ] - }, { - "title": "AWS MSK IAM", - "properties": { - "auth_type": { - "type": "string", - "default": "AWS", - "const": "AWS", - "order": 0 - }, - "aws_access_key_id": { - "title": "AWS Access Key ID", - "type": "string", - "order": 1 - }, - "aws_secret_access_key": { - "order": 2, - "secret": true, - "title": "AWS Secret Access Key", - "type": "string" - }, - "region": { - "order": 3, - "title": "AWS Region", - "type": "string" - } - }, - "required": [ - "auth_type", - "aws_access_key_id", - "aws_secret_access_key", - "region" - ] }] }, "tls": { @@ -248,59 +210,6 @@ impl JsonSchema for EndpointConfig { } } -pub struct FlowClientContext { - auth: Option, -} - -impl ClientContext for FlowClientContext { - const ENABLE_REFRESH_OAUTH_TOKEN: bool = true; - - fn generate_oauth_token( - &self, - _oauthbearer_config: Option<&str>, - ) -> Result> { - match &self.auth { - Some(Credentials::AWS { - aws_access_key_id, - aws_secret_access_key, - region, - }) => { - let (token, lifetime_ms) = crate::msk_oauthbearer::token( - region, - aws_access_key_id, - aws_secret_access_key, - )?; - Ok(OAuthToken { - // This is just a descriptive name of the principal which is - // accessing the resource, not a specific constant - principal_name: "flow-kafka-materialize".to_string(), - token, - lifetime_ms, - }) - } - _ => Err(anyhow::anyhow!("generate_oauth_token called without AWS credentials").into()), - } - } -} - -impl ProducerContext for FlowClientContext { - type DeliveryOpaque = (); - - fn delivery( - &self, - delivery_result: &rdkafka::message::DeliveryResult<'_>, - _: Self::DeliveryOpaque, - ) { - if let Err((err, msg)) = delivery_result { - tracing::warn!( - "failed to deliver message for topic {}: {}", - msg.topic(), - err.to_string() - ); - } - } -} - impl EndpointConfig { pub fn validate(&self) -> Result<()> { if matches!(self.message_format, MessageFormat::Avro) && self.schema_registry.is_none() { @@ -310,20 +219,18 @@ impl EndpointConfig { Ok(()) } - pub fn to_producer(&self) -> Result> { - let (mut config, ctx) = self.common_config()?; + pub fn to_producer(&self) -> Result> { + let mut config = self.common_config()?; config.set("compression.type", "lz4"); - Ok(config.create_with_context(ctx)?) + Ok(config.create()?) } - pub fn to_admin(&self) -> Result> { - // TODO(whb): Will this generate an OAuth token without calling `poll`? - // May not work with MSK etc. - let (config, ctx) = self.common_config()?; - Ok(config.create_with_context(ctx)?) + pub fn to_admin(&self) -> Result> { + let config = self.common_config()?; + Ok(config.create()?) } - fn common_config(&self) -> Result<(ClientConfig, FlowClientContext)> { + fn common_config(&self) -> Result { let mut config = ClientConfig::new(); config.set("bootstrap.servers", self.bootstrap_servers.clone()); @@ -339,20 +246,10 @@ impl EndpointConfig { config.set("sasl.username", username); config.set("sasl.password", password); } - Some(Credentials::AWS { .. }) => { - if self.security_protocol() != "SASL_SSL" { - anyhow::bail!("must use tls=system_certificates for AWS") - } - config.set("sasl.mechanism", "OAUTHBEARER"); - } None => (), } - let ctx = FlowClientContext { - auth: self.credentials.clone(), - }; - - Ok((config, ctx)) + Ok(config) } fn security_protocol(&self) -> &'static str { diff --git a/materialize-kafka/src/lib.rs b/materialize-kafka/src/lib.rs index c0eefe165..6dea41a2d 100644 --- a/materialize-kafka/src/lib.rs +++ b/materialize-kafka/src/lib.rs @@ -14,7 +14,6 @@ use validate::do_validate; pub mod apply; pub mod binding_info; pub mod configuration; -pub mod msk_oauthbearer; pub mod transactor; pub mod validate; diff --git a/materialize-kafka/src/msk_oauthbearer.rs b/materialize-kafka/src/msk_oauthbearer.rs deleted file mode 100644 index c26cadd68..000000000 --- a/materialize-kafka/src/msk_oauthbearer.rs +++ /dev/null @@ -1,101 +0,0 @@ -use anyhow::Result; -use aws_sdk_iam::config::Credentials; -use aws_sigv4::http_request::{ - sign, SignableBody, SignableRequest, SignatureLocation, SigningSettings, -}; -use aws_sigv4::sign::v4; -use base64::prelude::{Engine as _, BASE64_URL_SAFE_NO_PAD}; -use http; -use std::time::{Duration, SystemTime}; - -/* Generate a token for Amazon Streaming Kafka service. - * This is based on AWS V4: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html - * - * This process works by us first preparing a draft request for connecting to kafka (action=kafka-cluster:Connect), - * and then we use a library to generate the signature for it, which itself becomes part of the query parameters, so we end - * up with a GET request url that includes X-Amz-Signature query parameter (along with other new - * query parameters). Finally we add the user agent after the signature has been computed (this is in line with official sdks of AWS, - * see: https://github.com/aws/aws-msk-iam-sasl-signer-go/blob/main/signer/msk_auth_token_provider.go#L188-L191 - * - * For more detailed specifics on how the signature itself is calculated, see: - * https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html - * - * The output is a url-safe base64 encoding of the GET request URL and an expiry timestamp - * (milliseconds since Unix epoch) - */ - -// Taken from the Go SDK's implementation -// https://github.com/aws/aws-msk-iam-sasl-signer-go/blob/main/signer/msk_auth_token_provider.go#L33 -const DEFAULT_EXPIRY_SECONDS: u64 = 900; -pub fn token(region: &str, access_key_id: &str, secret_access_key: &str) -> Result<(String, i64)> { - let endpoint = format!( - "https://kafka.{}.amazonaws.com/?Action=kafka-cluster%3AConnect", - region - ); - let expiry_duration = Duration::new(DEFAULT_EXPIRY_SECONDS, 0); - let now = SystemTime::now(); - - // Set up information and settings for the signing - let identity = Credentials::new( - access_key_id, - secret_access_key, - None, - None, - "user credentials", - ) - .into(); - let mut signing_settings = SigningSettings::default(); - - // The default behaviour of the signing library is to put the signature in headers. - // This ensures that the signature is placed as X-Amz-Signature in GET - // query parameters instead. This is important because the token we use for OAuth Bearer - // is the base64 encoding of a presigned GET URL. - signing_settings.signature_location = SignatureLocation::QueryParams; - signing_settings.expires_in = Some(expiry_duration); - - let signing_params = v4::SigningParams::builder() - .identity(&identity) - .region(region) - // This is the constant name of the service for which we are signing the token - .name("kafka-cluster") - .time(now) - .settings(signing_settings) - .build() - .unwrap() - .into(); - - // Prepare a signable HTTP request - let signable_request = SignableRequest::new( - "GET", - &endpoint, - std::iter::empty(), - SignableBody::Bytes(&[]), - ) - .expect("signable request"); - - // Create an empty draft HTTP request. The `sign` function provides us with a bunch of - // "signing instructions" which we then apply to this draft HTTP request. The signing - // instructions basically add a bunch of query parameters to this that have been computed to - // this http request. - let mut signed_req = http::Request::builder() - .method("GET") - .uri(&endpoint) - .body("") - .unwrap(); - - // Sign and then apply the signature to the request - let (signing_instructions, _signature) = sign(signable_request, &signing_params)?.into_parts(); - signing_instructions.apply_to_request_http0x(&mut signed_req); - - // Finally add User Agent to the final signed url. This is based on the Go SDK that does this - // after signing: https://github.com/aws/aws-msk-iam-sasl-signer-go/blob/main/signer/msk_auth_token_provider.go#L188 - let signed_url = format!("{}&User-Agent=EstuaryFlowCapture", signed_req.uri()); - - let token = BASE64_URL_SAFE_NO_PAD.encode(signed_url); - let expires_in = (now + expiry_duration) - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(); - - Ok((token, expires_in.try_into()?)) -} diff --git a/materialize-kafka/tests/snapshots/test__spec.snap b/materialize-kafka/tests/snapshots/test__spec.snap index 1a1ab36d3..83af2edb0 100644 --- a/materialize-kafka/tests/snapshots/test__spec.snap +++ b/materialize-kafka/tests/snapshots/test__spec.snap @@ -58,39 +58,6 @@ expression: "serde_json::to_string_pretty(&got).unwrap()" "username" ], "title": "SASL (User & Password)" - }, - { - "properties": { - "auth_type": { - "const": "AWS", - "default": "AWS", - "order": 0, - "type": "string" - }, - "aws_access_key_id": { - "order": 1, - "title": "AWS Access Key ID", - "type": "string" - }, - "aws_secret_access_key": { - "order": 2, - "secret": true, - "title": "AWS Secret Access Key", - "type": "string" - }, - "region": { - "order": 3, - "title": "AWS Region", - "type": "string" - } - }, - "required": [ - "auth_type", - "aws_access_key_id", - "aws_secret_access_key", - "region" - ], - "title": "AWS MSK IAM" } ], "order": 1,