From 9cc749deb200c1e5f2428f0563dd62bcdeafd317 Mon Sep 17 00:00:00 2001 From: imDema Date: Mon, 27 May 2024 15:20:07 +0200 Subject: [PATCH] Minor fixes, formatting, dependency bump --- Cargo.lock | 268 +++++++++++++++++++----------------- Cargo.toml | 14 +- examples/avro_rw.rs | 20 ++- src/operator/boxed.rs | 10 +- src/operator/mod.rs | 4 +- src/operator/sink/avro.rs | 15 +- src/operator/sink/mod.rs | 2 +- src/operator/source/avro.rs | 22 +-- src/operator/source/mod.rs | 4 +- tools/plot-profile.ipynb | 115 ++++++++++++++++ 10 files changed, 304 insertions(+), 170 deletions(-) create mode 100644 tools/plot-profile.ipynb diff --git a/Cargo.lock b/Cargo.lock index 6002209..fdcf06f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,47 +59,48 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", + "is_terminal_polyfill", "utf8parse", ] [[package]] name = "anstyle" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" +checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" [[package]] name = "anstyle-parse" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -139,14 +140,14 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] name = "autocfg" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "backtrace" @@ -165,9 +166,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.22.0" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bincode" @@ -225,9 +226,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.94" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" [[package]] name = "cfg-if" @@ -293,7 +294,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -315,9 +316,9 @@ dependencies = [ [[package]] name = "colorchoice" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" [[package]] name = "core2" @@ -403,9 +404,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.19" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crunchy" @@ -446,9 +447,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.8" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" dependencies = [ "darling_core", "darling_macro", @@ -456,26 +457,26 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.8" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] name = "darling_macro" -version = "0.20.8" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ "darling_core", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -494,7 +495,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core 0.9.9", + "parking_lot_core 0.9.10", ] [[package]] @@ -510,9 +511,9 @@ dependencies = [ [[package]] name = "deunicode" -version = "1.4.4" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322ef0094744e63628e6f0eb2295517f79276a5b342a4c2ff3042566ca181d4e" +checksum = "339544cc9e2c4dc3fc7149fd630c5f22263a4fdf18a98afd0075784968b5cf00" [[package]] name = "digest" @@ -532,9 +533,9 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" [[package]] name = "either" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" [[package]] name = "env_filter" @@ -567,9 +568,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", "windows-sys 0.52.0", @@ -587,9 +588,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "flume" @@ -665,7 +666,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -719,9 +720,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "js-sys", @@ -754,9 +755,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", "allocator-api2", @@ -804,9 +805,9 @@ dependencies = [ [[package]] name = "instant" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" dependencies = [ "cfg-if", ] @@ -822,6 +823,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + [[package]] name = "itertools" version = "0.10.5" @@ -879,9 +886,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libflate" @@ -909,9 +916,9 @@ dependencies = [ [[package]] name = "libmimalloc-sys" -version = "0.1.35" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" +checksum = "0e7bb23d733dfcc8af652a78b7bf232f0e967710d044732185e561e47c0336b6" dependencies = [ "cc", "libc", @@ -933,9 +940,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.16" +version = "1.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e143b5e666b2695d28f6bca6497720813f699c9602dd7f5cac91008b8ada7f9" +checksum = "c15da26e5af7e25c90b37a2d75cdbf940cf4a55316de9d84c679c9b8bfabf82e" dependencies = [ "cc", "libc", @@ -945,15 +952,15 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -978,24 +985,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b1293f16528a05b60d2f0114522ed24d6dc16207570bc0c9476bda164b9675" dependencies = [ "once_cell", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "thread_local", ] [[package]] name = "mimalloc" -version = "0.1.39" +version = "0.1.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa01922b5ea280a911e323e4d2fd24b7fe5cc4042e0d2cda3c40775cdc4bdc9c" +checksum = "e9186d86b79b52f4a77af65604b51225e8db1d6ee7e3f41aec1e40829c71a176" dependencies = [ "libmimalloc-sys", ] [[package]] name = "miniz_oxide" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" dependencies = [ "adler", ] @@ -1062,9 +1069,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", ] @@ -1131,12 +1138,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core 0.9.9", + "parking_lot_core 0.9.10", ] [[package]] @@ -1155,15 +1162,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.4.1", + "redox_syscall 0.5.1", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -1186,9 +1193,9 @@ checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "plotters" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c224ba00d7cadd4d5c660deaf2098e5e80e07846537c51f9cfa4be50c1fd45" +checksum = "a15b6eccb8484002195a3e44fe65a4ce8e93a625797a063735536fd59cb01cf3" dependencies = [ "num-traits", "plotters-backend", @@ -1199,15 +1206,15 @@ dependencies = [ [[package]] name = "plotters-backend" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e76628b4d3a7581389a35d5b6e2139607ad7c75b17aed325f210aa91f4a9609" +checksum = "414cec62c6634ae900ea1c56128dfe87cf63e7caece0852ec76aba307cebadb7" [[package]] name = "plotters-svg" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f6d39893cca0701371e3c27294f09797214b86f1fb951b89ade8ec04e2abab" +checksum = "81b30686a7d9c3e010b84284bdd26a29f2138574f52f5eb6f794fc0ad924e705" dependencies = [ "plotters-backend", ] @@ -1220,9 +1227,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.80" +version = "1.0.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" +checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6" dependencies = [ "unicode-ident", ] @@ -1242,7 +1249,7 @@ dependencies = [ "ahash", "equivalent", "hashbrown", - "parking_lot 0.12.1", + "parking_lot 0.12.3", ] [[package]] @@ -1322,6 +1329,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "regex" version = "1.10.4" @@ -1387,7 +1403,7 @@ dependencies = [ "nanorand", "nexmark", "once_cell", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "quick_cache", "rand", "regex", @@ -1415,15 +1431,15 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" [[package]] name = "rustc-demangle" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.32" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.5.0", "errno", @@ -1440,9 +1456,9 @@ checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" [[package]] name = "ryu" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "same-file" @@ -1461,29 +1477,29 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.197" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.197" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", @@ -1492,9 +1508,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" dependencies = [ "serde", ] @@ -1542,9 +1558,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", "windows-sys 0.52.0", @@ -1599,7 +1615,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -1615,9 +1631,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.59" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", @@ -1638,22 +1654,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.58" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.58" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -1701,14 +1717,14 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] name = "toml" -version = "0.8.12" +version = "0.8.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +checksum = "a4e43f8cc456c9704c851ae29c67e17ef65d2c30017c17a9765b89c382dc8bba" dependencies = [ "serde", "serde_spanned", @@ -1718,18 +1734,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.22.9" +version = "0.22.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e40bb779c5187258fd7aad0eb68cb8706a0a81fa712fbea808ab43c4b8374c4" +checksum = "c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c" dependencies = [ "indexmap", "serde", @@ -1758,7 +1774,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -1813,7 +1829,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] [[package]] @@ -1919,7 +1935,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", "wasm-bindgen-shared", ] @@ -1941,7 +1957,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1991,11 +2007,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" dependencies = [ - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -2145,9 +2161,9 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" +checksum = "c3c52e9c97a68071b23e836c9380edae937f17b9c4667bd021973efc689f618d" dependencies = [ "memchr", ] @@ -2163,20 +2179,20 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.32" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.32" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.66", ] diff --git a/Cargo.toml b/Cargo.toml index 7deb8ed..f3bfc2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,12 +34,12 @@ nanorand = "0.7.0" derivative = "2.2.0" # serialization -serde = { version = "1.0.197", features = ["derive"] } -serde_json = "1.0.116" +serde = { version = "1.0.203", features = ["derive"] } +serde_json = "1.0.117" bincode = "1.3.3" -toml = "0.8.12" +toml = "0.8.13" -thiserror = "1.0.58" +thiserror = "1.0.61" # handy iterators functions @@ -52,7 +52,7 @@ whoami = { version = "1.5.1", optional = true } shell-escape = { version = "0.1.5", optional = true } clap = { version = "4.5.4", features = ["derive"], optional = true } sha2 = { version = "0.10.8", optional = true } -base64 = { version = "0.22.0", optional = true } +base64 = { version = "0.22.1", optional = true } # channel implementation flume = "0.11.0" @@ -69,7 +69,7 @@ coarsetime = "0.1.34" tokio = { version = "1.37.0", features = ["rt"], default-features = false, optional = true } futures = { version = "0.3.30", optional = true } -parking_lot = "0.12.1" +parking_lot = "0.12.3" wyhash = "0.5.0" fxhash = "0.2.1" @@ -88,7 +88,7 @@ rand = { version = "0.8.5", features = ["small_rng"] } tempfile = "3.10.1" criterion = { version = "0.5.1", features = ["html_reports"] } fake = "2.9.2" -mimalloc = { version = "0.1.39", default-features = false } +mimalloc = { version = "0.1.42", default-features = false } tracing-subscriber = "0.3.18" itertools = "0.12.1" diff --git a/examples/avro_rw.rs b/examples/avro_rw.rs index 6add0db..7e3f33a 100644 --- a/examples/avro_rw.rs +++ b/examples/avro_rw.rs @@ -7,10 +7,10 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Parser)] struct Options { - #[clap(short,long)] + #[clap(short, long)] input: Option, - #[clap(short,long)] + #[clap(short, long)] output: PathBuf, } @@ -30,12 +30,20 @@ fn main() { let source = if let Some(input) = opts.input { ctx.stream_avro(input).into_boxed() } else { - ctx.stream_iter((0..100).map(|i| InputType{ s: format!("{i:o}"), num: i })).into_boxed() + ctx.stream_iter((0..100).map(|i| InputType { + s: format!("{i:o}"), + num: i, + })) + .into_boxed() }; - source.inspect(|e| eprintln!("{e:?}")) - .map(|mut e| { e.num *= 2; e }) + source + .inspect(|e| eprintln!("{e:?}")) + .map(|mut e| { + e.num *= 2; + e + }) .write_avro(opts.output); ctx.execute_blocking(); -} \ No newline at end of file +} diff --git a/src/operator/boxed.rs b/src/operator/boxed.rs index d7e46e7..5a0493e 100644 --- a/src/operator/boxed.rs +++ b/src/operator/boxed.rs @@ -48,7 +48,7 @@ where } pub struct BoxedOperator { - pub(crate) op: Box + 'static + Send>, + pub(crate) op: Box + 'static + Send>, } impl Clone for BoxedOperator { @@ -66,10 +66,8 @@ impl Display for BoxedOperator { } impl BoxedOperator { - pub fn new + 'static>(op: Op) -> Self { - Self { - op: Box::new(op), - } + pub fn new + 'static>(op: Op) -> Self { + Self { op: Box::new(op) } } } @@ -95,7 +93,7 @@ where Op::Out: Clone + Send + 'static, { /// Erase operator type using dynamic dispatching. - /// + /// /// Use only when strictly necessary as it is decrimental for performance. pub fn into_boxed(self) -> Stream> { self.add_operator(|prev| BoxedOperator::new(prev)) diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 50296d0..52c16dd 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -6,7 +6,6 @@ use std::fmt::Display; use std::hash::Hash; use std::ops::{AddAssign, Div}; -use std::path::PathBuf; use flume::{unbounded, Receiver}; #[cfg(feature = "tokio")] @@ -26,7 +25,6 @@ use crate::{BatchMode, KeyedStream, Stream}; #[cfg(feature = "tokio")] use self::map_async::MapAsync; use self::map_memo::MapMemo; -use self::sink::avro::AvroSink; use self::sink::collect::Collect; use self::sink::collect_channel::CollectChannelSink; use self::sink::collect_count::CollectCountSink; @@ -60,6 +58,7 @@ use self::{ #[cfg(feature = "timestamp")] mod add_timestamps; mod batch_mode; +mod boxed; pub(crate) mod end; mod filter; mod filter_map; @@ -88,7 +87,6 @@ pub mod source; mod start; pub mod window; mod zip; -mod boxed; /// Marker trait that all the types inside a stream should implement. pub trait Data: Clone + Send + 'static {} diff --git a/src/operator/sink/avro.rs b/src/operator/sink/avro.rs index 91350d3..489cabe 100644 --- a/src/operator/sink/avro.rs +++ b/src/operator/sink/avro.rs @@ -3,12 +3,10 @@ use serde::Serialize; use std::fmt::Display; use std::fs::File; use std::io::BufWriter; -use std::marker::PhantomData; use std::path::PathBuf; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::sink::StreamOutputRef; -use crate::operator::{ExchangeData, Operator, StreamElement}; +use crate::operator::{Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; use crate::Stream; @@ -67,6 +65,7 @@ where .read(true) .write(true) .create(true) + .truncate(true) .open(&self.path) .unwrap_or_else(|err| { panic!( @@ -75,7 +74,6 @@ where ) }); - let buf_writer = BufWriter::new(file); self.writer = Some(buf_writer); } @@ -113,11 +111,11 @@ where } } -impl Stream where +impl Stream +where Op: 'static, - Op::Out: AvroSchema + Serialize + Op::Out: AvroSchema + Serialize, { - /// Apply the given function to all the elements of the stream, consuming the stream. /// /// ## Example @@ -131,8 +129,7 @@ impl Stream where /// /// env.execute_blocking(); /// ``` - pub fn write_avro>(self, path: P) - { + pub fn write_avro>(self, path: P) { self.add_operator(|prev| AvroSink::new(prev, path)) .finalize_block(); } diff --git a/src/operator/sink/mod.rs b/src/operator/sink/mod.rs index 2db1c9d..59b3fcf 100644 --- a/src/operator/sink/mod.rs +++ b/src/operator/sink/mod.rs @@ -5,12 +5,12 @@ use std::sync::{Arc, Mutex}; +pub(super) mod avro; pub(super) mod collect; pub(super) mod collect_channel; pub(super) mod collect_count; pub(super) mod collect_vec; pub(super) mod for_each; -pub(super) mod avro; pub(crate) type StreamOutputRef = Arc>>; diff --git a/src/operator/source/avro.rs b/src/operator/source/avro.rs index 3b87dd6..63eb48d 100644 --- a/src/operator/source/avro.rs +++ b/src/operator/source/avro.rs @@ -1,7 +1,6 @@ use std::fmt::Display; use std::fs::File; -use std::io; -use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::io::BufReader; use std::marker::PhantomData; use std::path::PathBuf; @@ -85,7 +84,7 @@ impl Deserialize<'a>> Source for AvroSource { impl Deserialize<'a>> Operator for AvroSource { type Out = Out; - fn setup(&mut self, metadata: &mut ExecutionMetadata) { + fn setup(&mut self, _metadata: &mut ExecutionMetadata) { // let global_id = metadata.global_id; // let instances = metadata.replicas.len(); @@ -118,7 +117,10 @@ impl Deserialize<'a>> Operator for AvroSource { match reader.next() { Some(Ok(el)) => { tracing::trace!("avro Value: {el:?}"); - StreamElement::Item(apache_avro::from_value(&el).expect("could not deserialize from avro Value to specified type")) + StreamElement::Item( + apache_avro::from_value(&el) + .expect("could not deserialize from avro Value to specified type"), + ) } Some(Err(e)) => panic!("Error while reading Aveo file: {:?}", e), None => { @@ -163,14 +165,14 @@ impl crate::StreamContext { #[cfg(test)] mod tests { - use std::io::Write; + // use std::io::Write; - use itertools::Itertools; - use serde::{Deserialize, Serialize}; - use tempfile::NamedTempFile; + // use itertools::Itertools; + // use serde::{Deserialize, Serialize}; + // use tempfile::NamedTempFile; - use crate::config::RuntimeConfig; - use crate::environment::StreamContext; + // use crate::config::RuntimeConfig; + // use crate::environment::StreamContext; // use crate::operator::source::AvroSource; // #[test] diff --git a/src/operator/source/mod.rs b/src/operator/source/mod.rs index 32e12e9..a4f7640 100644 --- a/src/operator/source/mod.rs +++ b/src/operator/source/mod.rs @@ -3,22 +3,22 @@ pub use self::csv::*; #[cfg(feature = "tokio")] pub use async_stream::*; +pub use avro::*; pub use channel::*; pub use file::*; pub use iterator::*; pub use parallel_iterator::*; -pub use avro::*; use crate::{block::Replication, operator::Operator}; #[cfg(feature = "tokio")] mod async_stream; +mod avro; mod channel; mod csv; mod file; mod iterator; mod parallel_iterator; -mod avro; /// This trait marks all the operators that can be used as sinks. pub trait Source: Operator { diff --git a/tools/plot-profile.ipynb b/tools/plot-profile.ipynb new file mode 100644 index 0000000..be0599c --- /dev/null +++ b/tools/plot-profile.ipynb @@ -0,0 +1,115 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import seaborn as sns\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "from matplotlib import ticker\n", + "import os\n", + "import json\n", + "\n", + "sns.set_theme(context='paper', style='whitegrid', palette=\"tab10\")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "ename": "FileNotFoundError", + "evalue": "[Errno 2] No such file or directory: '/tmp/renoir/trace/renoir-trace-1712930868.json'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mFileNotFoundError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[2], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m path \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m/tmp/renoir/trace/renoir-trace-1712930868.json\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m----> 2\u001b[0m \u001b[38;5;28;01mwith\u001b[39;00m \u001b[38;5;28;43mopen\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mpath\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mr\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m \u001b[38;5;28;01mas\u001b[39;00m f:\n\u001b[1;32m 3\u001b[0m s \u001b[38;5;241m=\u001b[39m f\u001b[38;5;241m.\u001b[39mread()\n\u001b[1;32m 4\u001b[0m j \u001b[38;5;241m=\u001b[39m json\u001b[38;5;241m.\u001b[39mloads(s)\n", + "File \u001b[0;32m~/.venv/lib/python3.11/site-packages/IPython/core/interactiveshell.py:284\u001b[0m, in \u001b[0;36m_modified_open\u001b[0;34m(file, *args, **kwargs)\u001b[0m\n\u001b[1;32m 277\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m file \u001b[38;5;129;01min\u001b[39;00m {\u001b[38;5;241m0\u001b[39m, \u001b[38;5;241m1\u001b[39m, \u001b[38;5;241m2\u001b[39m}:\n\u001b[1;32m 278\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\n\u001b[1;32m 279\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mIPython won\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mt let you open fd=\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mfile\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m by default \u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 280\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mas it is likely to crash IPython. If you know what you are doing, \u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 281\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124myou can use builtins\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m open.\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 282\u001b[0m )\n\u001b[0;32m--> 284\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mio_open\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfile\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[0;31mFileNotFoundError\u001b[0m: [Errno 2] No such file or directory: '/tmp/renoir/trace/renoir-trace-1712930868.json'" + ] + } + ], + "source": [ + "path = \"/tmp/renoir/trace/renoir-trace-1712930868.json\"\n", + "with open(path, \"r\") as f:\n", + " s = f.read()\n", + "j = json.loads(s)\n", + "\n", + "\n", + "# print(j[\"profilers\"])\n", + "\n", + "df = pd.DataFrame(columns=[\"thread_name\", \"time\", \"link\", \"items_in\", \"items_out\", \"net_messages_in\", \"net_messages_out\", \"bytes_in\", \"bytes_out\"])\n", + "\n", + "def flatten_measure(profiler, bucket, link):\n", + " d = {\n", + " \"thread_name\": profiler[\"thread_name\"],\n", + " \"time\": bucket[\"start_ms\"],\n", + " \"link\": f\"\"\"{link[\"from\"][\"block_id\"]}.{link[\"from\"][\"host_id\"]}.{link[\"from\"][\"replica_id\"]}-{link[\"to\"][\"block_id\"]}.{link[\"to\"][\"host_id\"]}.{link[\"to\"][\"replica_id\"]}\"\"\",\n", + " \"items_in\": link[\"value\"][\"items_in\"],\n", + " \"items_out\": link[\"value\"][\"items_out\"],\n", + " \"net_messages_in\": link[\"value\"][\"net_messages_in\"],\n", + " \"net_messages_out\": link[\"value\"][\"net_messages_out\"],\n", + " \"bytes_in\": link[\"value\"][\"bytes_in\"],\n", + " \"bytes_out\": link[\"value\"][\"bytes_out\"],\n", + " }\n", + " return d\n", + "\n", + "i = 0\n", + "for prof in j[\"profilers\"]:\n", + " for bucket in prof[\"buckets\"]:\n", + " for link in bucket[\"link_metrics\"]:\n", + " df.loc[i] = flatten_measure(prof, bucket, link)\n", + " i += 1\n", + "\n", + "print(df.head())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'sns' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[1], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43msns\u001b[49m\u001b[38;5;241m.\u001b[39mrelplot(data\u001b[38;5;241m=\u001b[39mdf, x\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mtime\u001b[39m\u001b[38;5;124m\"\u001b[39m, y\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mbytes_out\u001b[39m\u001b[38;5;124m\"\u001b[39m, hue\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mlink\u001b[39m\u001b[38;5;124m\"\u001b[39m, col\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mthread_name\u001b[39m\u001b[38;5;124m\"\u001b[39m, col_wrap\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m4\u001b[39m)\n", + "\u001b[0;31mNameError\u001b[0m: name 'sns' is not defined" + ] + } + ], + "source": [ + "sns.relplot(data=df, x=\"time\", y=\"bytes_out\", hue=\"link\", col=\"thread_name\", col_wrap=4)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}