From a5ecca976ec3abf97c8c3db4f78c230b4553b509 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sun, 19 May 2024 20:45:53 +0100 Subject: [PATCH] proxy: bump parquet (#7782) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary of changes Updates the parquet library from 49.0.0 to 51.0.0. One change that we still need is in an open PR against upstream; hopefully we can remove the git dependency by 52.0.0: https://github.com/apache/arrow-rs/pull/5773 I'm not sure why the parquet files got a little bit bigger. I tested them and they still open fine. 🤷 As a side effect of the update, chrono was also updated and added yet another deprecation warning (hence the safekeeper change). --- Cargo.lock | 29 ++++++++++---- Cargo.toml | 8 ++-- proxy/src/context/parquet.rs | 73 +++++++++++++++++++----------------- safekeeper/src/broker.rs | 2 +- workspace_hack/Cargo.toml | 4 +- 5 files changed, 67 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1edd53feaa1..e6060c82f581 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1072,9 +1072,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1082,7 +1082,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.0", + "windows-targets 0.52.4", ] [[package]] @@ -1109,7 +1109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" dependencies = [ "ciborium-io", - "half", + "half 1.8.2", ] [[package]] @@ -2278,6 +2278,17 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" 
+[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", +] + [[package]] name = "hash32" version = "0.3.1" @@ -3902,12 +3913,13 @@ dependencies = [ [[package]] name = "parquet" -version = "49.0.0" -source = "git+https://github.com/neondatabase/arrow-rs?branch=neon-fix-bugs#8a0bc58aa67b98aabbd8eee7c6ca4281967ff9e9" +version = "51.0.0" +source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829" dependencies = [ "ahash", "bytes", "chrono", + "half 2.4.1", "hashbrown 0.14.5", "num", "num-bigint", @@ -3916,12 +3928,13 @@ dependencies = [ "thrift", "twox-hash", "zstd", + "zstd-sys", ] [[package]] name = "parquet_derive" -version = "49.0.0" -source = "git+https://github.com/neondatabase/arrow-rs?branch=neon-fix-bugs#8a0bc58aa67b98aabbd8eee7c6ca4281967ff9e9" +version = "51.0.0" +source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829" dependencies = [ "parquet", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index b59a5dcd6d83..2a7dea447e2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,8 +122,8 @@ opentelemetry = "0.20.0" opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] } opentelemetry-semantic-conventions = "0.12.0" parking_lot = "0.12" -parquet = { version = "49.0.0", default-features = false, features = ["zstd"] } -parquet_derive = "49.0.0" +parquet = { version = "51.0.0", default-features = false, features = ["zstd"] } +parquet_derive = "51.0.0" pbkdf2 = { version = "0.12.1", features = ["simple", "std"] } pin-project-lite = "0.2" procfs = "0.14" @@ -244,8 +244,8 @@ tonic-build = "0.9" tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" } # bug fixes for UUID 
-parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs" } -parquet_derive = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs" } +parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" } +parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" } ################# Binary contents sections diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 8104fe608759..392821c430b6 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -307,7 +307,7 @@ where } async fn upload_parquet( - w: SerializedFileWriter>, + mut w: SerializedFileWriter>, len: i64, storage: &GenericRemoteStorage, ) -> anyhow::Result> { @@ -319,11 +319,15 @@ async fn upload_parquet( // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry. // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253 - let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish()) + let (mut buffer, metadata) = + tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> { + let metadata = w.finish()?; + let buffer = std::mem::take(w.inner_mut().get_mut()); + Ok((buffer, metadata)) + }) .await .unwrap()?; - let mut buffer = writer.into_inner(); let data = buffer.split().freeze(); let compression = len as f64 / len_uncompressed as f64; @@ -474,10 +478,11 @@ mod tests { RequestData { session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(), peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(), - timestamp: chrono::NaiveDateTime::from_timestamp_millis( + timestamp: chrono::DateTime::from_timestamp_millis( rng.gen_range(1703862754..1803862754), ) - .unwrap(), + .unwrap() + .naive_utc(), application_name: Some("test".to_owned()), username: Some(hex::encode(rng.gen::<[u8; 4]>())), endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())), @@ -560,15 +565,15 @@ mod tests { 
assert_eq!( file_stats, [ - (1315008, 3, 6000), - (1315001, 3, 6000), - (1315061, 3, 6000), - (1315018, 3, 6000), - (1315148, 3, 6000), - (1314990, 3, 6000), - (1314782, 3, 6000), - (1315018, 3, 6000), - (438575, 1, 2000) + (1315314, 3, 6000), + (1315307, 3, 6000), + (1315367, 3, 6000), + (1315324, 3, 6000), + (1315454, 3, 6000), + (1315296, 3, 6000), + (1315088, 3, 6000), + (1315324, 3, 6000), + (438713, 1, 2000) ] ); @@ -598,11 +603,11 @@ mod tests { assert_eq!( file_stats, [ - (1221738, 5, 10000), - (1227888, 5, 10000), - (1229682, 5, 10000), - (1229044, 5, 10000), - (1220322, 5, 10000) + (1222212, 5, 10000), + (1228362, 5, 10000), + (1230156, 5, 10000), + (1229518, 5, 10000), + (1220796, 5, 10000) ] ); @@ -634,11 +639,11 @@ mod tests { assert_eq!( file_stats, [ - (1207385, 5, 10000), - (1207116, 5, 10000), - (1207409, 5, 10000), - (1207397, 5, 10000), - (1207652, 5, 10000) + (1207859, 5, 10000), + (1207590, 5, 10000), + (1207883, 5, 10000), + (1207871, 5, 10000), + (1208126, 5, 10000) ] ); @@ -663,15 +668,15 @@ mod tests { assert_eq!( file_stats, [ - (1315008, 3, 6000), - (1315001, 3, 6000), - (1315061, 3, 6000), - (1315018, 3, 6000), - (1315148, 3, 6000), - (1314990, 3, 6000), - (1314782, 3, 6000), - (1315018, 3, 6000), - (438575, 1, 2000) + (1315314, 3, 6000), + (1315307, 3, 6000), + (1315367, 3, 6000), + (1315324, 3, 6000), + (1315454, 3, 6000), + (1315296, 3, 6000), + (1315088, 3, 6000), + (1315324, 3, 6000), + (438713, 1, 2000) ] ); @@ -708,7 +713,7 @@ mod tests { // files are smaller than the size threshold, but they took too long to fill so were flushed early assert_eq!( file_stats, - [(659240, 2, 3001), (658954, 2, 3000), (658750, 2, 2999)] + [(659462, 2, 3001), (659176, 2, 3000), (658972, 2, 2999)] ); tmpdir.close().unwrap(); diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 98f58d3e497f..ea16ce450f3f 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -319,7 +319,7 @@ async fn task_stats(stats: Arc) { let now = 
BrokerStats::now_millis(); if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 { - let ts = chrono::NaiveDateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp"); + let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp"); info!("no broker updates for some time, last update: {:?}", ts); } } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index b605757f64c4..75825624508c 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -51,7 +51,7 @@ num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } -parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] } +parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] } prost = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } @@ -102,7 +102,7 @@ num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } -parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] } +parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] } prost = { version = "0.11" } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }