Skip to content

Commit

Permalink
source-kafka: discover collection keys for Avro schemas using schema registry
Browse files Browse the repository at this point in the history

Use the schema registry API to look up registered schemas for topics and
discover appropriate collection keys for the resulting collections.

This initial implementation only works with Avro schemas. Support for JSON
schemas will follow. Protobuf is also an option, but that will probably be an
enhancement for a later time.

Decoding Avro documents will also need to be added, and that will come in a
following commit.
  • Loading branch information
williamhbaker committed Oct 29, 2024
1 parent b803231 commit a6c3af1
Show file tree
Hide file tree
Showing 17 changed files with 2,315 additions and 61 deletions.
1,236 changes: 1,219 additions & 17 deletions source-kafka/Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions source-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition = "2021"


[dependencies]
doc = { git = "https://github.com/estuary/flow" }
json = { git = "https://github.com/estuary/flow" }
proto-flow = { git = "https://github.com/estuary/flow" }

anyhow = "1.0"
Expand All @@ -26,7 +28,11 @@ tracing-subscriber = { version = "0.3", features = [
"env-filter",
"time",
] }
reqwest = { version = "0.12", features = ["json"] }
futures = "0.3"
apache-avro = "0.17"


[dev-dependencies]
insta = { version = "1", features = ["json", "serde"] }
schema_registry_converter = { version = "4.2.0", features = ["avro"] }
15 changes: 15 additions & 0 deletions source-kafka/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ services:
networks:
- flow-test

# Confluent Schema Registry service (cp-schema-registry 7.7.1).
# It is started after the `db` service, uses db:29092 as its Kafka bootstrap
# server, and listens for HTTP on port 8081, which is also published to the host.
schema-registry:
image: confluentinc/cp-schema-registry:7.7.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- db
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'db:29092'
# Bind on all interfaces so the published port is reachable from outside the container.
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
networks:
- flow-test

networks:
flow-test:
name: flow-test
Expand Down
40 changes: 40 additions & 0 deletions source-kafka/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct EndpointConfig {
bootstrap_servers: String,
credentials: Option<Credentials>,
tls: Option<TlsSettings>,
pub schema_registry: Option<SchemaRegistryConfig>,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -53,6 +54,13 @@ pub enum TlsSettings {
SystemCertificates,
}

/// Connection details for interacting with a schema registry, which is
/// necessary for processing messages encoded with Avro.
///
/// The endpoint configuration's JSON schema lists all three fields as
/// required whenever a `schema_registry` object is provided.
#[derive(Serialize, Deserialize)]
pub struct SchemaRegistryConfig {
/// Schema registry API endpoint, for example
/// `https://registry-id.us-east-2.aws.confluent.cloud`.
pub endpoint: String,
/// Username used for authentication. On Confluent Cloud this is the "Key"
/// from the schema registry API key.
pub username: String,
/// Password used for authentication. On Confluent Cloud this is the
/// "Secret" from the schema registry API key. The JSON schema marks this
/// property as a secret.
pub password: String,
}

impl JsonSchema for EndpointConfig {
fn schema_name() -> String {
"EndpointConfig".to_owned()
Expand Down Expand Up @@ -162,6 +170,38 @@ impl JsonSchema for EndpointConfig {
"title": "TLS Settings",
"type": "string",
"order": 2
},
"schema_registry": {
"title": "Schema Registry",
"description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.",
"type": "object",
"properties": {
"endpoint": {
"type": "string",
"title": "Schema Registry Endpoint",
"description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud",
"order": 0
},
"username": {
"type": "string",
"title": "Schema Registry Username",
"description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.",
"order": 1
},
"password": {
"type": "string",
"title": "Schema Registry Password",
"description": "Schema registry password to use for authentication. If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.",
"order": 2,
"secret": true
}
},
"required": [
"endpoint",
"username",
"password"
],
"order": 3
}
}
}))
Expand Down
Loading

0 comments on commit a6c3af1

Please sign in to comment.