diff --git a/Cargo.lock b/Cargo.lock index bfd18dfbf9..cb446bec2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,6 +357,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata 0.1.10", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1124,6 +1135,28 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "httptest" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8b44a11846bda8c9fe9194f9924db7132c34635c7ce020f180f6c5d46d2308f" +dependencies = [ + "bstr", + "bytes", + "crossbeam-channel", + "form_urlencoded", + "futures", + "http 0.2.12", + "hyper 0.14.28", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", +] + [[package]] name = "humantime" version = "2.1.0" @@ -1394,6 +1427,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "kv" version = "0.24.0" @@ -1579,6 +1618,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mrflagly" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58e76b314a1e3d1fdf11a06ec8d1ccd9fc7cda2f2dfb049884060d63259e359a" +dependencies = [ + "httptest", + "json", + "ureq", +] + [[package]] name = "multimap" version = "0.10.0" @@ -1686,6 +1736,7 @@ dependencies = [ "log", "lru 0.12.3", "md5", + "mrflagly", "nucliadb_core", "nucliadb_paragraphs3", "nucliadb_procs", @@ -2312,7 +2363,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -2332,7 +2383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.66", @@ -2366,7 +2417,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.2", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -2608,7 +2659,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.25.4", "winreg 0.50.0", ] @@ -3959,10 +4010,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d11a831e3c0b56e438a28308e7c810799e3c118417f342d30ecec080105395cd" dependencies = [ "base64 0.22.1", + "flate2", "log", "native-tls", "once_cell", + "rustls 0.22.4", + "rustls-pki-types", + "rustls-webpki 0.102.4", "url", + "webpki-roots 0.26.3", ] [[package]] @@ -4215,6 +4271,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/nucliadb_node/Cargo.toml b/nucliadb_node/Cargo.toml index b12b94a310..a926b9a049 100644 --- a/nucliadb_node/Cargo.toml +++ b/nucliadb_node/Cargo.toml @@ -22,7 +22,7 @@ path = "src/bin/writer.rs" [dependencies] axum = "0.6" axum-server = "0.5" - +mrflagly = { version = "0.2.9", default-features = false } tonic = "0.11" tonic-health = "0.11" futures-core = "0.3.17" diff --git a/nucliadb_node/src/cache/writer_cache.rs b/nucliadb_node/src/cache/writer_cache.rs index a34bb229da..313f88edbf 100644 --- a/nucliadb_node/src/cache/writer_cache.rs +++ b/nucliadb_node/src/cache/writer_cache.rs @@ -106,6 +106,7 @@ pub struct ShardWriterCache { pub shards_path: PathBuf, cache: Mutex, metadata_manager: Arc, + settings: Settings, } impl ShardWriterCache { @@ -114,6 +115,7 @@ impl ShardWriterCache { cache: Mutex::new(InnerCache::new(settings.max_open_shards)), shards_path: settings.shards_path(), metadata_manager: Arc::new(ShardsMetadataManager::new(settings.shards_path())), + settings, } } @@ -123,7 +125,7 @@ impl ShardWriterCache { pub fn create(&self, new: NewShard) -> NodeResult> { let shard_id = new.shard_id.clone(); - let (shard, metadata) = ShardWriter::new(new, &self.shards_path)?; + let (shard, metadata) = ShardWriter::new(new, &self.shards_path, &self.settings)?; let shard = Arc::new(shard); self.metadata_manager.add_metadata(metadata); diff --git a/nucliadb_node/src/settings.rs b/nucliadb_node/src/settings.rs index 2a23306196..be2eca9b2c 100644 --- a/nucliadb_node/src/settings.rs +++ b/nucliadb_node/src/settings.rs @@ -31,6 +31,7 @@ //! providers (to parse from CLI for example). use anyhow::anyhow; +use mrflagly::{FlagService, FlagServiceOptions}; use nucliadb_core::tracing::Level; use object_store::ObjectStore; use serde::de::Unexpected; @@ -89,22 +90,51 @@ pub fn load_settings() -> NodeResult { const SENTRY_ENVS: [&str; 2] = ["stage", "prod"]; const DEFAULT_ENV: &str = "stage"; +// Feature flags +pub mod feature_flags { + pub const TEXTS3: &str = "nucliadb_node_texts3"; +} +const DEFAULT_FEATURE_FLAGS: &str = r#"{"nucliadb_node_texts3": {"rollout": 100}}"#; + #[derive(Clone)] pub struct Settings { env: Arc, pub object_store: Arc, + pub flags: Arc, } impl From for Settings { fn from(value: EnvSettings) -> Self { let object_store = build_object_store_driver(&value); + let flags = Arc::new(build_flag_service(&value)); Self { env: Arc::new(value), object_store, + flags, } } } +fn build_flag_service(settings: &EnvSettings) -> FlagService { + if let Some(flag_settings_url) = &settings.flag_settings_url { + FlagService::new(FlagServiceOptions { + finder_type: mrflagly::FlagFinderType::URL, + url: Some(flag_settings_url.clone()), + data: None, + env_var: None, + refresh_interval: 300, + }) + } else { + FlagService::new(FlagServiceOptions { + finder_type: mrflagly::FlagFinderType::JSON, + url: None, + data: Some(DEFAULT_FEATURE_FLAGS.to_string()), + env_var: None, + refresh_interval: 300, + }) + } +} + pub fn build_object_store_driver(settings: &EnvSettings) -> Arc { info!("File backend: {:?}", settings.file_backend); match settings.file_backend { @@ -258,6 +288,9 @@ pub struct EnvSettings { pub s3_indexing_bucket: String, pub s3_endpoint: Option, pub azure_account_url: Option, + + // Mr.Flagly + pub flag_settings_url: Option, } impl EnvSettings { @@ -340,6 +373,7 @@ impl Default for EnvSettings { s3_indexing_bucket: Default::default(), s3_endpoint: None, azure_account_url: Default::default(), + flag_settings_url: None, } } } diff --git a/nucliadb_node/src/shards/shard_writer.rs b/nucliadb_node/src/shards/shard_writer.rs index 4d16bfe50c..1c4888ef97 100644 --- a/nucliadb_node/src/shards/shard_writer.rs +++ b/nucliadb_node/src/shards/shard_writer.rs @@ -37,6 +37,7 @@ use super::indexes::{ShardIndexes, DEFAULT_VECTORS_INDEX_NAME}; use super::metadata::ShardMetadata; use super::versioning::{self, Versions}; use crate::disk_structure::{self, *}; +use crate::settings::{feature_flags, Settings}; use crate::telemetry::run_with_telemetry; const MAX_LABEL_LENGTH: usize = 32768; // Tantivy max is 2^16 - 4 @@ -105,7 +106,7 @@ impl ShardWriter { } #[measure(actor = "shard", metric = "new")] - pub fn new(new: NewShard, shards_path: &Path) -> NodeResult<(Self, Arc)> { + pub fn new(new: NewShard, shards_path: &Path, settings: &Settings) -> NodeResult<(Self, Arc)> { let span = tracing::Span::current(); if new.vector_configs.is_empty() { @@ -119,6 +120,14 @@ impl ShardWriter { std::fs::create_dir(&shard_path)?; + let ff_context = Some(HashMap::from([("kbid".to_string(), metadata.kbid())])); + let texts3_enabled = settings.flags.enabled(feature_flags::TEXTS3, false, ff_context); + let texts_version = if texts3_enabled { + 3 + } else { + 2 + }; + let versions = Versions { paragraphs: versioning::PARAGRAPHS_VERSION, vectors: versioning::VECTORS_VERSION, @@ -133,7 +142,7 @@ impl ShardWriter { let tsc = TextConfig { path: indexes.texts_path(), }; - let text_task = || Some(nucliadb_texts3::writer::TextWriterService::create(tsc)); + let text_task = || Some(create_texts_writer(texts_version, tsc)); let info = info_span!(parent: &span, "text start"); let text_task = || run_with_telemetry(info, text_task); @@ -199,7 +208,7 @@ impl ShardWriter { path: shard_path, metadata: Arc::clone(&metadata), indexes: RwLock::new(ShardWriterIndexes { - texts_index: Box::new(fields.unwrap()), + texts_index: fields.unwrap(), paragraphs_index: Box::new(paragraphs.unwrap()), vectors_indexes: vectors, relations_index: Box::new(relations.unwrap()), @@ -693,6 +702,14 @@ pub fn open_texts_writer(version: u32, config: &TextConfig) -> NodeResult NodeResult { + match version { + 2 => nucliadb_texts2::writer::TextWriterService::create(config).map(|i| Box::new(i) as TextsWriterPointer), + 3 => nucliadb_texts3::writer::TextWriterService::create(config).map(|i| Box::new(i) as TextsWriterPointer), + v => Err(node_error!("Invalid text writer version {v}")), + } +} + pub fn open_relations_writer(version: u32, config: &RelationConfig) -> NodeResult { match version { 2 => nucliadb_relations2::writer::RelationsWriterService::open(config) diff --git a/nucliadb_node/src/shards/tests.rs b/nucliadb_node/src/shards/tests.rs index 91348256bb..a7e3aadf2f 100644 --- a/nucliadb_node/src/shards/tests.rs +++ b/nucliadb_node/src/shards/tests.rs @@ -28,6 +28,7 @@ use tempfile; use uuid::Uuid; use crate::disk_structure; +use crate::settings::EnvSettings; use crate::shards::indexes::DEFAULT_VECTORS_INDEX_NAME; use crate::shards::reader::ShardReader; @@ -41,6 +42,7 @@ fn test_vectorsets() -> NodeResult<()> { let shard_id = "shard".to_string(); let kbid = "kbid".to_string(); + let settings = EnvSettings::default().into(); let (writer, _metadata) = ShardWriter::new( crate::shards::writer::NewShard { kbid: kbid.clone(), @@ -48,6 +50,7 @@ fn test_vectorsets() -> NodeResult<()> { vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]), }, &shards_path, + &settings, )?; writer.create_vectors_index( "myvectorset".to_string(),