From 0ead44e52d6ddf985b0e5e7008ba60cc15be2253 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Wed, 3 Jan 2024 15:37:53 -0800 Subject: [PATCH] Arrow json decoding (#477) --- Cargo.lock | 353 +++++++++--------- Cargo.toml | 2 + arroyo-df/src/external.rs | 3 +- arroyo-df/src/lib.rs | 25 +- arroyo-df/src/tables.rs | 128 +++---- arroyo-df/src/types.rs | 33 +- arroyo-formats/Cargo.toml | 2 + arroyo-formats/src/json.rs | 37 +- arroyo-formats/src/lib.rs | 120 ++++-- arroyo-formats/src/old.rs | 2 +- arroyo-rpc/src/api_types/connections.rs | 61 ++- arroyo-rpc/src/formats.rs | 6 + arroyo-rpc/src/lib.rs | 6 + arroyo-types/src/lib.rs | 47 ++- arroyo-worker/Cargo.toml | 4 +- .../src/connectors/kafka/source/mod.rs | 15 +- arroyo-worker/src/connectors/sse.rs | 25 +- arroyo-worker/src/engine.rs | 141 +++++-- 18 files changed, 622 insertions(+), 388 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c8280fc6..9f15b23d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", "const-random", @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.76" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" dependencies = [ "backtrace", ] @@ -222,7 +222,7 @@ version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04a8801ebb147ad240b2d978d3ab9f73c9ccd4557ba6a03e7800496770ed10e0" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-arith 46.0.0", "arrow-array 46.0.0", "arrow-buffer 46.0.0", @@ -243,7 +243,7 @@ name = "arrow" version = "49.0.0" source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=49.0.0/parquet_bytes#9fb8f0aac9a462e2ef1e70680ff7cdde6daff38c" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-arith 49.0.0", "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -295,7 +295,7 @@ version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "226fdc6c3a4ae154a74c24091d36a90b514f0ed7112f5b8322c1d8f354d8e20d" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-buffer 46.0.0", "arrow-data 46.0.0", "arrow-schema 46.0.0", @@ -310,7 +310,7 @@ name = "arrow-array" version = "49.0.0" source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=49.0.0/parquet_bytes#9fb8f0aac9a462e2ef1e70680ff7cdde6daff38c" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-buffer 49.0.0", "arrow-data 49.0.0", "arrow-schema 49.0.0", @@ -491,8 +491,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=49.0.0/json#bc9aa2e6e69e50b9eb8709e950426f7351b9c3ac" dependencies = [ "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -544,7 +543,7 @@ version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e32afc1329f7b372463b21c6ca502b07cf237e1ed420d87706c1770bb0ebd38" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-array 46.0.0", "arrow-buffer 46.0.0", "arrow-data 46.0.0", @@ -559,7 +558,7 @@ version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-array 49.0.0", "arrow-buffer 49.0.0", "arrow-data 49.0.0", @@ -601,7 +600,7 @@ version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-array 49.0.0", "arrow-buffer 49.0.0", "arrow-data 49.0.0", @@ -647,7 +646,7 @@ version = "0.9.0-dev" dependencies = [ "anyhow", "bollard", - "clap 4.4.11", + "clap 4.4.12", "open", "reqwest", "tokio", @@ -699,7 +698,7 @@ dependencies = [ "schemars", "serde", "serde_json", - "syn 2.0.42", + "syn 2.0.46", "thiserror", "time", "tokio", @@ -803,7 +802,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "syn 2.0.42", + "syn 2.0.46", "thiserror", "time", "tokio", @@ -837,7 +836,7 @@ dependencies = [ "serde", "serde_json", "strum", - "syn 2.0.42", + "syn 2.0.46", "tokio", "toml 0.7.8", "tonic", @@ -875,7 +874,7 @@ dependencies = [ "serde", "serde_json", "serde_json_path", - "syn 2.0.42", + "syn 2.0.46", "tokio", "tokio-stream", "tracing", @@ -891,6 +890,8 @@ dependencies = [ "apache-avro", "arrow 49.0.0", "arrow-array 49.0.0", + "arrow-json 49.0.0", + "arrow-schema 49.0.0", "arroyo-rpc", "arroyo-types", "bincode 2.0.0-rc.3", @@ -909,7 +910,7 @@ version = "0.9.0-dev" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -1069,7 +1070,7 @@ dependencies = [ name = "arroyo-worker" version = "0.9.0-dev" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "anyhow", "apache-avro", "arrow 49.0.0", @@ -1163,7 +1164,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" dependencies = [ "concurrent-queue", - "event-listener 4.0.1", + "event-listener 4.0.2", "event-listener-strategy", "futures-core", "pin-project-lite", @@ -1270,7 +1271,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" dependencies = [ - "event-listener 4.0.1", + "event-listener 4.0.2", "event-listener-strategy", "pin-project-lite", ] @@ -1386,7 +1387,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -1397,13 +1398,13 @@ checksum = "e1d90cd0b264dfdd8eb5bad0a2c217c1f88fa96a8573f40e7b12de23fb468f46" [[package]] name = "async-trait" -version = "0.1.75" +version = "0.1.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -1805,7 +1806,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -1830,7 +1831,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide", - "object 0.32.1", + "object 0.32.2", "rustc-demangle", ] @@ -2133,9 +2134,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23185c0e21df6ed832a12e2bda87c7d1def6842881fb634a8511ced741b0d76" +checksum = "91d7b79e99bfaa0d47da0687c43aa3b7381938a62ad3a6498599039321f660b7" dependencies = [ "chrono", "chrono-tz-build", @@ -2181,9 +2182,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.11" +version = "4.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" +checksum = "dcfab8ba68f3668e89f6ff60f5b205cea56aa7b769451a59f34b8682f51c056d" dependencies = [ "clap_builder", "clap_derive 4.4.7", @@ -2191,9 +2192,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.11" +version = "4.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" +checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" dependencies = [ "anstream", "anstyle", @@ -2223,7 +2224,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -2395,7 +2396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d21d8c5cf5940c13b1d82977fc5d4230714ce76d991c5c00b8a1c791c691dcf" dependencies = [ "chumsky", - "clap 4.4.11", + "clap 4.4.12", "codegen_template", "heck", "indexmap 1.9.3", @@ -2591,9 +2592,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.9" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c3242926edf34aec4ac3a77108ad4854bffaa2e4ddc1824124ce59231302d5" +checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -2612,21 +2613,20 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.16" +version = "0.9.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d" dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset 0.9.0", ] [[package]] name = "crossbeam-utils" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] @@ -2712,7 +2712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -2760,7 +2760,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -2782,7 +2782,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -2816,7 +2816,7 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193fd1e7628278d0641c5122860f9a7fd6a1d77d055838d12f55d15bbe28d4d0" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow 49.0.0", "arrow-array 49.0.0", "arrow-schema 49.0.0", @@ -2863,7 +2863,7 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "548bc49c4a489e3de474813831ea556dc9d368f9ed8d867b1493da42e8e9f613" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow 49.0.0", "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -2904,7 +2904,7 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33c473f72d8d81a532e63f6e562ed66dd9209dfd8e433d9712abd42444ee161e" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow 49.0.0", "arrow-array 49.0.0", "datafusion-common", @@ -2938,7 +2938,7 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1ca7e35ca22f9dc506c2375b92054b03ccf91afe25c0a90b395a1473a09735" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow 49.0.0", "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -2972,7 +2972,7 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddde97adefcca3a55257c646ffee2a95b6cac66f74d1146a6e3a6dbb37830631" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow 49.0.0", "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -3140,9 +3140,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", "serde", @@ -3385,7 +3385,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -3436,9 +3436,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "4.0.1" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84f2cdcf274580f2d63697192d744727b3198894b1bf02923643bf59e2c26712" +checksum = "218a870470cce1469024e9fb66b901aa983929d81304a1cdb299f28118e550d5" dependencies = [ "concurrent-queue", "parking", @@ -3451,7 +3451,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" dependencies = [ - "event-listener 4.0.1", + "event-listener 4.0.2", "pin-project-lite", ] @@ -3746,7 +3746,7 @@ checksum = "71466f33efdca5973620ca6de3ad69c17e98b6fdf2c40cf52bb0fae6fd3e21ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -3885,9 +3885,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -3900,9 +3900,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -3910,15 +3910,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -3927,9 +3927,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-lite" @@ -3961,26 +3961,26 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -3990,9 +3990,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -4158,7 +4158,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", ] [[package]] @@ -4167,7 +4167,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "allocator-api2", ] @@ -4452,9 +4452,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.58" +version = "0.1.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -4593,13 +4593,13 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi 0.3.3", "rustix 0.38.28", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4870,7 +4870,7 @@ version = "0.84.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6834a4a1f53a8528d5f346cdd141a77dbda31beb33dab4bf24fa4ecf6c508744" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "async-trait", "backoff", "derivative", @@ -5251,9 +5251,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "memfd" @@ -5282,15 +5282,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "memoffset" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" -dependencies = [ - "autocfg", -] - [[package]] name = "miette" version = "5.10.0" @@ -5320,7 +5311,7 @@ checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -5635,9 +5626,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -5746,7 +5737,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -5803,12 +5794,12 @@ dependencies = [ [[package]] name = "os_pipe" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" +checksum = "57119c3b893986491ec9aa85056780d3a0f3cf4da7cc09dd3650dbd6c6738fb9" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -5913,7 +5904,7 @@ version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad2cba786ae07da4d73371a88b9e0f9d3ffac1a9badc83922e0e15814f5c5fa" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-array 46.0.0", "arrow-buffer 46.0.0", "arrow-cast 46.0.0", @@ -5946,7 +5937,7 @@ name = "parquet" version = "49.0.0" source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=49.0.0/parquet_bytes#9fb8f0aac9a462e2ef1e70680ff7cdde6daff38c" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "arrow-array 49.0.0", "arrow-buffer 49.0.0", "arrow-cast 49.0.0", @@ -6116,7 +6107,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -6233,7 +6224,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -6304,12 +6295,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -6357,9 +6348,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.71" +version = "1.0.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" +checksum = "2de98502f212cfcea8d0bb305bd0f49d7ebdd75b64ba0a68f937d888f4e0d6db" dependencies = [ "unicode-ident", ] @@ -6446,7 +6437,7 @@ dependencies = [ "prost 0.12.3", "prost-types", "regex", - "syn 2.0.42", + "syn 2.0.46", "tempfile", "which", ] @@ -6474,7 +6465,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -6586,9 +6577,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -6786,7 +6777,7 @@ dependencies = [ "quote", "refinery-core", "regex", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -7167,9 +7158,9 @@ dependencies = [ [[package]] name = "rust-embed" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "810294a8a4a0853d4118e3b94bb079905f2107c7fe979d8f0faae98765eb6378" +checksum = "a82c0bbc10308ed323529fd3c1dce8badda635aa319a5ff0e6466f33b8101e3f" dependencies = [ "rust-embed-impl", "rust-embed-utils", @@ -7178,23 +7169,23 @@ dependencies = [ [[package]] name = "rust-embed-impl" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfc144a1273124a67b8c1d7cd19f5695d1878b31569c0512f6086f0f4676604e" +checksum = "6227c01b1783cdfee1bcf844eb44594cd16ec71c35305bf1c9fb5aade2735e16" dependencies = [ "proc-macro2", "quote", "rust-embed-utils", "shellexpand", - "syn 2.0.42", + "syn 2.0.46", "walkdir", ] [[package]] name = "rust-embed-utils" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "816ccd4875431253d6bb54b804bcff4369cbde9bae33defde25fdf6c2ef91d40" +checksum = "8cb0a25bfbb2d4b4402179c2cf030387d9990857ce08a32592c6238db9fa8665" dependencies = [ "sha2 0.10.8", "walkdir", @@ -7365,11 +7356,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -7471,9 +7462,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" dependencies = [ "serde", ] @@ -7492,9 +7483,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.193" +version = "1.0.194" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" dependencies = [ "serde_derive", ] @@ -7511,13 +7502,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.194" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -7533,9 +7524,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "6fbd975230bada99c8bb618e0c365c2eefa219158d5c6c29610fd09ff1833257" dependencies = [ "indexmap 2.1.0", "itoa", @@ -7593,14 +7584,14 @@ checksum = "bb9387330da43020c17237e22c76bd19c93305c75d99ec962c58f385c7e1f5ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] name = "serde_path_to_error" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" dependencies = [ "itoa", "serde", @@ -7608,13 +7599,13 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" +checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -7635,7 +7626,7 @@ dependencies = [ "proc-macro2", "quote", "serde", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -7676,14 +7667,14 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] name = "serde_yaml" -version = "0.9.29" +version = "0.9.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15e0ef66bf939a7c890a0bf6d5a733c70202225f9888a89ed5c62298b019129" +checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" dependencies = [ "indexmap 2.1.0", "itoa", @@ -7933,13 +7924,13 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9c2e1dde0efa87003e7923d94a90f46e3274ad1649f51de96812be561f041f" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.46", ] [[package]] @@ -8003,7 +7994,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8076,9 +8067,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.42" +version = "2.0.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" +checksum = "89456b690ff72fddcecf231caedbe615c59480c93358a93dfae7fc29e3ebbf0e" dependencies = [ "proc-macro2", "quote", @@ -8114,21 +8105,21 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.12.12" +version = "0.12.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" +checksum = "69758bda2e78f098e4ccb393021a0963bb3442eac05f135c30f61b7370bbafae" [[package]] name = "tempfile" -version = "3.8.1" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall 0.4.1", "rustix 0.38.28", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -8168,7 +8159,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8179,7 +8170,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", "test-case-core", ] @@ -8202,22 +8193,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8334,7 +8325,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8565,7 +8556,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8691,7 +8682,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8807,7 +8798,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -8837,7 +8828,7 @@ dependencies = [ "regress", "schemars", "serde_json", - "syn 2.0.42", + "syn 2.0.46", "thiserror", "unicode-ident", ] @@ -8853,7 +8844,7 @@ dependencies = [ "serde", "serde_json", "serde_tokenstream", - "syn 2.0.42", + "syn 2.0.46", "typify-impl", ] @@ -8998,7 +8989,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -9010,7 +9001,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] @@ -9132,7 +9123,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", "wasm-bindgen-shared", ] @@ -9166,7 +9157,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9427,7 +9418,7 @@ dependencies = [ "log", "mach", "memfd", - "memoffset 0.8.0", + "memoffset", "paste", "rand", "rustix 0.37.27", @@ -9574,11 +9565,11 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.51.1" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -9781,9 +9772,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.30" +version = "0.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b5c3db89721d50d0e2a673f5043fc4722f76dcc352d7b1ab8b8288bed4ed2c5" +checksum = "8434aeec7b290e8da5c3f0d628cb0eac6cabcb31d14bb74f779a08109a5914d6" dependencies = [ "memchr", ] @@ -9871,7 +9862,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.46", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 66ef5e994..2bc225fa2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ tonic-reflection = { version = "0.10" } arrow = { version = "49.0.0" } arrow-array = { version = "49.0.0" } arrow-schema = { version = "49.0.0" } +arrow-json = { version = "49.0.0" } object_store = { version = "0.8.0" } parquet = { version = "49.0.0" } @@ -52,4 +53,5 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/par arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'} +arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/json' } object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.8.0/put_part_api'} diff --git a/arroyo-df/src/external.rs b/arroyo-df/src/external.rs index 00085e6db..fbe5c3467 100644 --- a/arroyo-df/src/external.rs +++ b/arroyo-df/src/external.rs @@ -1,3 +1,4 @@ +use arrow_schema::FieldRef; use arroyo_datastream::logical::LogicalNode; use std::time::Duration; @@ -14,7 +15,7 @@ pub enum ProcessingMode { #[derive(Clone, Debug)] pub struct SqlSource { pub id: Option, - pub struct_def: StructDef, + pub struct_def: Vec, pub config: ConnectorOp, pub processing_mode: ProcessingMode, pub idle_time: Option, diff --git a/arroyo-df/src/lib.rs b/arroyo-df/src/lib.rs index 29e8ecc71..38353cef4 100644 --- a/arroyo-df/src/lib.rs +++ b/arroyo-df/src/lib.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, bail, Result}; use arrow::array::ArrayRef; use arrow::datatypes::{self, DataType, Field}; -use arrow_schema::{Schema, TimeUnit}; +use arrow_schema::{FieldRef, Schema, TimeUnit}; use arroyo_connectors::Connection; use arroyo_datastream::WindowType; @@ -399,10 +399,11 @@ pub fn parse_dependencies(definition: &str) -> Result { }; } -fn create_table_source(table_name: String, fields: Vec) -> Arc { - let schema = add_timestamp_field_if_missing_arrow(Arc::new( - datatypes::Schema::new_with_metadata(fields, HashMap::new()), - )); +fn create_table_source(table_name: String, fields: Vec) -> Arc { + let schema = add_timestamp_field_if_missing_arrow(Arc::new(Schema::new_with_metadata( + fields, + HashMap::new(), + ))); let table_provider = LogicalBatchInput { table_name, schema }; let wrapped = Arc::new(table_provider); let provider = DefaultTableSource::new(wrapped); @@ -806,7 +807,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor { .fields() .iter() .map(|field| { - Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + Arc::new(Field::new( + field.name(), + field.data_type().clone(), + field.is_nullable(), + )) }) .collect(), ); @@ -887,11 +892,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor { .fields() .iter() .map(|field| { - Field::new( + Arc::new(Field::new( field.name(), field.data_type().clone(), field.is_nullable(), - ) + )) }) .collect(), ), @@ -1027,6 +1032,10 @@ pub async fn parse_and_get_arrow_program( }; } + if inserts.is_empty() { + bail!("The provided SQL does not contain a query"); + } + let mut rewriter = QueryToGraphVisitor::default(); for insert in inserts { let plan = match insert { diff --git a/arroyo-df/src/tables.rs b/arroyo-df/src/tables.rs index 69cb1b111..04f7568cb 100644 --- a/arroyo-df/src/tables.rs +++ b/arroyo-df/src/tables.rs @@ -1,14 +1,16 @@ use std::str::FromStr; +use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use anyhow::{anyhow, bail, Result}; -use arrow_schema::{DataType, Field}; +use arrow_schema::{DataType, Field, FieldRef}; use arroyo_connectors::{connector_for_type, Connection}; use arroyo_datastream::ConnectorOp; use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField, }; use arroyo_rpc::formats::{BadData, Format, Framing}; +use arroyo_types::ArroyoExtensionType; use datafusion::sql::sqlparser::ast::Query; use datafusion::{ optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext}, @@ -53,11 +55,8 @@ pub struct ConnectorTable { #[derive(Debug, Clone)] pub enum FieldSpec { - StructField(StructField), - VirtualField { - field: StructField, - expression: Expr, - }, + StructField(Field), + VirtualField { field: Field, expression: Expr }, } impl FieldSpec { @@ -67,7 +66,7 @@ impl FieldSpec { FieldSpec::VirtualField { .. } => true, } } - fn struct_field(&self) -> &StructField { + fn struct_field(&self) -> &Field { match self { FieldSpec::StructField(f) => f, FieldSpec::VirtualField { field, .. } => field, @@ -75,8 +74,8 @@ impl FieldSpec { } } -impl From for FieldSpec { - fn from(value: StructField) -> Self { +impl From for FieldSpec { + fn from(value: Field) -> Self { FieldSpec::StructField(value) } } @@ -136,7 +135,8 @@ impl From for ConnectorTable { .iter() .map(|f| { let struct_field: StructField = f.clone().into(); - struct_field.into() + let field: Field = struct_field.into(); + field.into() }) .collect(), type_name: schema_type(&value.name, &value.schema), @@ -165,14 +165,11 @@ impl ConnectorTable { fields = fields .into_iter() .map(|field_spec| match &field_spec { - FieldSpec::StructField(struct_field) => match struct_field.data_type { - TypeDef::DataType(DataType::Timestamp(_, None), nullable) => { - let mut coerced = struct_field.clone(); - coerced.data_type = TypeDef::DataType( + FieldSpec::StructField(struct_field) => match struct_field.data_type() { + DataType::Timestamp(_, None) => { + FieldSpec::StructField(struct_field.clone().with_data_type( DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None), - nullable, - ); - FieldSpec::StructField(coerced) + )) } _ => field_spec, }, @@ -197,8 +194,8 @@ impl ConnectorTable { struct_field.clone().try_into().map_err(|_| { anyhow!( "field '{}' has a type '{:?}' that cannot be used in a connection table", - struct_field.name, - struct_field.data_type + struct_field.name(), + struct_field.data_type() ) }) }) @@ -277,11 +274,8 @@ impl ConnectorTable { .fields .iter() .find(|f| { - f.struct_field().name == *field_name - && matches!( - f.struct_field().data_type, - TypeDef::DataType(DataType::Timestamp(..), _) - ) + f.struct_field().name() == field_name + && matches!(f.struct_field().data_type(), DataType::Timestamp(..)) }) .ok_or_else(|| { anyhow!( @@ -290,8 +284,7 @@ impl ConnectorTable { ) })?; - Ok(Some(Expr::Column(Column::new( - field.struct_field().alias.as_ref().cloned(), + Ok(Some(Expr::Column(Column::from_name( field.struct_field().name(), )))) } else { @@ -306,11 +299,8 @@ impl ConnectorTable { .fields .iter() .find(|f| { - f.struct_field().name == *field_name - && matches!( - f.struct_field().data_type, - TypeDef::DataType(DataType::Timestamp(..), _) - ) + f.struct_field().name() == field_name + && matches!(f.struct_field().data_type(), DataType::Timestamp(..)) }) .ok_or_else(|| { anyhow!( @@ -319,8 +309,7 @@ impl ConnectorTable { ) })?; - Ok(Some(Expr::Column(Column::new( - field.struct_field().alias.as_ref().cloned(), + Ok(Some(Expr::Column(Column::from_name( field.struct_field().name(), )))) } else { @@ -362,18 +351,14 @@ impl ConnectorTable { let source = SqlSource { id: self.id, - struct_def: StructDef::new( - self.type_name.clone(), - self.type_name.is_none(), - self.fields - .iter() - .filter_map(|field| match field { - FieldSpec::StructField(struct_field) => Some(struct_field.clone()), - FieldSpec::VirtualField { .. } => None, - }) - .collect(), - self.format.clone(), - ), + struct_def: self + .fields + .iter() + .filter_map(|field| match field { + FieldSpec::StructField(struct_field) => Some(Arc::new(struct_field.clone())), + FieldSpec::VirtualField { .. } => None, + }) + .collect(), config: self.connector_op(), processing_mode: self.processing_mode(), idle_time: self.idle_time, @@ -469,7 +454,7 @@ pub enum Table { ConnectorTable(ConnectorTable), MemoryTable { name: String, - fields: Vec, + fields: Vec, }, TableFromQuery { name: String, @@ -499,14 +484,16 @@ impl Table { .iter() .map(|column| { let name = column.name.value.to_string(); - let data_type = convert_data_type(&column.data_type)?; + let (data_type, extension) = convert_data_type(&column.data_type)?; let nullable = !column .options .iter() .any(|option| matches!(option.option, ColumnOption::NotNull)); - let struct_field = - StructField::new(name, None, TypeDef::DataType(data_type, nullable)); + let struct_field = ArroyoExtensionType::add_metadata( + extension, + Field::new(name, data_type, nullable), + ); let generating_expression = column.options.iter().find_map(|option| { if let ColumnOption::Generated { @@ -522,27 +509,25 @@ impl Table { }) .collect::>>()?; - let physical_struct: StructDef = StructDef::for_fields( - struct_field_pairs - .iter() - .filter_map( - |(field, generating_expression)| match generating_expression { - Some(_) => None, - None => Some(field.clone()), - }, - ) - .collect(), - ); + let physical_fields: Vec<_> = struct_field_pairs + .iter() + .filter_map( + |(field, generating_expression)| match generating_expression { + Some(_) => None, + None => Some(field.clone()), + }, + ) + .collect(); let _physical_schema = DFSchema::new_with_metadata( - physical_struct - .fields + physical_fields .iter() .map(|f| { - let TypeDef::DataType(data_type, nullable) = f.data_type.clone() else { - bail!("expect data type for generated column") - }; - Ok(DFField::new_unqualified(&f.name, data_type, nullable)) + Ok(DFField::new_unqualified( + &f.name(), + f.data_type().clone(), + f.is_nullable(), + )) }) .collect::>>()?, HashMap::new(), @@ -615,7 +600,7 @@ impl Table { name, fields: fields .into_iter() - .map(|f| f.struct_field().clone()) + .map(|f| Arc::new(f.struct_field().clone())) .collect(), })) } @@ -701,18 +686,16 @@ impl Table { Ok(()) } - pub fn get_fields(&self) -> Vec { + pub fn get_fields(&self) -> Vec { match self { - Table::MemoryTable { fields, .. } => { - fields.iter().map(|field| field.clone().into()).collect() - } + Table::MemoryTable { fields, .. } => fields.clone(), Table::ConnectorTable(ConnectorTable { fields, inferred_fields, .. }) => inferred_fields .as_ref() - .map(|fs| fs.iter().map(qualified_field).collect()) + .map(|fs| fs.iter().map(qualified_field).map(Arc::new).collect()) .unwrap_or_else(|| { fields .iter() @@ -724,6 +707,7 @@ impl Table { .fields() .iter() .map(qualified_field) + .map(Arc::new) .collect(), } } diff --git a/arroyo-df/src/types.rs b/arroyo-df/src/types.rs index a31e09a9c..775e4dd68 100644 --- a/arroyo-df/src/types.rs +++ b/arroyo-df/src/types.rs @@ -25,6 +25,7 @@ use crate::avro; use arroyo_rpc::api_types::connections::{ FieldType, PrimitiveType, SourceField, SourceFieldType, StructType, }; +use arroyo_types::ArroyoExtensionType; use datafusion_common::{DFField, DFSchemaRef, ScalarValue}; use proc_macro2::{Ident, TokenStream}; use quote::{format_ident, quote}; @@ -1624,15 +1625,21 @@ pub(crate) fn data_type_as_syn_type(data_type: &DataType) -> syn::Type { // Pulled from DataFusion -pub(crate) fn convert_data_type(sql_type: &SQLDataType) -> Result { +pub(crate) fn convert_data_type( + sql_type: &SQLDataType, +) -> Result<(DataType, Option)> { match sql_type { SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) | SQLDataType::Array(ArrayElemTypeDef::SquareBracket(inner_sql_type)) => { - let data_type = convert_simple_data_type(inner_sql_type)?; - - Ok(DataType::List(Arc::new(Field::new( - "field", data_type, true, - )))) + let (data_type, extension) = convert_simple_data_type(inner_sql_type)?; + + Ok(( + DataType::List(Arc::new(ArroyoExtensionType::add_metadata( + extension, + Field::new("field", data_type, true), + ))), + None, + )) } SQLDataType::Array(ArrayElemTypeDef::None) => { bail!("Arrays with unspecified type is not supported") @@ -1641,8 +1648,14 @@ pub(crate) fn convert_data_type(sql_type: &SQLDataType) -> Result { } } -fn convert_simple_data_type(sql_type: &SQLDataType) -> Result { - match sql_type { +fn convert_simple_data_type( + sql_type: &SQLDataType, +) -> Result<(DataType, Option)> { + if matches!(sql_type, SQLDataType::JSON) { + return Ok((DataType::Utf8, Some(ArroyoExtensionType::JSON))); + } + + let dt = match sql_type { SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean), SQLDataType::TinyInt(_) => Ok(DataType::Int8), SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16), @@ -1700,7 +1713,9 @@ fn convert_simple_data_type(sql_type: &SQLDataType) -> Result { // adds/changes the `SQLDataType` the compiler will tell us on upgrade // and avoid bugs like https://github.com/apache/arrow-datafusion/issues/3059 _ => bail!("Unsupported SQL type {sql_type:?}"), - } + }; + + Ok((dt?, None)) } /// Returns a validated `DataType` for the specified precision and diff --git a/arroyo-formats/Cargo.toml b/arroyo-formats/Cargo.toml index 72df4b2c5..cbeec11de 100644 --- a/arroyo-formats/Cargo.toml +++ b/arroyo-formats/Cargo.toml @@ -13,7 +13,9 @@ serde = {version = "1.0", features = ["derive"]} serde_json = "1.0" utoipa = "3" arrow = { workspace = true } +arrow-schema = { workspace = true } arrow-array = { workspace = true} +arrow-json = { workspace = true } tokio = { version = "1", features = ["full"] } tracing = "0.1" anyhow = "1" diff --git a/arroyo-formats/src/json.rs b/arroyo-formats/src/json.rs index 47bcfd511..94699795f 100644 --- a/arroyo-formats/src/json.rs +++ b/arroyo-formats/src/json.rs @@ -1,13 +1,15 @@ -use arrow::datatypes::{Field, Fields}; +use arrow::datatypes::{Field, Fields, SchemaRef}; +use arrow_array::builder::{ArrayBuilder, StringBuilder}; use arroyo_rpc::formats::JsonFormat; -use serde::de::DeserializeOwned; use serde_json::{json, Value}; use std::collections::HashMap; -pub fn deserialize_slice_json( +pub fn deserialize_slice_json( + schema: &SchemaRef, + buffer: &mut Vec>, format: &JsonFormat, msg: &[u8], -) -> Result { +) -> Result<(), String> { let msg = if format.confluent_schema_registry { &msg[5..] } else { @@ -15,6 +17,14 @@ pub fn deserialize_slice_json( }; if format.unstructured { + let (idx, _) = schema + .column_with_name("value") + .expect("no 'value' column for RawString format"); + let array = buffer[idx] + .as_any_mut() + .downcast_mut::() + .expect("'value' column has incorrect type"); + let j = if format.include_schema { // we need to deserialize it to pull out the payload let v: Value = serde_json::from_slice(&msg) @@ -23,23 +33,18 @@ pub fn deserialize_slice_json( "`include_schema` set to true, but record does not have a payload field".to_string() })?; - json! { - { "value": serde_json::to_string(payload).unwrap() } - } + array.append_value(serde_json::to_string(payload).unwrap()); } else { - json! { - { "value": String::from_utf8_lossy(msg) } - } + array.append_value( + String::from_utf8(msg.to_vec()).map_err(|_| "data is not valid UTF-8")?, + ); }; - - // TODO: this is inefficient, because we know that T is RawJson in this case and can much more directly - // produce that value. However, without specialization I don't know how to get the compiler to emit - // the optimized code for that case. - Ok(serde_json::from_value(j).unwrap()) } else { serde_json::from_slice(msg) - .map_err(|e| format!("Failed to deserialize JSON into schema: {:?}", e)) + .map_err(|e| format!("Failed to deserialize JSON into schema: {:?}", e))?; } + + Ok(()) } #[derive(Debug)] diff --git a/arroyo-formats/src/lib.rs b/arroyo-formats/src/lib.rs index 70c4155d1..36cce4a41 100644 --- a/arroyo-formats/src/lib.rs +++ b/arroyo-formats/src/lib.rs @@ -1,21 +1,22 @@ extern crate core; use anyhow::bail; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field}; use arrow_array::builder::{ArrayBuilder, StringBuilder, TimestampNanosecondBuilder}; use arrow_array::cast::AsArray; use arrow_array::{Array, RecordBatch, StringArray}; -use arroyo_rpc::formats::{AvroFormat, Format, Framing, FramingMethod}; +use arrow_schema::Schema; +use arroyo_rpc::formats::{AvroFormat, BadData, Format, Framing, FramingMethod, JsonFormat}; use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver}; -use arroyo_rpc::ArroyoSchema; -use arroyo_types::{to_nanos, Data, Debezium, RawJson, SourceError}; +use arroyo_rpc::{ArroyoSchema, TIMESTAMP_FIELD}; +use arroyo_types::{should_flush, to_nanos, Data, Debezium, RawJson, SourceError}; use serde::de::DeserializeOwned; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; -use std::time::SystemTime; +use std::time::{Instant, SystemTime}; use tokio::sync::Mutex; pub mod avro; @@ -194,25 +195,6 @@ impl SchemaData for () { } } -// A custom deserializer for json, that takes a json::Value and reserializes it as a string -// where it can then be accessed using SQL JSON functions -- this is currently a bit inefficient -// since we need an owned string. -pub fn deserialize_raw_json<'de, D>(f: D) -> Result -where - D: Deserializer<'de>, -{ - let raw: Box = Box::deserialize(f)?; - Ok(raw.to_string()) -} - -pub fn deserialize_raw_json_opt<'de, D>(f: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let raw: Box = Box::deserialize(f)?; - Ok(Some(raw.to_string())) -} - pub struct FramingIterator<'a> { framing: Option>, buf: &'a [u8], @@ -264,17 +246,25 @@ impl<'a> Iterator for FramingIterator<'a> { } } -#[derive(Clone)] pub struct ArrowDeserializer { format: Arc, framing: Option>, schema: ArroyoSchema, + bad_data: BadData, + json_decoder: Option<(arrow::json::reader::Decoder, TimestampNanosecondBuilder)>, + buffered_count: usize, + buffered_since: Instant, schema_registry: Arc>>, schema_resolver: Arc, } impl ArrowDeserializer { - pub fn new(format: Format, schema: ArroyoSchema, framing: Option) -> Self { + pub fn new( + format: Format, + schema: ArroyoSchema, + framing: Option, + bad_data: BadData, + ) -> Self { let resolver = if let Format::Avro(AvroFormat { reader_schema: Some(schema), .. @@ -286,21 +276,37 @@ impl ArrowDeserializer { Arc::new(FailingSchemaResolver::new()) as Arc }; - Self::with_schema_resolver(format, framing, schema, resolver) + Self::with_schema_resolver(format, framing, schema, bad_data, resolver) } pub fn with_schema_resolver( format: Format, framing: Option, schema: ArroyoSchema, + bad_data: BadData, schema_resolver: Arc, ) -> Self { Self { + json_decoder: matches!(format, Format::Json(..)).then(|| { + // exclude the timestamp field + ( + arrow_json::reader::ReaderBuilder::new(Arc::new( + schema.schema_without_timestamp(), + )) + .with_strict_mode(false) + .build_decoder() + .unwrap(), + TimestampNanosecondBuilder::new(), + ) + }), format: Arc::new(format), framing: framing.map(|f| Arc::new(f)), schema, schema_registry: Arc::new(Mutex::new(HashMap::new())), + bad_data, schema_resolver, + buffered_count: 0, + buffered_since: Instant::now(), } } @@ -335,33 +341,65 @@ impl ArrowDeserializer { } } + pub fn should_flush(&self) -> bool { + should_flush(self.buffered_count, self.buffered_since) + } + + pub fn flush_buffer(&mut self) -> Option> { + let (decoder, timestamp) = self.json_decoder.as_mut()?; + self.buffered_since = Instant::now(); + self.buffered_count = 0; + Some( + decoder + .flush() + .map_err(|e| SourceError::bad_data(format!("JSON does not match schema: {:?}", e))) + .transpose()? + .map(|batch| { + let mut columns = batch.columns().to_vec(); + columns.insert(self.schema.timestamp_index, Arc::new(timestamp.finish())); + RecordBatch::try_new(self.schema.schema.clone(), columns).unwrap() + }), + ) + } + fn deserialize_single( &mut self, buffer: &mut Vec>, msg: &[u8], timestamp: SystemTime, ) -> Result<(), SourceError> { - let result = match &*self.format { + match &*self.format { + Format::RawString(_) + | Format::Json(JsonFormat { + unstructured: true, .. + }) => self.deserialize_raw_string(buffer, msg), Format::Json(json) => { - todo!("json") - //json::deserialize_slice_json(json, msg) + let msg = if json.confluent_schema_registry { + &msg[5..] + } else { + msg + }; + + let Some((decoder, timestamp_builder)) = &mut self.json_decoder else { + panic!("json decoder not initialized"); + }; + + decoder + .decode(msg) + .map_err(|e| SourceError::bad_data(format!("invalid JSON: {:?}", e)))?; + timestamp_builder.append_value(to_nanos(timestamp) as i64); + self.buffered_count += 1; } - Format::Avro(_) => unreachable!("avro should be handled by here"), + Format::Avro(_) => unreachable!("this should not be called for avro"), Format::Parquet(_) => todo!("parquet is not supported as an input format"), - Format::RawString(_) => self.deserialize_raw_string(buffer, msg), } - .map_err(|e: String| SourceError::bad_data(format!("Failed to deserialize: {:?}", e)))?; self.add_timestamp(buffer, timestamp); Ok(()) } - fn deserialize_raw_string( - &mut self, - buffer: &mut Vec>, - msg: &[u8], - ) -> Result<(), String> { + fn deserialize_raw_string(&mut self, buffer: &mut Vec>, msg: &[u8]) { let (col, _) = self .schema .schema @@ -372,8 +410,6 @@ impl ArrowDeserializer { .downcast_mut::() .expect("'value' column has incorrect type") .append_value(String::from_utf8_lossy(msg)); - - Ok(()) } fn add_timestamp(&mut self, buffer: &mut Vec>, timestamp: SystemTime) { @@ -383,6 +419,10 @@ impl ArrowDeserializer { .expect("_timestamp column has incorrect type") .append_value(to_nanos(timestamp) as i64); } + + pub fn bad_data(&self) -> &BadData { + &self.bad_data + } } pub struct DataSerializer { diff --git a/arroyo-formats/src/old.rs b/arroyo-formats/src/old.rs index 322bc797f..c046f5ebd 100644 --- a/arroyo-formats/src/old.rs +++ b/arroyo-formats/src/old.rs @@ -92,7 +92,7 @@ impl DataDeserializer { pub fn deserialize_single(&self, msg: &[u8]) -> Result { match &*self.format { - Format::Json(json) => json::deserialize_slice_json(json, msg), + Format::Json(json) => unimplemented!(), Format::Avro(_) => unreachable!("avro should be handled by here"), Format::Parquet(_) => todo!("parquet is not supported as an input format"), Format::RawString(_) => deserialize_raw_string(msg), diff --git a/arroyo-rpc/src/api_types/connections.rs b/arroyo-rpc/src/api_types/connections.rs index c589613e3..3f89d16f8 100644 --- a/arroyo-rpc/src/api_types/connections.rs +++ b/arroyo-rpc/src/api_types/connections.rs @@ -1,5 +1,7 @@ use crate::formats::{BadData, Format, Framing}; +use crate::primitive_to_sql; use anyhow::bail; +use arrow_schema::{DataType, Field, TimeUnit}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; @@ -68,7 +70,7 @@ impl TryFrom for ConnectionType { } } -#[derive(Serialize, Deserialize, Clone, Debug, ToSchema, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, ToSchema, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum PrimitiveType { Int32, @@ -116,6 +118,63 @@ pub struct SourceField { pub nullable: bool, } +impl TryFrom for SourceField { + type Error = String; + + fn try_from(f: Field) -> Result { + let field_type = match f.data_type() { + DataType::Boolean => FieldType::Primitive(PrimitiveType::Bool), + DataType::Int32 => FieldType::Primitive(PrimitiveType::Int32), + DataType::Int64 => FieldType::Primitive(PrimitiveType::Int64), + DataType::UInt32 => FieldType::Primitive(PrimitiveType::UInt32), + DataType::UInt64 => FieldType::Primitive(PrimitiveType::UInt64), + DataType::Float32 => FieldType::Primitive(PrimitiveType::F32), + DataType::Float64 => FieldType::Primitive(PrimitiveType::F64), + DataType::Binary | DataType::LargeBinary => FieldType::Primitive(PrimitiveType::Bytes), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + FieldType::Primitive(PrimitiveType::UnixMillis) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + FieldType::Primitive(PrimitiveType::UnixMicros) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + FieldType::Primitive(PrimitiveType::UnixNanos) + } + DataType::Utf8 => FieldType::Primitive(PrimitiveType::String), + DataType::Struct(fields) => { + let fields: Result<_, String> = fields + .into_iter() + .map(|f| (**f).clone().try_into()) + .collect(); + + let st = StructType { + name: None, + fields: fields?, + }; + + FieldType::Struct(st) + } + dt => { + return Err(format!("Unsupported data type {:?}", dt)); + } + }; + + let sql_name = match &field_type { + FieldType::Primitive(pt) => Some(primitive_to_sql(*pt).to_string()), + _ => None, + }; + + Ok(SourceField { + field_name: f.name().clone(), + field_type: SourceFieldType { + r#type: field_type, + sql_name, + }, + nullable: f.is_nullable(), + }) + } +} + #[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] #[serde(rename_all = "snake_case")] pub enum SchemaDefinition { diff --git a/arroyo-rpc/src/formats.rs b/arroyo-rpc/src/formats.rs index 6e88c73c2..b6e2cc0a6 100644 --- a/arroyo-rpc/src/formats.rs +++ b/arroyo-rpc/src/formats.rs @@ -267,6 +267,12 @@ pub enum BadData { Drop {}, } +impl Default for BadData { + fn default() -> Self { + BadData::Fail {} + } +} + impl BadData { pub fn from_opts(opts: &mut HashMap) -> Result, String> { let Some(method) = opts.remove("bad_data") else { diff --git a/arroyo-rpc/src/lib.rs b/arroyo-rpc/src/lib.rs index d52569d6d..2787104a8 100644 --- a/arroyo-rpc/src/lib.rs +++ b/arroyo-rpc/src/lib.rs @@ -224,4 +224,10 @@ impl ArroyoSchema { key_indices, }) } + + pub fn schema_without_timestamp(&self) -> Schema { + let mut schema = (*self.schema).clone(); + schema.remove(self.timestamp_index); + schema + } } diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index 3c33a0b38..8d86db937 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -13,8 +13,8 @@ use std::fmt::Debug; use std::hash::Hash; use std::ops::{Range, RangeInclusive}; use std::str::FromStr; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::sync::{Arc, OnceLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[derive(Copy, Hash, Debug, Clone, Eq, PartialEq, Encode, Decode, PartialOrd, Ord, Deserialize)] pub struct Window { @@ -1096,6 +1096,37 @@ impl TryFrom<&str> for DatePart { } } +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum ArroyoExtensionType { + JSON, +} + +impl ArroyoExtensionType { + pub fn from_map(map: &HashMap) -> Option { + match map.get("ARROW:extension:name")?.as_str() { + "arroyo.json" => Some(Self::JSON), + _ => None, + } + } + + pub fn add_metadata(v: Option, field: Field) -> Field { + if let Some(v) = v { + let mut m = HashMap::new(); + match v { + ArroyoExtensionType::JSON => { + m.insert( + "ARROW:extension:name".to_string(), + "arroyo.json".to_string(), + ); + } + } + field.with_metadata(m) + } else { + field + } + } +} + impl TryFrom<&str> for DateTruncPrecision { type Error = String; @@ -1132,6 +1163,18 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } +pub fn should_flush(size: usize, time: Instant) -> bool { + static FLUSH_SIZE: OnceLock = OnceLock::new(); + let flush_size = + FLUSH_SIZE.get_or_init(|| u32_config(BATCH_SIZE_ENV, DEFAULT_BATCH_SIZE as u32) as usize); + + static FLUSH_LINGER: OnceLock = OnceLock::new(); + let flush_linger = + FLUSH_LINGER.get_or_init(|| duration_millis_config(BATCH_LINGER_MS_ENV, DEFAULT_LINGER)); + + size > 0 && (size > *flush_size || time.elapsed() >= *flush_linger) +} + #[cfg(test)] mod tests { use super::*; diff --git a/arroyo-worker/Cargo.toml b/arroyo-worker/Cargo.toml index fd1328979..c4c8570ff 100644 --- a/arroyo-worker/Cargo.toml +++ b/arroyo-worker/Cargo.toml @@ -46,10 +46,13 @@ hex = "0.4" url = "2.4.0" ordered-float = "3" deltalake = {version = "=0.16.4", features = ["s3", "arrow"] } + arrow = { workspace = true } arrow-schema = {workspace = true, features = ["serde"]} parquet = { workspace = true, features = ["async"]} arrow-array = { workspace = true} +arrow-json = { workspace = true } + aws-sdk-kinesis = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] } aws-config = { version = "0.51", default-features = false, features = ["rt-tokio", "native-tls"] } uuid = {version = "1.4.1", features = ["v4"]} @@ -87,7 +90,6 @@ datafusion-expr = "34.0.0" datafusion-physical-expr = "34.0" datafusion-common = "34.0" datafusion-execution = "34.0" -arrow-json = "49.0.0" base64 = "0.21.5" [dev-dependencies] diff --git a/arroyo-worker/src/connectors/kafka/source/mod.rs b/arroyo-worker/src/connectors/kafka/source/mod.rs index f271d5df2..fe430e63e 100644 --- a/arroyo-worker/src/connectors/kafka/source/mod.rs +++ b/arroyo-worker/src/connectors/kafka/source/mod.rs @@ -1,7 +1,6 @@ use crate::engine::ArrowContext; use crate::operator::{ArrowOperatorConstructor, BaseOperator}; use crate::SourceFinishType; -use arroyo_formats::ArrowDeserializer; use arroyo_rpc::formats::{BadData, Format, Framing}; use arroyo_rpc::grpc::api::ConnectorOp; use arroyo_rpc::grpc::{api, TableDescriptor}; @@ -171,13 +170,10 @@ impl KafkaSourceFunc { .await; } - let mut deserializer = ArrowDeserializer::with_schema_resolver( + ctx.initialize_deserializer_with_resolver( self.format.clone(), self.framing.clone(), - ctx.out_schema - .as_ref() - .expect("kafka source must have an out schema") - .clone(), + self.bad_data.clone(), self.schema_resolver.clone(), ); @@ -194,11 +190,10 @@ impl KafkaSourceFunc { .ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record", "The message read from Kafka did not contain a message timestamp"))?; - let errors = deserializer.deserialize_slice(ctx.buffer(), &v, from_millis(timestamp as u64)).await; - ctx.collect_source_errors(errors, &self.bad_data).await?; + ctx.deserialize_slice(&v, from_millis(timestamp as u64)).await?; if ctx.should_flush() { - ctx.flush_buffer().await; + ctx.flush_buffer().await?; } offsets.insert(msg.partition(), msg.offset()); @@ -212,7 +207,7 @@ impl KafkaSourceFunc { } _ = flush_ticker.tick() => { if ctx.should_flush() { - ctx.flush_buffer().await; + ctx.flush_buffer().await?; } } control_message = ctx.control_rx.recv() => { diff --git a/arroyo-worker/src/connectors/sse.rs b/arroyo-worker/src/connectors/sse.rs index 21be663b6..26e0d49ba 100644 --- a/arroyo-worker/src/connectors/sse.rs +++ b/arroyo-worker/src/connectors/sse.rs @@ -1,7 +1,6 @@ use crate::engine::ArrowContext; use crate::operator::{ArrowOperatorConstructor, BaseOperator}; use crate::SourceFinishType; -use arroyo_formats::ArrowDeserializer; use arroyo_rpc::formats::{BadData, Format, Framing}; use arroyo_rpc::grpc::{api, StopMode, TableDescriptor}; use arroyo_rpc::{var_str::VarStr, ControlMessage, ControlResp, OperatorConfig}; @@ -144,6 +143,12 @@ impl SSESourceFunc { } async fn run_int(&mut self, ctx: &mut ArrowContext) -> Result { + ctx.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + ); + let mut client = eventsource_client::ClientBuilder::for_url(&self.url).unwrap(); if let Some(id) = &self.state.last_id { @@ -157,15 +162,6 @@ impl SSESourceFunc { let mut stream = client.build().stream(); let events: HashSet<_> = self.events.iter().cloned().collect(); - let mut deserializer = ArrowDeserializer::new( - self.format.clone(), - ctx.out_schema - .as_ref() - .expect("source must have an out schema") - .clone(), - self.framing.clone(), - ); - let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -183,12 +179,11 @@ impl SSESourceFunc { } if events.is_empty() || events.contains(&event.event_type) { - let errors = deserializer.deserialize_slice(ctx.buffer(), - &event.data.as_bytes(), SystemTime::now()).await; - ctx.collect_source_errors(errors, &self.bad_data).await?; + ctx.deserialize_slice( + &event.data.as_bytes(), SystemTime::now()).await?; if ctx.should_flush() { - ctx.flush_buffer().await; + ctx.flush_buffer().await?; } } } @@ -220,7 +215,7 @@ impl SSESourceFunc { } _ = flush_ticker.tick() => { if ctx.should_flush() { - ctx.flush_buffer().await; + ctx.flush_buffer().await?; } } } diff --git a/arroyo-worker/src/engine.rs b/arroyo-worker/src/engine.rs index 701a29749..f20c23da0 100644 --- a/arroyo-worker/src/engine.rs +++ b/arroyo-worker/src/engine.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::{Debug, Formatter}; +use std::io::Write; use std::{mem, thread}; use std::sync::{Arc, OnceLock}; @@ -29,16 +30,18 @@ use crate::{RateLimiter, METRICS_PUSH_INTERVAL, PROMETHEUS_PUSH_GATEWAY, TIMER_T use arroyo_datastream::logical::{ LogicalEdge, LogicalEdgeType, LogicalGraph, LogicalNode, OperatorName, }; +use arroyo_formats::ArrowDeserializer; pub use arroyo_macro::StreamNode; -use arroyo_rpc::formats::BadData; +use arroyo_rpc::formats::{BadData, Format, Framing}; use arroyo_rpc::grpc::{ api, CheckpointMetadata, TableDeleteBehavior, TableDescriptor, TableType, TableWriteBehavior, TaskAssignment, TaskCheckpointEventType, }; +use arroyo_rpc::schema_resolver::SchemaResolver; use arroyo_rpc::{CompactionResult, ControlMessage, ControlResp}; use arroyo_state::{BackingStore, StateBackend, StateStore}; use arroyo_types::{ - duration_millis_config, from_micros, range_for_server, u32_config, ArrowMessage, + duration_millis_config, from_micros, range_for_server, should_flush, u32_config, ArrowMessage, CheckpointBarrier, Data, Key, SourceError, TaskInfo, UserError, Watermark, WorkerId, BATCH_LINGER_MS_ENV, BATCH_SIZE_ENV, DEFAULT_BATCH_SIZE, DEFAULT_LINGER, HASH_SEEDS, }; @@ -138,15 +141,7 @@ impl ContextBuffer { } pub fn should_flush(&self) -> bool { - static FLUSH_SIZE: OnceLock = OnceLock::new(); - let flush_size = FLUSH_SIZE - .get_or_init(|| u32_config(BATCH_SIZE_ENV, DEFAULT_BATCH_SIZE as u32) as usize); - - static FLUSH_LINGER: OnceLock = OnceLock::new(); - let flush_linger = FLUSH_LINGER - .get_or_init(|| duration_millis_config(BATCH_LINGER_MS_ENV, DEFAULT_LINGER)); - - self.size() > 0 && (self.size() > *flush_size || self.created.elapsed() >= *flush_linger) + should_flush(self.size(), self.created) } pub fn finish(self) -> RecordBatch { @@ -169,7 +164,9 @@ pub struct ArrowContext { pub out_schema: Option, pub collector: ArrowCollector, buffer: Option, + buffered_error: Option, error_rate_limiter: RateLimiter, + deserializer: Option, } #[derive(Clone)] @@ -397,6 +394,8 @@ impl ArrowContext { state, buffer: out_schema.map(|t| ContextBuffer::new(t.schema)), error_rate_limiter: RateLimiter::new(), + deserializer: None, + buffered_error: None, } } @@ -460,20 +459,37 @@ impl ArrowContext { todo!("timer") } - pub async fn flush_buffer(&mut self) { - let Some(buffer) = self.buffer.take() else { - return; - }; + pub async fn flush_buffer(&mut self) -> Result<(), UserError> { + if self.buffer.is_none() { + return Ok(()); + } - if buffer.size() == 0 { - self.buffer = Some(buffer); - return; + if self.buffer.as_ref().unwrap().size() > 0 { + let buffer = self.buffer.take().unwrap(); + self.collector.collect(buffer.finish()).await; + self.buffer = Some(ContextBuffer::new( + self.out_schema.as_ref().map(|t| t.schema.clone()).unwrap(), + )); } - self.collector.collect(buffer.finish()).await; - self.buffer = Some(ContextBuffer::new( - self.out_schema.as_ref().map(|t| t.schema.clone()).unwrap(), - )); + if let Some(deserializer) = self.deserializer.as_mut() { + if let Some(buffer) = deserializer.flush_buffer() { + match buffer { + Ok(batch) => { + self.collector.collect(batch).await; + } + Err(e) => { + self.collect_source_errors(vec![e]).await?; + } + } + } + } + + if let Some(error) = self.buffered_error.take() { + return Err(error); + } + + Ok(()) } pub async fn collect(&mut self, record: RecordBatch) { @@ -485,6 +501,11 @@ impl ArrowContext { .as_ref() .map(|b| b.should_flush()) .unwrap_or(false) + || self + .deserializer + .as_ref() + .map(|d| d.should_flush()) + .unwrap_or(false) } pub fn buffer(&mut self) -> &mut Vec> { @@ -495,7 +516,9 @@ impl ArrowContext { } pub async fn broadcast(&mut self, message: ArrowMessage) { - self.flush_buffer().await; + if let Err(e) = self.flush_buffer().await { + self.buffered_error.replace(e); + } self.collector.broadcast(message).await; } @@ -527,7 +550,7 @@ impl ArrowContext { checkpoint_epoch: barrier.epoch, operator_id: self.task_info.operator_id.clone(), subtask_index: self.task_info.task_index as u32, - time: std::time::SystemTime::now(), + time: SystemTime::now(), event_type, })) .await @@ -538,17 +561,73 @@ impl ArrowContext { self.state.load_compacted(compaction).await; } - /// Handling errors and rate limiting error reporting. - /// Considers the `bad_data` option to determine whether to drop or fail on bad data. - pub async fn collect_source_errors( + pub fn initialize_deserializer( &mut self, - errors: Vec, - bad_data: &Option, + format: Format, + framing: Option, + bad_data: Option, + ) { + if self.deserializer.is_some() { + panic!("Deserialize already initialized"); + } + + self.deserializer = Some(ArrowDeserializer::new( + format, + self.out_schema.as_ref().expect("no out schema").clone(), + framing, + bad_data.unwrap_or_default(), + )); + } + + pub fn initialize_deserializer_with_resolver( + &mut self, + format: Format, + framing: Option, + bad_data: Option, + schema_resolver: Arc, + ) { + self.deserializer = Some(ArrowDeserializer::with_schema_resolver( + format, + framing, + self.out_schema.as_ref().expect("no out schema").clone(), + bad_data.unwrap_or_default(), + schema_resolver, + )); + } + + pub async fn deserialize_slice( + &mut self, + msg: &[u8], + time: SystemTime, ) -> Result<(), UserError> { + let deserializer = self + .deserializer + .as_mut() + .expect("deserializer not initialized!"); + let errors = deserializer + .deserialize_slice( + &mut self.buffer.as_mut().expect("no out schema").buffer, + msg, + time, + ) + .await; + self.collect_source_errors(errors).await?; + + Ok(()) + } + + /// Handling errors and rate limiting error reporting. + /// Considers the `bad_data` option to determine whether to drop or fail on bad data. + async fn collect_source_errors(&mut self, errors: Vec) -> Result<(), UserError> { + let bad_data = self + .deserializer + .as_ref() + .expect("deserializer not initialized") + .bad_data(); for error in errors { match error { SourceError::BadData { details } => match bad_data { - Some(BadData::Drop {}) => { + BadData::Drop {} => { self.error_rate_limiter .rate_limit(|| async { warn!("Dropping invalid data: {}", details.clone()); @@ -567,7 +646,7 @@ impl ArrowContext { .for_task(&self.task_info) .inc(); } - Some(BadData::Fail {}) | None => { + BadData::Fail {} => { return Err(UserError::new("Deserialization error", details)); } },