Skip to content

Commit

Permalink
Feature flag to enable new texts
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino committed Sep 19, 2024
1 parent baf737a commit b1078a1
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 9 deletions.
73 changes: 69 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nucliadb_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ path = "src/bin/writer.rs"
[dependencies]
axum = "0.6"
axum-server = "0.5"

mrflagly = { version = "0.2.9", default-features = false }
tonic = "0.11"
tonic-health = "0.11"
futures-core = "0.3.17"
Expand Down
4 changes: 3 additions & 1 deletion nucliadb_node/src/cache/writer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub struct ShardWriterCache {
pub shards_path: PathBuf,
cache: Mutex<InnerCache>,
metadata_manager: Arc<ShardsMetadataManager>,
settings: Settings,
}

impl ShardWriterCache {
Expand All @@ -114,6 +115,7 @@ impl ShardWriterCache {
cache: Mutex::new(InnerCache::new(settings.max_open_shards)),
shards_path: settings.shards_path(),
metadata_manager: Arc::new(ShardsMetadataManager::new(settings.shards_path())),
settings,
}
}

Expand All @@ -123,7 +125,7 @@ impl ShardWriterCache {

pub fn create(&self, new: NewShard) -> NodeResult<Arc<ShardWriter>> {
let shard_id = new.shard_id.clone();
let (shard, metadata) = ShardWriter::new(new, &self.shards_path)?;
let (shard, metadata) = ShardWriter::new(new, &self.shards_path, &self.settings)?;
let shard = Arc::new(shard);

self.metadata_manager.add_metadata(metadata);
Expand Down
34 changes: 34 additions & 0 deletions nucliadb_node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! providers (to parse from CLI for example).
use anyhow::anyhow;
use mrflagly::{FlagService, FlagServiceOptions};
use nucliadb_core::tracing::Level;
use object_store::ObjectStore;
use serde::de::Unexpected;
Expand Down Expand Up @@ -89,22 +90,51 @@ pub fn load_settings() -> NodeResult<Settings> {
const SENTRY_ENVS: [&str; 2] = ["stage", "prod"];
const DEFAULT_ENV: &str = "stage";

// Feature flags
pub mod feature_flags {
pub const TEXTS3: &str = "nucliadb_node_texts3";
}
const DEFAULT_FEATURE_FLAGS: &str = r#"{"nucliadb_node_texts3": {"rollout": 100}}"#;

#[derive(Clone)]
pub struct Settings {
env: Arc<EnvSettings>,
pub object_store: Arc<dyn ObjectStore>,
pub flags: Arc<FlagService>,
}

impl From<EnvSettings> for Settings {
fn from(value: EnvSettings) -> Self {
let object_store = build_object_store_driver(&value);
let flags = Arc::new(build_flag_service(&value));
Self {
env: Arc::new(value),
object_store,
flags,
}
}
}

fn build_flag_service(settings: &EnvSettings) -> FlagService {
if let Some(flag_settings_url) = &settings.flag_settings_url {
FlagService::new(FlagServiceOptions {
finder_type: mrflagly::FlagFinderType::URL,
url: Some(flag_settings_url.clone()),
data: None,
env_var: None,
refresh_interval: 300,
})
} else {
FlagService::new(FlagServiceOptions {
finder_type: mrflagly::FlagFinderType::JSON,
url: None,
data: Some(DEFAULT_FEATURE_FLAGS.to_string()),
env_var: None,
refresh_interval: 300,
})
}
}

pub fn build_object_store_driver(settings: &EnvSettings) -> Arc<dyn ObjectStore> {
info!("File backend: {:?}", settings.file_backend);
match settings.file_backend {
Expand Down Expand Up @@ -258,6 +288,9 @@ pub struct EnvSettings {
pub s3_indexing_bucket: String,
pub s3_endpoint: Option<String>,
pub azure_account_url: Option<String>,

// Mr.Flagly
pub flag_settings_url: Option<String>,
}

impl EnvSettings {
Expand Down Expand Up @@ -340,6 +373,7 @@ impl Default for EnvSettings {
s3_indexing_bucket: Default::default(),
s3_endpoint: None,
azure_account_url: Default::default(),
flag_settings_url: None,
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions nucliadb_node/src/shards/shard_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use super::indexes::{ShardIndexes, DEFAULT_VECTORS_INDEX_NAME};
use super::metadata::ShardMetadata;
use super::versioning::{self, Versions};
use crate::disk_structure::{self, *};
use crate::settings::{feature_flags, Settings};
use crate::telemetry::run_with_telemetry;

const MAX_LABEL_LENGTH: usize = 32768; // Tantivy max is 2^16 - 4
Expand Down Expand Up @@ -105,7 +106,7 @@ impl ShardWriter {
}

#[measure(actor = "shard", metric = "new")]
pub fn new(new: NewShard, shards_path: &Path) -> NodeResult<(Self, Arc<ShardMetadata>)> {
pub fn new(new: NewShard, shards_path: &Path, settings: &Settings) -> NodeResult<(Self, Arc<ShardMetadata>)> {
let span = tracing::Span::current();

if new.vector_configs.is_empty() {
Expand All @@ -119,6 +120,14 @@ impl ShardWriter {

std::fs::create_dir(&shard_path)?;

let ff_context = Some(HashMap::from([("kbid".to_string(), metadata.kbid())]));
let texts3_enabled = settings.flags.enabled(feature_flags::TEXTS3, false, ff_context);
let texts_version = if texts3_enabled {
3
} else {
2
};

let versions = Versions {
paragraphs: versioning::PARAGRAPHS_VERSION,
vectors: versioning::VECTORS_VERSION,
Expand All @@ -133,7 +142,7 @@ impl ShardWriter {
let tsc = TextConfig {
path: indexes.texts_path(),
};
let text_task = || Some(nucliadb_texts3::writer::TextWriterService::create(tsc));
let text_task = || Some(create_texts_writer(texts_version, tsc));
let info = info_span!(parent: &span, "text start");
let text_task = || run_with_telemetry(info, text_task);

Expand Down Expand Up @@ -199,7 +208,7 @@ impl ShardWriter {
path: shard_path,
metadata: Arc::clone(&metadata),
indexes: RwLock::new(ShardWriterIndexes {
texts_index: Box::new(fields.unwrap()),
texts_index: fields.unwrap(),
paragraphs_index: Box::new(paragraphs.unwrap()),
vectors_indexes: vectors,
relations_index: Box::new(relations.unwrap()),
Expand Down Expand Up @@ -693,6 +702,14 @@ pub fn open_texts_writer(version: u32, config: &TextConfig) -> NodeResult<TextsW
}
}

pub fn create_texts_writer(version: u32, config: TextConfig) -> NodeResult<TextsWriterPointer> {
match version {
2 => nucliadb_texts2::writer::TextWriterService::create(config).map(|i| Box::new(i) as TextsWriterPointer),
3 => nucliadb_texts3::writer::TextWriterService::create(config).map(|i| Box::new(i) as TextsWriterPointer),
v => Err(node_error!("Invalid text writer version {v}")),
}
}

pub fn open_relations_writer(version: u32, config: &RelationConfig) -> NodeResult<RelationsWriterPointer> {
match version {
2 => nucliadb_relations2::writer::RelationsWriterService::open(config)
Expand Down
3 changes: 3 additions & 0 deletions nucliadb_node/src/shards/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tempfile;
use uuid::Uuid;

use crate::disk_structure;
use crate::settings::EnvSettings;
use crate::shards::indexes::DEFAULT_VECTORS_INDEX_NAME;
use crate::shards::reader::ShardReader;

Expand All @@ -41,13 +42,15 @@ fn test_vectorsets() -> NodeResult<()> {
let shard_id = "shard".to_string();
let kbid = "kbid".to_string();

let settings = EnvSettings::default().into();
let (writer, _metadata) = ShardWriter::new(
crate::shards::writer::NewShard {
kbid: kbid.clone(),
shard_id: shard_id.clone(),
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
},
&shards_path,
&settings,
)?;
writer.create_vectors_index(
"myvectorset".to_string(),
Expand Down

0 comments on commit b1078a1

Please sign in to comment.