From a6c3af1f9efff67bf7a5ceb06b2a1ae757688c47 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Tue, 29 Oct 2024 14:09:47 -0400 Subject: [PATCH] source-kafka: discover collection keys for Avro schemas using schema registry Use the schema registry API to look up registered schemas for topics and discover appropriate collection keys for the resulting collections. This initial implementation only works with Avro schemas. Support for JSON schemas will follow. Protobuf is also an option but that will probably be an enhancement for a later time. Decoding Avro documents will also need to be added, and that will come in a following commit. --- source-kafka/Cargo.lock | 1236 ++++++++++++++++- source-kafka/Cargo.toml | 6 + source-kafka/docker-compose.yaml | 15 + source-kafka/src/configuration.rs | 40 + source-kafka/src/discover.rs | 467 ++++++- source-kafka/src/lib.rs | 1 + source-kafka/src/schema_registry.rs | 202 +++ ..._avro record with scalar compound key.snap | 55 + ...ests__nested avro record with scalars.snap | 59 + ...source_kafka__discover__tests__no key.snap | 35 + ...er__tests__single non-scalar avro key.snap | 35 + ...ests__single nullable scalar avro key.snap | 35 + ...scover__tests__single scalar avro key.snap | 39 + .../tests/snapshots/test__discover.snap | 34 +- source-kafka/tests/snapshots/test__spec.snap | 34 +- source-kafka/tests/test.flow.yaml | 4 + source-kafka/tests/test.rs | 79 +- 17 files changed, 2315 insertions(+), 61 deletions(-) create mode 100644 source-kafka/src/schema_registry.rs create mode 100644 source-kafka/src/snapshots/source_kafka__discover__tests__avro record with scalar compound key.snap create mode 100644 source-kafka/src/snapshots/source_kafka__discover__tests__nested avro record with scalars.snap create mode 100644 source-kafka/src/snapshots/source_kafka__discover__tests__no key.snap create mode 100644 source-kafka/src/snapshots/source_kafka__discover__tests__single non-scalar avro key.snap create mode 100644 
source-kafka/src/snapshots/source_kafka__discover__tests__single nullable scalar avro key.snap create mode 100644 source-kafka/src/snapshots/source_kafka__discover__tests__single scalar avro key.snap diff --git a/source-kafka/Cargo.lock b/source-kafka/Cargo.lock index 4493e650a1..0745df1865 100644 --- a/source-kafka/Cargo.lock +++ b/source-kafka/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93b8a41dbe230ad5087cc721f8d41611de654542180586b315d9f4cf6b72bef" +dependencies = [ + "psl-types", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -17,6 +26,35 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,12 +64,48 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "anyhow" version = "1.0.90" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" +[[package]] +name = "apache-avro" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" +dependencies = [ + "bigdecimal 0.4.5", + "digest", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_bytes", + "serde_json", + "strum", + "strum_macros", + "thiserror", + "typed-builder", + "uuid", +] + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.4.0" @@ -183,17 +257,17 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", "httparse", - "hyper", - "hyper-rustls", + "hyper 0.14.31", + "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", "pin-utils", - "rustls", + "rustls 0.21.12", "tokio", "tracing", ] @@ -279,6 +353,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -301,12 +381,76 @@ dependencies = [ "vsimd", ] +[[package]] +name = "bigdecimal" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + +[[package]] +name = "bigdecimal" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d712318a27c7150326677b321a5fa91b55f6d9034ffd67f20319e147d40cee" 
+dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "bitvec" +version = "0.19.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33" +dependencies = [ + "funty 1.1.0", + "radium 0.5.3", + "tap", + "wyz 0.2.0", +] + +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty 2.0.0", + "radium 0.7.0", + "tap", + "wyz 0.5.1", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -316,6 +460,40 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.7.2" @@ -393,6 +571,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.14" @@ -402,6 +589,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -412,6 +614,26 @@ dependencies = [ "typenum", ] +[[package]] +name = "dary_heap" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = 
"0.3.11" @@ -419,6 +641,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -432,6 +655,35 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc" +version = "0.0.0" +source = "git+https://github.com/estuary/flow#60e47f697826a8128b533cc5f4600acb5d80cdbc" +dependencies = [ + "base64 0.13.1", + "bigdecimal 0.3.1", + "bumpalo", + "bytes", + "fancy-regex", + "futures", + "fxhash", + "itertools 0.10.5", + "json", + "lz4", + "proto-gazette", + "rkyv", + "schemars", + "serde", + "serde_json", + "tempfile", + "thiserror", + "time", + "tracing", + "tuple", + "url", + "uuid", +] + [[package]] name = "duct" version = "0.13.7" @@ -462,6 +714,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -478,6 +739,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fancy-regex" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0678ab2d46fa5195aaf59ad034c083d351377d4af57f3e073c074d0da3e3c766" +dependencies = [ + "bit-set", + "regex", +] + [[package]] name = "fastrand" version = "2.1.1" @@ -496,6 +767,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies 
= [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -505,6 +791,33 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "funty" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" + +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -512,6 +825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -520,6 +834,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -538,10 +880,25 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", ] [[package]] @@ -590,6 +947,44 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.0" @@ -707,7 +1102,7 @@ dependencies = 
[ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -721,6 +1116,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -729,12 +1144,74 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.31", "log", - "rustls", + "rustls 0.21.12", "rustls-native-certs", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.5.0", + "hyper-util", + "rustls 0.23.15", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.5.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.0", + 
"pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -744,7 +1221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] [[package]] @@ -760,6 +1237,31 @@ dependencies = [ "similar", ] +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + +[[package]] +name = "iri-string" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0586ad318a04c73acdbad33f67969519b5452c80770c4c72059a686da48a7e" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -775,6 +1277,38 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "json" +version = "0.0.0" +source = "git+https://github.com/estuary/flow#60e47f697826a8128b533cc5f4600acb5d80cdbc" +dependencies = [ + "addr", + "bigdecimal 0.3.1", + "bitvec 0.19.6", + 
"fancy-regex", + "fxhash", + "iri-string", + "itertools 0.10.5", + "lazy_static", + "percent-encoding", + "serde", + "serde_json", + "thiserror", + "time", + "tracing", + "url", + "uuid", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -787,6 +1321,36 @@ version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown 0.14.5", + "rle-decode-fast", +] + +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "libz-sys" version = "1.1.20" @@ -827,6 +1391,25 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lz4" +version = "1.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -842,6 +1425,12 @@ version = "2.7.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -869,6 +1458,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -879,6 +1485,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -939,6 +1556,32 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -1019,7 +1662,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6eea3058763d6e656105d1403cb04e0a41b7bbac6362d413e7c33be0c32279c9" dependencies = [ "heck", - "itertools", + "itertools 0.13.0", "prost", "prost-types", ] @@ -1079,6 +1722,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.22" @@ -1126,7 +1778,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -1146,7 +1798,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.79", @@ -1164,7 +1816,7 @@ dependencies = [ [[package]] name = "proto-flow" version = "0.0.0" -source = "git+https://github.com/estuary/flow#fcc99e87afea299e8595cff978883c1f7cc8e8d5" +source = "git+https://github.com/estuary/flow#60e47f697826a8128b533cc5f4600acb5d80cdbc" dependencies = [ "bytes", "pbjson", @@ -1179,7 +1831,7 @@ dependencies = [ [[package]] name = "proto-gazette" version = "0.0.0" -source = "git+https://github.com/estuary/flow#fcc99e87afea299e8595cff978883c1f7cc8e8d5" +source = "git+https://github.com/estuary/flow#60e47f697826a8128b533cc5f4600acb5d80cdbc" dependencies = [ "bytes", "pbjson", @@ 
-1190,6 +1842,38 @@ dependencies = [ "uuid", ] +[[package]] +name = "psl-types" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" + +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "quad-rand" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" + [[package]] name = "quote" version = "1.0.37" @@ -1199,6 +1883,48 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" + +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" 
+version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -1291,6 +2017,58 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + +[[package]] +name = "reqwest" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-rustls 0.27.3", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.2.0", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows-registry", +] + [[package]] name = "ring" version = "0.17.8" @@ -1306,6 +2084,41 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec 1.0.1", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = 
"0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1342,10 +2155,23 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +dependencies = [ + "once_cell", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -1353,7 +2179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -1367,6 +2193,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -1377,6 +2218,23 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" + [[package]] name = "ryu" version = "1.0.18" @@ -1404,6 +2262,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schema_registry_converter" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc3cf40651cf503827a34bcd7efbbd4750a7e3adc6768bb8089977e4d07303b" +dependencies = [ + "apache-avro", + "byteorder", + "dashmap", + "futures", + "reqwest", + "serde", + "serde_json", +] + [[package]] name = "schemars" version = "0.8.21" @@ -1444,6 +2317,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "security-framework" version = "2.11.1" @@ -1482,6 +2361,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.210" @@ -1516,6 +2404,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1561,6 +2461,12 @@ dependencies = [ "libc", ] 
+[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "similar" version = "2.6.0" @@ -1597,16 +2503,22 @@ name = "source-kafka" version = "0.1.0" dependencies = [ "anyhow", + "apache-avro", "aws-sdk-iam", "aws-sigv4", "base64 0.22.1", + "doc", + "futures", "hex", "highway", "http 0.2.12", "insta", + "json", "lazy_static", "proto-flow", "rdkafka", + "reqwest", + "schema_registry_converter", "schemars", "serde", "serde_json", @@ -1621,6 +2533,25 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.79", +] + [[package]] name = "subtle" version = "2.6.1" @@ -1649,6 +2580,42 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.13.0" @@ -1723,6 +2690,21 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.40.0" @@ -1752,13 +2734,34 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.15", + "rustls-pki-types", "tokio", ] @@ -1879,24 +2882,80 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tuple" +version = "0.0.0" +source = "git+https://github.com/estuary/flow#60e47f697826a8128b533cc5f4600acb5d80cdbc" +dependencies = [ + "memchr", + "serde_json", +] + +[[package]] +name = "typed-builder" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06fbd5b8de54c5f7c91f6fe4cebb949be2125d7758e630bb58b1d831dbce600" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9534daa9fd3ed0bd911d462a37f172228077e7abf18c18a5f67199d959205f8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "typenum" version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicode-bidi" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + [[package]] name = "untrusted" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" 
+dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + [[package]] name = "urlencoding" version = "2.1.3" @@ -1952,6 +3011,83 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" + +[[package]] +name = "web-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1974,6 +3110,36 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2065,12 +3231,48 @@ dependencies = [ "memchr", ] +[[package]] +name = "wyz" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" + +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xmlparser" version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/source-kafka/Cargo.toml b/source-kafka/Cargo.toml index 912f53a3f8..6904607230 100644 --- a/source-kafka/Cargo.toml +++ b/source-kafka/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] +doc = { git = "https://github.com/estuary/flow" } +json = { git = "https://github.com/estuary/flow" } proto-flow = { git = "https://github.com/estuary/flow" } anyhow = "1.0" @@ -26,7 +28,11 @@ tracing-subscriber = { version = "0.3", features = [ "env-filter", "time", ] } +reqwest = { version = "0.12", features = ["json"] } +futures = "0.3" +apache-avro = "0.17" [dev-dependencies] insta = { version = "1", features = ["json", "serde"] } +schema_registry_converter = { version = "4.2.0", features = ["avro"] } diff --git a/source-kafka/docker-compose.yaml b/source-kafka/docker-compose.yaml index d8d1802f04..da3451da33 100644 --- a/source-kafka/docker-compose.yaml +++ b/source-kafka/docker-compose.yaml @@ -29,6 +29,21 @@ services: networks: - flow-test + schema-registry: + image: confluentinc/cp-schema-registry:7.7.1 + hostname: schema-registry + container_name: schema-registry + depends_on: + - db + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'db:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + networks: + - flow-test + networks: flow-test: 
name: flow-test diff --git a/source-kafka/src/configuration.rs b/source-kafka/src/configuration.rs index 39642463c4..0357ed391a 100644 --- a/source-kafka/src/configuration.rs +++ b/source-kafka/src/configuration.rs @@ -10,6 +10,7 @@ pub struct EndpointConfig { bootstrap_servers: String, credentials: Option, tls: Option, + pub schema_registry: Option, } #[derive(Serialize, Deserialize, Clone)] @@ -53,6 +54,13 @@ pub enum TlsSettings { SystemCertificates, } +#[derive(Serialize, Deserialize)] +pub struct SchemaRegistryConfig { + pub endpoint: String, + pub username: String, + pub password: String, +} + impl JsonSchema for EndpointConfig { fn schema_name() -> String { "EndpointConfig".to_owned() @@ -162,6 +170,38 @@ impl JsonSchema for EndpointConfig { "title": "TLS Settings", "type": "string", "order": 2 + }, + "schema_registry": { + "title": "Schema Registry", + "description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.", + "type": "object", + "properties": { + "endpoint": { + "type": "string", + "title": "Schema Registry Endpoint", + "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud", + "order": 0 + }, + "username": { + "type": "string", + "title": "Schema Registry Username", + "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.", + "order": 1 + }, + "password": { + "type": "string", + "title": "Schema Registry Password", + "description": "Schema registry password to use for authentication. 
If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.", + "order": 2, + "secret": true + } + }, + "required": [ + "endpoint", + "username", + "password" + ], + "order": 3 } } })) diff --git a/source-kafka/src/discover.rs b/source-kafka/src/discover.rs index c2dc876d2d..ba53125919 100644 --- a/source-kafka/src/discover.rs +++ b/source-kafka/src/discover.rs @@ -1,14 +1,27 @@ +use std::collections::HashMap; + use anyhow::Result; +use apache_avro::schema::Schema as AvroSchema; +use doc::{ + shape::{schema::to_schema, ObjProperty}, + Shape, +}; +use json::schema as JsonSchema; use proto_flow::capture::{request::Discover, response::discovered}; use rdkafka::consumer::Consumer; +use schemars::schema::RootSchema; use serde_json::json; use crate::{ configuration::{EndpointConfig, Resource}, + schema_registry::{ + RegisteredSchema::Avro, RegisteredSchema::Json, RegisteredSchema::Protobuf, + SchemaRegistryClient, TopicSchema, + }, KAFKA_TIMEOUT, }; -static KAFKA_INTERNAL_TOPICS: [&str; 2] = ["__consumer_offsets", "__amazon_msk_canary"]; +static KAFKA_INTERNAL_TOPICS: [&str; 3] = ["__consumer_offsets", "__amazon_msk_canary", "_schemas"]; pub async fn do_discover(req: Discover) -> Result> { let config: EndpointConfig = serde_json::from_str(&req.config_json)?; @@ -31,44 +44,432 @@ pub async fn do_discover(req: Discover) -> Result> { all_topics.sort(); - // TODO(whb): Consider information from schema registry for generating - // schemas, and selecting topics to capture. + let registered_schemas = match config.schema_registry { + Some(cfg) => { + let client = SchemaRegistryClient::new(cfg.endpoint, cfg.username, cfg.password); + client.schemas_for_topics(&all_topics).await? 
+ } + None => HashMap::new(), + }; Ok(all_topics .into_iter() - .map(|s| discovered::Binding { - recommended_name: s.clone(), - resource_config_json: serde_json::to_string(&Resource { topic: s.clone() }) + .filter_map(|topic| { + let registered_schema = match registered_schemas.get(&topic) { + Some(s) => s, + None => &TopicSchema::default(), + }; + + match (&registered_schema.key, &registered_schema.value) { + (None, None) => (), + (Some(Avro(_)), Some(Avro(_))) => (), + (None, Some(Avro(_))) => (), + (Some(Avro(_)), None) => (), + // TODO(whb): Support JSON and Protobuf. + _ => return None, + }; + + let (collection_schema, key_ptrs) = topic_schema_to_collection_spec(registered_schema); + + Some(discovered::Binding { + recommended_name: topic.to_owned(), + resource_config_json: serde_json::to_string(&Resource { + topic: topic.to_owned(), + }) .expect("resource config must serialize"), - document_schema_json: serde_json::to_string(&json!({ - "x-infer-schema": true, + document_schema_json: serde_json::to_string(&collection_schema) + .expect("document schema must serialize"), + key: key_ptrs, + resource_path: vec![topic.to_owned()], + ..Default::default() + }) + }) + .collect()) +} + +fn topic_schema_to_collection_spec(topic_schema: &TopicSchema) -> (RootSchema, Vec<String>) { + let mut collection_key = vec!["/_meta/partition".to_string(), "/_meta/offset".to_string()]; + let doc_schema_json = json!({ + "x-infer-schema": true, + "type": "object", + "properties": { + "_meta": { "type": "object", "properties": { - "_meta": { - "type": "object", - "properties": { - "topic": { - "description": "The topic the message was read from", - "type": "string", - }, - "partition": { - "description": "The partition the message was read from", - "type": "integer", - }, - "offset": { - "description": "The offset of the message within the partition", - "type": "integer", - } - }, - "required": ["partition", "offset"] + "topic": { + "description": "The topic the message was read from", + "type":
"string", + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer", + }, + "offset": { + "description": "The offset of the message within the partition", + "type": "integer", } }, - "required": ["_meta"] - })) - .expect("document schema must serialize"), - key: vec!["/_meta/partition".to_string(), "/_meta/offset".to_string()], - resource_path: vec![s.clone()], - ..Default::default() - }) - .collect()) + "required": ["offset", "partition"] + } + }, + "required": ["_meta"] + }); + + let mut collection_schema: RootSchema = serde_json::from_value(doc_schema_json).unwrap(); + + if let Some(mut key_shape) = match &topic_schema.key { + Some(Avro(schema)) => avro_key_schema_to_shape(schema), + Some(Json(_)) => todo!(), + Some(Protobuf) => todo!(), + None => None, + } { + if !key_shape.type_.overlaps(JsonSchema::types::OBJECT) { + // The topic key is a single value not part of a record, and + // that cannot be represented as a JSON document. There has + // to be a key/value pair in the document, so transmute this + // unnamed scalar value into a document with a synthetic key + // to reference its value. 
+ let scalar_shape = key_shape.clone(); + key_shape = Shape::nothing(); + key_shape.type_ = JsonSchema::types::OBJECT; + key_shape.object.properties = vec![ObjProperty { + name: "_key".into(), + is_required: true, + shape: scalar_shape, + }]; + } + + collection_key = key_shape + .locations() + .into_iter() + .filter(|(_, _, shape, ..)| !shape.type_.overlaps(JsonSchema::types::OBJECT)) + .map(|(ptr, ..)| ptr.to_string()) + .collect(); + + let mut key_schema = to_schema(key_shape); + + collection_schema + .schema + .object() + .properties + .append(&mut key_schema.schema.object().properties); + + collection_schema + .schema + .object() + .required + .append(&mut key_schema.schema.object().required); + } + + (collection_schema, collection_key) +} + +fn avro_key_schema_to_shape(schema: &AvroSchema) -> Option { + let mut shape = Shape::nothing(); + + shape.type_ = match schema { + AvroSchema::Boolean => JsonSchema::types::BOOLEAN, + AvroSchema::Int | AvroSchema::Long => JsonSchema::types::INTEGER, + AvroSchema::String => JsonSchema::types::STRING, + AvroSchema::Bytes | AvroSchema::Fixed(_) => { + shape.string.content_encoding = Some("base64".into()); + JsonSchema::types::STRING + } + AvroSchema::Decimal(_) | AvroSchema::BigDecimal => { + shape.string.format = Some(JsonSchema::formats::Format::Number); + JsonSchema::types::STRING + } + AvroSchema::Uuid => { + shape.string.format = Some(JsonSchema::formats::Format::Uuid); + JsonSchema::types::STRING + } + AvroSchema::Date => { + shape.string.format = Some(JsonSchema::formats::Format::Date); + JsonSchema::types::STRING + } + AvroSchema::TimeMillis | AvroSchema::TimeMicros => { + shape.string.format = Some(JsonSchema::formats::Format::Time); + JsonSchema::types::STRING + } + AvroSchema::TimestampMillis + | AvroSchema::TimestampMicros + | AvroSchema::TimestampNanos + | AvroSchema::LocalTimestampMillis + | AvroSchema::LocalTimestampMicros + | AvroSchema::LocalTimestampNanos => { + shape.string.format = 
Some(JsonSchema::formats::Format::DateTime); + JsonSchema::types::STRING + } + AvroSchema::Duration => { + shape.string.format = Some(JsonSchema::formats::Format::Duration); + JsonSchema::types::STRING + } + AvroSchema::Enum(enum_schema) => { + shape.enum_ = Some( + enum_schema + .symbols + .iter() + .map(|s| s.to_string().into()) + .collect(), + ); + JsonSchema::types::STRING + } + AvroSchema::Record(record_schema) => { + shape.object.properties = record_schema + .fields + .iter() + .map(|field| { + let mut field_shape = avro_key_schema_to_shape(&field.schema)?; + + if let Some(doc) = &field.doc { + field_shape.description = Some(doc.to_string().into()); + } + + if let Some(default) = &field.default { + field_shape.default = Some((default.to_owned(), None).into()); + } + + Some(ObjProperty { + name: field.name.clone().into(), + is_required: field.default.is_none(), + shape: field_shape, + }) + }) + .collect::<Option<Vec<_>>>()?; + JsonSchema::types::OBJECT + } + + // Schemas that allow 'null' are not schematized as keys since + // nullable keys are not very useful in practice. + AvroSchema::Null => return None, + + // Similarly, schemas with multiple types or a single type with an + // additional explicit 'null' are not very useful in practice, + // although technically allowed by Flow, so they won't be + // schematized. + AvroSchema::Union(_) => return None, + + // We could perhaps treat floating points as string-encoded numbers, + // but if that's what they were then they should probably be specified + // as decimals. And this is more consistent with what is achievable + // with JSON schemas. + AvroSchema::Float => return None, + AvroSchema::Double => return None, + + // Arrays and maps just don't make any sense as key schemas.
+ AvroSchema::Array(_) => return None, + AvroSchema::Map(_) => return None, + + AvroSchema::Ref { name } => panic!("key schema contains ref {}", name), + }; + + Some(shape) +} + +#[cfg(test)] +mod tests { + use insta::assert_snapshot; + + use super::*; + + #[test] + fn test_topic_schema_to_collection_spec() { + let test_cases = [ + ( + "no key", + &TopicSchema { + key: None, + ..Default::default() + }, + ), + ( + "single scalar avro key", + &TopicSchema { + key: Some(Avro( + apache_avro::Schema::parse(&json!({"type": "string"})).unwrap(), + )), + ..Default::default() + }, + ), + ( + "single nullable scalar avro key", + &TopicSchema { + key: Some(Avro( + apache_avro::Schema::parse(&json!({"type": ["null", "string"]})).unwrap(), + )), + ..Default::default() + }, + ), + ( + "single non-scalar avro key", + &TopicSchema { + key: Some(Avro( + apache_avro::Schema::parse(&json!({"type": "array", "items": "string"})) + .unwrap(), + )), + ..Default::default() + }, + ), + ( + "avro record with scalar compound key", + &TopicSchema { + key: Some(Avro( + apache_avro::Schema::parse(&json!({ + "type": "record", + "name": "someRecord", + "fields": [ + {"name": "firstKey", "type": "string", "doc": "the first key field"}, + {"name": "secondKey", "type": "long", "doc": "the second key field"}, + { + "name": "thirdKey", + "type": "enum", + "symbols": ["a", "b", "c"], + "default": "a", + "doc": "the third key field, which is an enum with a default value", + }, + ], + })) + .unwrap(), + )), + ..Default::default() + }, + ), + ( + "nested avro record with scalars", + &TopicSchema { + key: Some(Avro( + apache_avro::Schema::parse(&json!({ + "type": "record", + "name": "someRecord", + "fields": [ + {"name": "firstKey", "type": "string", "doc": "the first key field"}, + { + "name": "nestedRecord", + "type": "record", + "fields": [ + {"name": "secondKeyNested", "type": "long", "doc": "the second key field"}, + {"name": "thirdKeyNested", "type": "bytes", "doc": "the third key field"}, + ], + }, 
+ ], + })) + .unwrap(), + )), + ..Default::default() + }, + ), + ]; + + for (name, input) in test_cases { + let (discovered_schema, discovered_key) = topic_schema_to_collection_spec(input); + let mut snap = String::new(); + snap.push_str(&serde_json::to_string(&discovered_key).unwrap()); + snap.push_str("\n"); + snap.push_str(&serde_json::to_string_pretty(&discovered_schema).unwrap()); + assert_snapshot!(name, snap); + } + } + + #[test] + fn test_avro_key_schema_to_shape() { + let must_schema = |v: serde_json::Value| apache_avro::Schema::parse(&v).unwrap(); + + let test_cases = [ + ( + vec![must_schema(json!({"type": "boolean"}))], + Some(json!({"type": "boolean"})), + ), + ( + vec![ + must_schema(json!({"type": "int"})), + must_schema(json!({"type": "long"})), + ], + Some(json!({"type": "integer"})), + ), + ( + vec![must_schema(json!({"type": "string"}))], + Some(json!({"type": "string"})), + ), + ( + vec![ + must_schema(json!({"type": "bytes"})), + must_schema(json!({"type": "fixed", "name": "foo", "size": 10})), + ], + Some(json!({"type": "string", "contentEncoding": "base64"})), + ), + ( + vec![ + must_schema( + json!({"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}), + ), + must_schema( + json!({"type": "bytes", "logicalType": "big-decimal", "precision": 4, "scale": 2}), + ), + ], + Some(json!({"type": "string", "format": "number"})), + ), + ( + vec![must_schema( + json!({"type": "string", "logicalType": "uuid"}), + )], + Some(json!({"type": "string", "format": "uuid"})), + ), + ( + vec![must_schema(json!({"type": "int", "logicalType": "date"}))], + Some(json!({"type": "string", "format": "date"})), + ), + ( + vec![ + must_schema(json!({"type": "int", "logicalType": "time-millis"})), + must_schema(json!({"type": "long", "logicalType": "time-micros"})), + ], + Some(json!({"type": "string", "format": "time"})), + ), + ( + vec![ + must_schema(json!({"type": "long", "logicalType": "timestamp-millis"})), + must_schema(json!({"type": "long", 
"logicalType": "timestamp-micros"})), + must_schema(json!({"type": "long", "logicalType": "local-timestamp-millis"})), + must_schema(json!({"type": "long", "logicalType": "local-timestamp-micros"})), + // TODO(whb): These nanosecond timestamps are not parsed by + // the Avro library and I'm not sure if they are really part + // of the spec. + // must_schema(json!({"type": "long", "logicalType": "timestamp-nanos"})), + // must_schema(json!({"type": "long", "logicalType": "local-timestamp-nanos"})), + ], + Some(json!({"type": "string", "format": "date-time"})), + ), + ( + vec![must_schema( + json!({"type": "fixed", "name": "foo", "size": 12, "logicalType": "duration"}), + )], + Some(json!({"type": "string", "format": "duration"})), + ), + ( + vec![ + must_schema(json!({"type": "null"})), + must_schema(json!({"type": ["null", "long"]})), + must_schema(json!({"type": "float"})), + must_schema(json!({"type": "double"})), + must_schema(json!({"type": "array", "items": "string"})), + must_schema(json!({"type": "map", "values": "string"})), + ], + None, + ), + ]; + + for (avros, want) in test_cases { + for avro in avros { + match avro_key_schema_to_shape(&avro) { + Some(shape) => { + assert_eq!( + serde_json::to_value(&to_schema(shape).schema).unwrap(), + serde_json::to_value(&want.clone().unwrap()).unwrap() + ) + } + None => assert!(want.is_none()), + } + } + } + } } diff --git a/source-kafka/src/lib.rs b/source-kafka/src/lib.rs index 7f8a4ca6d3..b678350a99 100644 --- a/source-kafka/src/lib.rs +++ b/source-kafka/src/lib.rs @@ -17,6 +17,7 @@ pub mod configuration; pub mod discover; pub mod msk_oauthbearer; pub mod pull; +pub mod schema_registry; const KAFKA_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); diff --git a/source-kafka/src/schema_registry.rs b/source-kafka/src/schema_registry.rs new file mode 100644 index 0000000000..2424366110 --- /dev/null +++ b/source-kafka/src/schema_registry.rs @@ -0,0 +1,202 @@ +use anyhow::Result; +use 
futures::stream::{self, StreamExt}; +use reqwest::Client; +use schemars::schema::RootSchema; +use serde::Deserialize; +use std::{ + collections::{HashMap, HashSet}, + u32, +}; + +const TOPIC_KEY_SUFFIX: &str = "-key"; +const TOPIC_VALUE_SUFFIX: &str = "-value"; +const CONCURRENT_SCHEMA_REQUESTS: usize = 10; + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct FetchedSchema { + #[serde(default = "SchemaType::default")] + schema_type: SchemaType, + schema: String, + reference: Option, +} + +#[derive(Deserialize, Debug)] +struct SchemaReference { + name: String, + subject: String, + version: u32, +} + +#[derive(Deserialize, Debug)] +struct FetchedLatestVersion { + id: u32, +} + +#[derive(Deserialize, Debug)] +enum SchemaType { + AVRO, + JSON, + PROTOBUF, +} + +impl SchemaType { + fn default() -> Self { + SchemaType::AVRO + } +} + +#[derive(Debug)] +pub enum RegisteredSchema { + Avro(apache_avro::Schema), + Json(RootSchema), + Protobuf, // TODO(whb): Protobuf support. +} + +#[derive(Debug, Default)] +pub struct TopicSchema { + pub key: Option, + pub value: Option, +} + +pub struct SchemaRegistryClient { + endpoint: String, + http: Client, + username: String, + password: String, +} + +impl SchemaRegistryClient { + pub fn new(endpoint: String, username: String, password: String) -> SchemaRegistryClient { + SchemaRegistryClient { + endpoint: endpoint.to_string(), + http: reqwest::Client::default(), + username, + password, + } + } + + pub async fn schemas_for_topics( + &self, + topics: &[String], + ) -> Result> { + let applicable_topics: HashSet = topics.iter().cloned().collect(); + + let subjects: Vec = self + .http + .get(format!("{}/subjects", self.endpoint)) + .basic_auth(&self.username, Some(&self.password)) + .send() + .await? 
+ .json() + .await?; + + let filter_by_suffix = |s: &str, suffix: &str| { + if let Some(s) = s.strip_suffix(suffix) { + if !applicable_topics.contains(s) { + return None; + } + return Some(s.to_string()); + } + None + }; + + let topics_with_key_schema: HashSet = subjects + .iter() + .filter_map(|s| filter_by_suffix(s, TOPIC_KEY_SUFFIX)) + .collect(); + + let topics_with_value_schema: HashSet = subjects + .iter() + .filter_map(|s| filter_by_suffix(s, TOPIC_VALUE_SUFFIX)) + .collect(); + + let schema_futures: Vec<_> = applicable_topics + .iter() + .filter_map(|topic| { + let need_key = topics_with_key_schema.contains(topic); + let need_value = topics_with_value_schema.contains(topic); + if !need_key && !need_value { + return None; + } + Some(async move { + let mut schema = TopicSchema { + key: None, + value: None, + }; + + if need_key { + schema.key = Some(self.fetch_latest_schema(&topic, true).await?) + } + if need_value { + schema.value = Some(self.fetch_latest_schema(&topic, false).await?) + } + + Ok::<(String, TopicSchema), anyhow::Error>((topic.to_owned(), schema)) + }) + }) + .collect(); + + Ok(stream::iter(schema_futures) + .buffer_unordered(CONCURRENT_SCHEMA_REQUESTS) + .collect::>() + .await + .into_iter() + .collect::>>()?) + } + + async fn fetch_schema(&self, id: u32) -> Result { + let fetched: FetchedSchema = self + .http + .get(format!("{}/schemas/ids/{}", self.endpoint, id)) + .basic_auth(&self.username, Some(&self.password)) + .send() + .await? + .json() + .await?; + + match fetched.schema_type { + SchemaType::AVRO => { + // TODO(whb): Resolve references. 
+ let schema = apache_avro::Schema::parse_str(&fetched.schema) + .expect("failed to parse avro schema"); + Ok(RegisteredSchema::Avro(schema)) + } + SchemaType::JSON => { + let schema: RootSchema = + serde_json::from_str(&fetched.schema).expect("failed to parse json schema"); + Ok(RegisteredSchema::Json(schema)) + } + SchemaType::PROTOBUF => Ok(RegisteredSchema::Protobuf), + } + } + + async fn fetch_latest_version(&self, subject: &str) -> Result { + let fetched: FetchedLatestVersion = self + .http + .get(format!( + "{}/subjects/{}/versions/latest", + self.endpoint, subject + )) + .basic_auth(&self.username, Some(&self.password)) + .send() + .await? + .json() + .await?; + Ok(fetched.id) + } + + async fn fetch_latest_schema(&self, topic: &str, key: bool) -> Result { + let subject = format!( + "{}{}", + topic, + if key { + TOPIC_KEY_SUFFIX + } else { + TOPIC_VALUE_SUFFIX + } + ); + let version = self.fetch_latest_version(subject.as_str()).await?; + self.fetch_schema(version).await + } +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__avro record with scalar compound key.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__avro record with scalar compound key.snap new file mode 100644 index 0000000000..6b422b8fe1 --- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__avro record with scalar compound key.snap @@ -0,0 +1,55 @@ +--- +source: src/discover.rs +expression: snap +--- +["/firstKey","/secondKey","/thirdKey"] +{ + "type": "object", + "required": [ + "_meta", + "firstKey", + "secondKey" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + 
}, + "firstKey": { + "description": "the first key field", + "type": "string" + }, + "secondKey": { + "description": "the second key field", + "type": "integer" + }, + "thirdKey": { + "description": "the third key field, which is an enum with a default value", + "default": "a", + "type": "string", + "enum": [ + "a", + "b", + "c" + ] + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__nested avro record with scalars.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__nested avro record with scalars.snap new file mode 100644 index 0000000000..b997d249cc --- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__nested avro record with scalars.snap @@ -0,0 +1,59 @@ +--- +source: src/discover.rs +expression: snap +--- +["/firstKey","/nestedRecord/secondKeyNested","/nestedRecord/thirdKeyNested"] +{ + "type": "object", + "required": [ + "_meta", + "firstKey", + "nestedRecord" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + }, + "firstKey": { + "description": "the first key field", + "type": "string" + }, + "nestedRecord": { + "type": "object", + "required": [ + "secondKeyNested", + "thirdKeyNested" + ], + "properties": { + "secondKeyNested": { + "description": "the second key field", + "type": "integer" + }, + "thirdKeyNested": { + "description": "the third key field", + "type": "string", + "contentEncoding": "base64" + } + } + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__no key.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__no 
key.snap new file mode 100644 index 0000000000..8bf370dbbf --- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__no key.snap @@ -0,0 +1,35 @@ +--- +source: src/discover.rs +expression: snap +--- +["/_meta/partition","/_meta/offset"] +{ + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__single non-scalar avro key.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__single non-scalar avro key.snap new file mode 100644 index 0000000000..8bf370dbbf --- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__single non-scalar avro key.snap @@ -0,0 +1,35 @@ +--- +source: src/discover.rs +expression: snap +--- +["/_meta/partition","/_meta/offset"] +{ + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__single nullable scalar avro key.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__single nullable scalar avro key.snap new file mode 100644 index 0000000000..8bf370dbbf 
--- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__single nullable scalar avro key.snap @@ -0,0 +1,35 @@ +--- +source: src/discover.rs +expression: snap +--- +["/_meta/partition","/_meta/offset"] +{ + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/src/snapshots/source_kafka__discover__tests__single scalar avro key.snap b/source-kafka/src/snapshots/source_kafka__discover__tests__single scalar avro key.snap new file mode 100644 index 0000000000..31b4e3c9fc --- /dev/null +++ b/source-kafka/src/snapshots/source_kafka__discover__tests__single scalar avro key.snap @@ -0,0 +1,39 @@ +--- +source: src/discover.rs +expression: snap +--- +["/_key"] +{ + "type": "object", + "required": [ + "_key", + "_meta" + ], + "properties": { + "_key": { + "type": "string" + }, + "_meta": { + "type": "object", + "required": [ + "offset", + "partition" + ], + "properties": { + "offset": { + "description": "The offset of the message within the partition", + "type": "integer" + }, + "partition": { + "description": "The partition the message was read from", + "type": "integer" + }, + "topic": { + "description": "The topic the message was read from", + "type": "string" + } + } + } + }, + "x-infer-schema": true +} diff --git a/source-kafka/tests/snapshots/test__discover.snap b/source-kafka/tests/snapshots/test__discover.snap index c90cb98788..b4e9517d23 100644 --- a/source-kafka/tests/snapshots/test__discover.snap +++ b/source-kafka/tests/snapshots/test__discover.snap @@ -21,21 +21,37 @@ 
expression: snap } }, "required": [ - "partition", - "offset" + "offset", + "partition" + ], + "type": "object" + }, + "idx": { + "type": "integer" + }, + "nested": { + "properties": { + "sub_id": { + "type": "integer" + } + }, + "required": [ + "sub_id" ], "type": "object" } }, "required": [ - "_meta" + "_meta", + "idx", + "nested" ], "type": "object", "x-infer-schema": true }, "key": [ - "/_meta/partition", - "/_meta/offset" + "/idx", + "/nested/sub_id" ], "recommendedName": "test-topic-1", "resourceConfig": { @@ -64,8 +80,8 @@ expression: snap } }, "required": [ - "partition", - "offset" + "offset", + "partition" ], "type": "object" } @@ -107,8 +123,8 @@ expression: snap } }, "required": [ - "partition", - "offset" + "offset", + "partition" ], "type": "object" } diff --git a/source-kafka/tests/snapshots/test__spec.snap b/source-kafka/tests/snapshots/test__spec.snap index d2bf4ec076..784a14a5bc 100644 --- a/source-kafka/tests/snapshots/test__spec.snap +++ b/source-kafka/tests/snapshots/test__spec.snap @@ -1,6 +1,6 @@ --- source: tests/test.rs -expression: "serde_json::to_string_pretty(&thing).unwrap()" +expression: "serde_json::to_string_pretty(&got).unwrap()" --- { "configSchema": { @@ -94,6 +94,38 @@ expression: "serde_json::to_string_pretty(&thing).unwrap()" "title": "Credentials", "type": "object" }, + "schema_registry": { + "description": "Connection details for interacting with a schema registry. This is necessary for processing messages encoded with Avro.", + "order": 3, + "properties": { + "endpoint": { + "description": "Schema registry API endpoint. For example: https://registry-id.us-east-2.aws.confluent.cloud", + "order": 0, + "title": "Schema Registry Endpoint", + "type": "string" + }, + "password": { + "description": "Schema registry password to use for authentication. 
If you are using Confluent Cloud, this will be the 'Secret' from your schema registry API key.", + "order": 2, + "secret": true, + "title": "Schema Registry Password", + "type": "string" + }, + "username": { + "description": "Schema registry username to use for authentication. If you are using Confluent Cloud, this will be the 'Key' from your schema registry API key.", + "order": 1, + "title": "Schema Registry Username", + "type": "string" + } + }, + "required": [ + "endpoint", + "password", + "username" + ], + "title": "Schema Registry", + "type": "object" + }, "tls": { "default": "system_certificates", "description": "Controls how should TLS certificates be found or used.", diff --git a/source-kafka/tests/test.flow.yaml b/source-kafka/tests/test.flow.yaml index 5c2c3d73a0..1ec9658bb1 100644 --- a/source-kafka/tests/test.flow.yaml +++ b/source-kafka/tests/test.flow.yaml @@ -10,6 +10,10 @@ captures: - run config: bootstrap_servers: "localhost:9092" + schema_registry: + endpoint: http://localhost:8081 + username: user + password: password bindings: - resource: topic: test-topic-1 diff --git a/source-kafka/tests/test.rs b/source-kafka/tests/test.rs index f35a86cc90..e230b47f8a 100644 --- a/source-kafka/tests/test.rs +++ b/source-kafka/tests/test.rs @@ -1,10 +1,15 @@ use std::time::Duration; use anyhow::Result; +use apache_avro::Schema; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::message::{Header, OwnedHeaders}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::ClientConfig; +use schema_registry_converter::async_impl::avro::AvroEncoder; +use schema_registry_converter::async_impl::schema_registry::SrSettings; +use schema_registry_converter::schema_registry_common::SubjectNameStrategy; +use serde::Serialize; use serde_json::json; #[test] @@ -124,7 +129,50 @@ async fn test_capture_resume() { insta::assert_snapshot!(snap); } +#[derive(Serialize)] +struct AvroKey { + idx: i32, + nested: NestedAvroRecord, +} + 
+#[derive(Serialize)] +struct NestedAvroRecord { + sub_id: i32, +} + async fn setup_topics(bootstrap_servers: &str) { + let avro_key_schema = json!({ + "type": "record", + "name": "AvroKey", + "fields": [ + {"name": "idx", "type": "int"}, + { + "name": "nested", + "type": { + "type": "record", + "name": "NestedAvroRecord", + "fields": [ + {"name": "sub_id", "type": "int"} + ] + } + } + ] + }); + + let avro_key_schema = Schema::parse(&avro_key_schema).unwrap(); + + let http = reqwest::Client::default(); + assert!(http + .post("http://localhost:8081/subjects/test-topic-1-key/versions") + .json(&json!({"schema": avro_key_schema.canonical_form()})) + .send() + .await + .unwrap() + .status() + .is_success()); + + let avro_encoder = AvroEncoder::new(SrSettings::new(String::from("http://localhost:8081"))); + let admin: AdminClient<_> = ClientConfig::new() .set("bootstrap.servers", bootstrap_servers) .create() @@ -157,7 +205,36 @@ async fn setup_topics(bootstrap_servers: &str) { .create() .unwrap(); - for topic in ["test-topic-1", "test-topic-2", "test-topic-3"] { + for idx in 0..9 { + let topic = "test-topic-1"; + let key = avro_encoder + .encode_struct( + AvroKey { + idx, + nested: NestedAvroRecord { sub_id: idx }, + }, + &SubjectNameStrategy::TopicNameStrategy(topic.to_string(), true), + ) + .await + .unwrap(); + + producer + .send( + FutureRecord::to(topic) + .partition(idx % 3) + .key(&key) + .payload(&json!({"payload": idx}).to_string()) + .headers(OwnedHeaders::new().insert(Header { + key: "header-key", + value: Some(&format!("header-value-{}", idx)), + })), + None, + ) + .await + .unwrap(); + } + + for topic in ["test-topic-2", "test-topic-3"] { for idx in 0..9 { producer .send(