diff --git a/Cargo.toml b/Cargo.toml index 139e58b..a64ff38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,7 +127,7 @@ harness = false [features] default = ["benchmarking"] benchmarking = ["tempdir"] -test-python = [] +no_auto_embedding_field_on_creation = [] [workspace] members = [] diff --git a/config.jsonc b/config.jsonc deleted file mode 100644 index 58dc7b7..0000000 --- a/config.jsonc +++ /dev/null @@ -1,79 +0,0 @@ -{ - // http bind address - "http": { - "host": "0.0.0.0", - "port": 8080, - "allow_cors": true, - "with_prometheus": true, - }, - - "writer_side": { - "output": "in-memory", - - "config": { - "data_dir": "./.data/writer", - - // The maximum number of embeddings that can be stored in the queue - // before the writer starts to be blocked - // NB: the elements are in memory, so be careful with this value - "embedding_queue_limit": 50, - } - }, - "reader_side": { - "input": "in-memory", - - "config": { - "data_dir": "./.data/reader", - } - }, - - "ai_server": { - "scheme": "http", - "host": "127.0.0.1", - "port": 50051, - "api_key": "", - "max_connections": 15, - - "embeddings": { - "default_model_group": "en", - "dynamically_load_models": false, - "execution_providers": [ - "CPUExecutionProvider" - ], - "total_threads": 8 - }, - - "LLMs": { - "content_expansion": { - "id": "Qwen/Qwen2.5-3B-Instruct", - "tensor_parallel_size": 1, - "use_cpu": true, - "sampling_params": { - "temperature": 0.2, - "top_p": 0.95, - "max_tokens": 256 - } - }, - "google_query_translator": { - "id": "Qwen/Qwen2.5-3B-Instruct", - "tensor_parallel_size": 1, - "use_cpu": true, - "sampling_params": { - "temperature": 0.2, - "top_p": 0.95, - "max_tokens": 20 - } - }, - "answer": { - "id": "Qwen/Qwen2.5-3B-Instruct", - "tensor_parallel_size": 1, - "use_cpu": true, - "sampling_params": { - "temperature": 0, - "top_p": 0.95, - "max_tokens": 2048 - } - }, - }, - }, -} \ No newline at end of file diff --git a/config.yaml b/config.yaml index bf5464a..5ac7487 100644 --- a/config.yaml +++ b/config.yaml @@ -12,11 +12,18 @@ writer_side: # before the writer starts to be blocked # NB: the elements are in memory, so be careful with this value embedding_queue_limit: 50 + # The number of the document insertions after the write side will commit the changes + insert_batch_commit_size: 1_000 + # The default embedding model used to calculate the embeddings + # if not specified in the collection creation + default_embedding_model: BGESmall reader_side: input: in-memory config: data_dir: ./.data/reader + # The number of the write operation after the read side will commit the changes + insert_batch_commit_size: 300 ai_server: scheme: http diff --git a/prometheus.yml b/prometheus.yml index efcd2d3..db994ff 100644 --- a/prometheus.yml +++ b/prometheus.yml @@ -4,4 +4,4 @@ scrape_configs: - job_name: node static_configs: - targets: - - 192.168.1.18:8080 + - 192.168.1.14:8080 diff --git a/src/collection_manager/sides/read/collection.rs b/src/collection_manager/sides/read/collection.rs index 42efea2..266d1e8 100644 --- a/src/collection_manager/sides/read/collection.rs +++ b/src/collection_manager/sides/read/collection.rs @@ -24,7 +24,7 @@ use crate::{ CollectionWriteOperation, DocumentFieldIndexOperation, Offset, OramaModelSerializable, }, }, - file_utils::BufferedFile, + file_utils::{create_or_overwrite, BufferedFile}, indexes::{ bool::BoolIndex, number::{NumberFilter, NumberIndex, NumberIndexConfig}, @@ -53,14 +53,14 @@ pub struct CollectionReader { fields: DashMap, // indexes - vector_index: VectorIndex, + vector_index: Arc, 
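// Example sketch (not part of the patch): how the new config.yaml keys above
// (`insert_batch_commit_size`, `default_embedding_model`) can be deserialized
// with serde defaults, mirroring the `#[serde(default = "...")]` attributes
// this diff adds to IndexesConfig and CollectionsWriterConfig further down.
// The struct name, the serde_yaml dependency and the main() harness are
// illustrative assumptions, not the crate's actual config loader.
use serde::Deserialize;
use std::path::PathBuf;

fn default_insert_batch_commit_size() -> u64 {
    300
}

#[derive(Debug, Deserialize)]
struct ReaderIndexesConfigSketch {
    data_dir: PathBuf,
    // Falls back to 300 when the key is missing from the YAML file.
    #[serde(default = "default_insert_batch_commit_size")]
    insert_batch_commit_size: u64,
}

fn main() -> Result<(), serde_yaml::Error> {
    // YAML without the optional key still deserializes thanks to the default.
    let cfg: ReaderIndexesConfigSketch = serde_yaml::from_str("data_dir: ./.data/reader\n")?;
    println!("data_dir = {:?}", cfg.data_dir);
    assert_eq!(cfg.insert_batch_commit_size, 300);
    Ok(())
}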
fields_per_model: DashMap>, - string_index: StringIndex, + string_index: Arc, text_parser_per_field: DashMap)>, - number_index: NumberIndex, - bool_index: BoolIndex, + number_index: Arc, + bool_index: Arc, // TODO: textparser -> vec offset_storage: OffsetStorage, } @@ -74,13 +74,17 @@ impl CollectionReader { ) -> Result { let vector_index = VectorIndex::try_new(VectorIndexConfig {}) .context("Cannot create vector index during collection creation")?; + let vector_index = Arc::new(vector_index); let string_index = StringIndex::new(StringIndexConfig {}); + let string_index = Arc::new(string_index); let number_index = NumberIndex::try_new(NumberIndexConfig {}) .context("Cannot create number index during collection creation")?; + let number_index = Arc::new(number_index); let bool_index = BoolIndex::new(); + let bool_index = Arc::new(bool_index); Ok(Self { id, @@ -127,16 +131,19 @@ impl CollectionReader { } pub async fn load(&mut self, collection_data_dir: PathBuf) -> Result<()> { - self.string_index + Arc::get_mut(&mut self.string_index) + .expect("string_index is shared") .load(collection_data_dir.join("strings")) .context("Cannot load string index")?; - self.number_index + Arc::get_mut(&mut self.number_index) + .expect("number_index is shared") .load(collection_data_dir.join("numbers")) .context("Cannot load number index")?; self.vector_index .load(collection_data_dir.join("vectors")) .context("Cannot load vectors index")?; - self.bool_index + Arc::get_mut(&mut self.bool_index) + .expect("bool_index is shared") .load(collection_data_dir.join("bools")) .context("Cannot load bool index")?; @@ -183,19 +190,58 @@ impl CollectionReader { Ok(()) } - pub fn commit(&self, data_dir: PathBuf) -> Result<()> { - self.string_index - .commit(data_dir.join("strings")) - .context("Cannot commit string index")?; - self.number_index - .commit(data_dir.join("numbers")) - .context("Cannot commit number index")?; - self.vector_index - .commit(data_dir.join("vectors")) - .context("Cannot commit vectors index")?; - self.bool_index - .commit(data_dir.join("bools")) - .context("Cannot commit bool index")?; + pub async fn commit(&self, data_dir: PathBuf) -> Result<()> { + // The following code performs the commit of the indexes on disk. + // Because we live inside a async runtime and the commit operation is blocking, + // we need to spawn a blocking task to perform the commit operation. + // Anyway, we keep the sequential order of the commit operations, + // not because it is important but because every commit operation allocates memory + // So, to avoid to allocate too much memory, we prefer to perform the commit sequentially. + // TODO: understand if we could perform the commit in parallel + + let string_index = self.string_index.clone(); + let string_dir = data_dir.join("strings"); + tokio::task::spawn_blocking(move || { + string_index + .commit(string_dir) + .context("Cannot commit string index") + }) + .await + .context("Cannot spawn blocking task")? + .context("Cannot commit string index")?; + + let number_index = self.number_index.clone(); + let number_dir = data_dir.join("numbers"); + tokio::task::spawn_blocking(move || { + number_index + .commit(number_dir) + .context("Cannot commit number index") + }) + .await + .context("Cannot spawn blocking task")? 
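// Example sketch (not part of the patch): the `Arc::get_mut` pattern used by
// `load` above. While the CollectionReader holds the only Arc, the index can
// be mutated in place; once the Arc has been cloned (e.g. handed to a
// spawn_blocking commit task), `get_mut` returns None. `DummyIndex` is an
// illustrative stand-in, not a type from this crate.
use std::sync::Arc;

struct DummyIndex {
    loaded: bool,
}

fn main() {
    let mut index = Arc::new(DummyIndex { loaded: false });

    // Unique ownership: in-place mutation succeeds, which is what `load`
    // relies on (it runs before the reader is shared).
    Arc::get_mut(&mut index).expect("index is shared").loaded = true;
    assert!(index.loaded);

    // After cloning, exclusive access is gone and `get_mut` yields None...
    let shared = Arc::clone(&index);
    assert!(Arc::get_mut(&mut index).is_none());

    // ...and it becomes available again once the extra handle is dropped.
    drop(shared);
    assert!(Arc::get_mut(&mut index).is_some());
}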
+ .context("Cannot commit number index")?; + + let vector_index = self.vector_index.clone(); + let vector_dir = data_dir.join("vectors"); + tokio::task::spawn_blocking(move || { + vector_index + .commit(vector_dir) + .context("Cannot commit vector index") + }) + .await + .context("Cannot spawn blocking task")? + .context("Cannot commit vector index")?; + + let bool_index = self.bool_index.clone(); + let bool_dir = data_dir.join("bools"); + tokio::task::spawn_blocking(move || { + bool_index + .commit(bool_dir) + .context("Cannot commit bool index") + }) + .await + .context("Cannot spawn blocking task")? + .context("Cannot commit bool index")?; let dump = dump::CollectionInfo::V1(dump::CollectionInfoV1 { id: self.id.clone(), @@ -231,10 +277,9 @@ impl CollectionReader { }); let coll_desc_file_path = data_dir.join("info.json"); - BufferedFile::create_or_overwrite(coll_desc_file_path) - .context("Cannot create info.json file")? - .write_json_data(&dump) - .with_context(|| format!("Cannot serialize collection info for {:?}", self.id))?; + create_or_overwrite(coll_desc_file_path, &dump) + .await + .context("Cannot create info.json file")?; Ok(()) } @@ -545,21 +590,19 @@ impl CollectionReader { .entry(locale) .or_insert_with(|| text_parser.tokenize(term)); - self.string_index - .search( - tokens, - // This option is not required. - // It was introduced because for test purposes we - // could avoid to pass every properties - // Anyway the production code should always pass the properties - // So we could avoid this option - // TODO: remove this option - Some(&[field_id]), - &boost, - &mut scorer, - filtered_doc_ids.as_ref(), - ) - .await?; + self.string_index.search( + tokens, + // This option is not required. + // It was introduced because for test purposes we + // could avoid to pass every properties + // Anyway the production code should always pass the properties + // So we could avoid this option + // TODO: remove this option + Some(&[field_id]), + &boost, + &mut scorer, + filtered_doc_ids.as_ref(), + )?; } Ok(scorer.get_scores()) diff --git a/src/collection_manager/sides/read/collections.rs b/src/collection_manager/sides/read/collections.rs index 8d8e592..63026a0 100644 --- a/src/collection_manager/sides/read/collections.rs +++ b/src/collection_manager/sides/read/collections.rs @@ -1,14 +1,15 @@ use std::{ collections::{HashMap, HashSet}, ops::Deref, - path::PathBuf, sync::Arc, }; use crate::{ ai::AIService, collection_manager::sides::Offset, - file_utils::{create_if_not_exists, BufferedFile}, + file_utils::{ + create_if_not_exists, create_if_not_exists_async, create_or_overwrite, BufferedFile, + }, nlp::NLPService, offset_storage::OffsetStorage, types::CollectionId, @@ -19,12 +20,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::{info, instrument, warn}; -use super::collection::CollectionReader; - -#[derive(Debug, Deserialize, Clone)] -pub struct IndexesConfig { - pub data_dir: PathBuf, -} +use super::{collection::CollectionReader, IndexesConfig}; #[derive(Debug)] pub struct CollectionsReader { @@ -121,13 +117,17 @@ impl CollectionsReader { pub async fn commit(&self) -> Result<()> { let data_dir = &self.indexes_config.data_dir; - create_if_not_exists(data_dir).context("Cannot create data directory")?; + create_if_not_exists_async(data_dir) + .await + .context("Cannot create data directory")?; let col = self.collections.read().await; let col = &*col; let collections_dir = data_dir.join("collections"); - 
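// Example sketch (not part of the patch): the commit pattern above, reduced to
// its shape. Each blocking `commit` runs on tokio's blocking pool via
// `spawn_blocking` and is awaited before the next one starts, so only one
// index allocates commit buffers at a time while the async runtime stays
// responsive. `FakeIndex` and its `commit` are placeholders for the real
// index types.
use std::{path::PathBuf, sync::Arc};
use anyhow::{Context, Result};

struct FakeIndex;

impl FakeIndex {
    fn commit(&self, _dir: PathBuf) -> Result<()> {
        // Pretend this performs heavy, blocking disk I/O.
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let string_index = Arc::new(FakeIndex);
    let number_index = Arc::new(FakeIndex);
    let data_dir = PathBuf::from("/tmp/commit-example");

    for (name, index) in [("strings", &string_index), ("numbers", &number_index)] {
        let dir = data_dir.join(name);
        // Clone the Arc so the closure owns its own handle, as the real code does.
        let index = Arc::clone(index);
        tokio::task::spawn_blocking(move || index.commit(dir))
            .await
            .context("Cannot spawn blocking task")??;
    }
    Ok(())
}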
create_if_not_exists(&collections_dir).context("Cannot create 'collections' directory")?; + create_if_not_exists_async(&collections_dir) + .await + .context("Cannot create 'collections' directory")?; let collection_ids: Vec<_> = col.keys().cloned().collect(); @@ -136,10 +136,11 @@ impl CollectionsReader { let collection_dir = collections_dir.join(&id.0); - create_if_not_exists(&collection_dir) + create_if_not_exists_async(&collection_dir) + .await .with_context(|| format!("Cannot create directory for collection '{}'", id.0))?; - reader.commit(collection_dir)?; + reader.commit(collection_dir).await?; info!("Collection {:?} committed", id); } @@ -148,10 +149,9 @@ impl CollectionsReader { collection_ids: collection_ids.into_iter().collect(), }); - BufferedFile::create_or_overwrite(data_dir.join("info.json")) - .context("Cannot create info.json file")? - .write_json_data(&collections_info) - .context("Cannot serialize info.json file")?; + create_or_overwrite(data_dir.join("info.json"), &collections_info) + .await + .context("Cannot create info.json file")?; Ok(()) } diff --git a/src/collection_manager/sides/read/document_storage.rs b/src/collection_manager/sides/read/document_storage.rs index d5682ac..b797a55 100644 --- a/src/collection_manager/sides/read/document_storage.rs +++ b/src/collection_manager/sides/read/document_storage.rs @@ -1,11 +1,11 @@ use serde::{de::Unexpected, Deserialize, Serialize}; -use std::{collections::HashMap, fmt::Debug, path::PathBuf, sync::RwLock}; +use std::{collections::HashMap, fmt::Debug, path::PathBuf}; use tracing::{debug, trace, warn}; use anyhow::{anyhow, Context, Ok, Result}; use crate::{ - file_utils::BufferedFile, + file_utils::{create_or_overwrite, read_file}, types::{DocumentId, RawJSONDocument}, }; @@ -18,20 +18,20 @@ use crate::{ #[derive(Debug)] struct CommittedDiskDocumentStorage { path: PathBuf, - cache: RwLock>, + cache: tokio::sync::RwLock>, } impl CommittedDiskDocumentStorage { - fn try_new(path: PathBuf) -> Result { - let cache: RwLock> = Default::default(); + fn new(path: PathBuf) -> Self { + let cache: tokio::sync::RwLock> = Default::default(); - Ok(Self { path, cache }) + Self { path, cache } } - fn get_documents_by_ids(&self, doc_ids: &[DocumentId]) -> Result>> { - let lock = match self.cache.read() { - std::result::Result::Ok(lock) => lock, - std::result::Result::Err(e) => e.into_inner(), - }; + async fn get_documents_by_ids( + &self, + doc_ids: &[DocumentId], + ) -> Result>> { + let lock = self.cache.read().await; let mut from_cache: HashMap = doc_ids .iter() .filter_map(|id| lock.get(id).map(|d| (*id, d.clone()))) @@ -41,13 +41,18 @@ impl CommittedDiskDocumentStorage { trace!(doc_len=?doc_ids.len(), "Read document"); let mut result = Vec::with_capacity(doc_ids.len()); for id in doc_ids { + trace!(?id, "Reading document"); if let Some(d) = from_cache.remove(id) { + trace!("In cache. skip"); result.push(Some(d)); continue; } let doc_path = self.path.join(format!("{}", id.0)); - match std::fs::exists(&doc_path) { + trace!(?doc_path, "Check on FS"); + + let exists = tokio::fs::try_exists(&doc_path).await; + match exists { Err(e) => { return Err(anyhow!( "Error while checking if the document exists: {:?}", @@ -55,22 +60,27 @@ impl CommittedDiskDocumentStorage { )); } std::result::Result::Ok(false) => { + trace!("Not found on FS"); result.push(None); continue; } std::result::Result::Ok(true) => {} }; - trace!(?doc_path, "Reading file"); - let doc: RawJSONDocumentWrapper = BufferedFile::open(doc_path) - .context("Cannot open document file")? 
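// Example sketch (not part of the patch): the async "create the directory if
// it is missing" helper used by the reader commit above, as a standalone
// function. The real implementation is `create_if_not_exists_async` in
// file_utils.rs; this version only shows the try_exists + create_dir_all flow.
use std::path::Path;
use anyhow::{Context, Result};

async fn ensure_dir(path: &Path) -> Result<()> {
    if tokio::fs::try_exists(path)
        .await
        .context("Error while checking if the directory exists")?
    {
        // Already there: nothing to do.
        return Ok(());
    }
    tokio::fs::create_dir_all(path)
        .await
        .context("Cannot create directory")
}

#[tokio::main]
async fn main() -> Result<()> {
    let dir = std::env::temp_dir().join("collections-example");
    ensure_dir(&dir).await?;
    // Idempotent: a second call is a no-op.
    ensure_dir(&dir).await
}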
- .read_json_data() - .context("Cannot read document data")?; - - let mut lock = match self.cache.write() { - std::result::Result::Ok(lock) => lock, - std::result::Result::Err(e) => e.into_inner(), + let doc: RawJSONDocumentWrapper = match read_file(doc_path).await { + std::result::Result::Err(_) => { + // It could happen the `commit` method creates the file (so the previous check passes) + // but not with the full content written. + // In that case, `read_file` will fail. + // If it happens, this arm is triggered. + trace!("Error on read from FS"); + result.push(None); + continue; + } + std::result::Result::Ok(doc) => doc, }; + + let mut lock = self.cache.write().await; lock.insert(*id, doc.0.clone()); drop(lock); @@ -80,15 +90,14 @@ impl CommittedDiskDocumentStorage { Ok(result) } - fn add(&self, docs: Vec<(DocumentId, RawJSONDocument)>) -> Result<()> { + async fn add(&self, docs: Vec<(DocumentId, RawJSONDocument)>) -> Result<()> { for (doc_id, doc) in docs { let doc_path = self.path.join(format!("{}", doc_id.0)); let doc = RawJSONDocumentWrapper(doc); - BufferedFile::create(doc_path) - .context("Cannot create document file")? - .write_json_data(&doc) + create_or_overwrite(doc_path, &doc) + .await .context("Cannot write document data")?; } @@ -103,7 +112,7 @@ pub struct DocumentStorageConfig { #[derive(Debug)] pub struct DocumentStorage { - uncommitted: RwLock>, + uncommitted: tokio::sync::RwLock>, committed: CommittedDiskDocumentStorage, } @@ -113,16 +122,12 @@ impl DocumentStorage { Ok(Self { uncommitted: Default::default(), - committed: CommittedDiskDocumentStorage::try_new(config.data_dir) - .context("Cannot create CommittedDiskDocumentStorage")?, + committed: CommittedDiskDocumentStorage::new(config.data_dir), }) } pub async fn add_document(&self, doc_id: DocumentId, doc: RawJSONDocument) -> Result<()> { - let mut uncommitted = match self.uncommitted.write() { - std::result::Result::Ok(uncommitted) => uncommitted, - std::result::Result::Err(e) => e.into_inner(), - }; + let mut uncommitted = self.uncommitted.write().await; if uncommitted.insert(doc_id, doc).is_some() { warn!("Document {:?} already exists. Overwritten.", doc_id); } @@ -134,17 +139,16 @@ impl DocumentStorage { &self, doc_ids: Vec, ) -> Result>> { - debug!("Get documents"); - let committed = self.committed.get_documents_by_ids(&doc_ids)?; + debug!("Get from committed documents"); + let committed = self.committed.get_documents_by_ids(&doc_ids).await?; - let uncommitted = match self.uncommitted.read() { - std::result::Result::Ok(uncommitted) => uncommitted, - std::result::Result::Err(e) => e.into_inner(), - }; + trace!("Get from uncommitted documents"); + let uncommitted = self.uncommitted.read().await; let uncommitted: Vec<_> = doc_ids .into_iter() .map(|doc_id| uncommitted.get(&doc_id).cloned()) .collect(); + trace!("Get from uncommitted documents done"); let result = committed .into_iter() @@ -165,21 +169,21 @@ impl DocumentStorage { Ok(result) } - pub fn commit(&self) -> Result<()> { + pub async fn commit(&self) -> Result<()> { + trace!("Commit documents"); // This implementation is wrong: // in the mean time we "dran" + "collection" + "write on FS" // The documents aren't reachable. So the search output will not contain them. // We should follow the same path of the indexes. 
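// Example sketch (not part of the patch): the read-through cache shape used by
// CommittedDiskDocumentStorage above, with a tokio::sync::RwLock'd map checked
// before touching the disk and filled after a successful read. The disk read
// is faked here; in the real code a failed `read_file` (e.g. a partially
// written document) is treated as "not found", as the comment above explains.
use std::collections::HashMap;
use tokio::sync::RwLock;

#[derive(Default)]
struct DocCache {
    inner: RwLock<HashMap<u64, String>>,
}

impl DocCache {
    async fn get_or_load(&self, id: u64) -> Option<String> {
        // Fast path: shared read lock only.
        if let Some(doc) = self.inner.read().await.get(&id).cloned() {
            return Some(doc);
        }
        // Slow path: stand-in for reading and deserializing the file on disk.
        let loaded = format!("document-{id}");
        self.inner.write().await.insert(id, loaded.clone());
        Some(loaded)
    }
}

#[tokio::main]
async fn main() {
    let cache = DocCache::default();
    assert_eq!(cache.get_or_load(1).await.as_deref(), Some("document-1"));
    // Second call is served from the cache without the "disk" step.
    assert_eq!(cache.get_or_load(1).await.as_deref(), Some("document-1"));
}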
// TODO: fix me - let mut uncommitted = match self.uncommitted.write() { - std::result::Result::Ok(uncommitted) => uncommitted, - std::result::Result::Err(e) => e.into_inner(), - }; - let uncommitted: Vec<_> = uncommitted.drain().collect(); + let mut lock = self.uncommitted.write().await; + let uncommitted: Vec<_> = lock.drain().collect(); + drop(lock); self.committed .add(uncommitted) + .await .context("Cannot commit documents")?; Ok(()) diff --git a/src/collection_manager/sides/read/mod.rs b/src/collection_manager/sides/read/mod.rs index a9d7493..59a5279 100644 --- a/src/collection_manager/sides/read/mod.rs +++ b/src/collection_manager/sides/read/mod.rs @@ -2,13 +2,15 @@ mod collection; mod collections; mod document_storage; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use anyhow::{Context, Result}; use collections::CollectionsReader; -pub use collections::IndexesConfig; use document_storage::{DocumentStorage, DocumentStorageConfig}; use ordered_float::NotNan; +use serde::Deserialize; +use tokio::sync::RwLock; +use tracing::trace; use crate::{ ai::AIService, @@ -24,9 +26,18 @@ use crate::{ use super::{CollectionWriteOperation, Offset, WriteOperation}; +#[derive(Debug, Deserialize, Clone)] +pub struct IndexesConfig { + pub data_dir: PathBuf, + #[serde(default = "default_insert_batch_commit_size")] + pub insert_batch_commit_size: u64, +} + pub struct ReadSide { collections: CollectionsReader, document_storage: DocumentStorage, + operation_counter: RwLock, + insert_batch_commit_size: u64, } impl ReadSide { @@ -40,9 +51,13 @@ impl ReadSide { }) .context("Cannot create document storage")?; + let insert_batch_commit_size = indexes_config.insert_batch_commit_size; + Ok(Self { collections: CollectionsReader::try_new(ai_service, nlp_service, indexes_config)?, document_storage, + operation_counter: Default::default(), + insert_batch_commit_size, }) } @@ -61,6 +76,7 @@ impl ReadSide { self.document_storage .commit() + .await .context("Cannot commit document storage")?; Ok(()) @@ -87,11 +103,13 @@ impl ReadSide { let top_results: Vec = top_n(token_scores, limit.0); + trace!("Top results: {:?}", top_results); let docs = self .document_storage .get_documents_by_ids(top_results.iter().map(|m| m.document_id).collect()) .await?; + trace!("Calculates hits"); let hits: Vec<_> = top_results .into_iter() .zip(docs) @@ -150,6 +168,20 @@ impl ReadSide { } } + let mut lock = self.operation_counter.write().await; + *lock += 1; + let should_commit = if *lock >= self.insert_batch_commit_size { + *lock = 0; + true + } else { + false + }; + drop(lock); + + if should_commit { + self.commit().await?; + } + Ok(()) } @@ -188,6 +220,10 @@ fn top_n(map: HashMap, n: usize) -> Vec { result } +fn default_insert_batch_commit_size() -> u64 { + 300 +} + #[cfg(test)] mod tests { use crate::collection_manager::sides::read::{ diff --git a/src/collection_manager/sides/write/collections.rs b/src/collection_manager/sides/write/collections.rs index 1242fbe..5533542 100644 --- a/src/collection_manager/sides/write/collections.rs +++ b/src/collection_manager/sides/write/collections.rs @@ -1,10 +1,8 @@ use std::collections::HashMap; use std::ops::Deref; -use std::path::PathBuf; use std::sync::Arc; use anyhow::{anyhow, Context, Ok, Result}; -use serde::Deserialize; use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::{info, instrument}; @@ -19,7 +17,7 @@ use crate::collection_manager::dto::{ }; use super::{collection::CollectionWriter, embedding::EmbeddingCalculationRequest, 
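// Example sketch (not part of the patch): the counter-driven auto-commit that
// this diff adds to ReadSide (and, further down, to WriteSide). Every applied
// operation bumps a tokio::sync::RwLock<u64>; when the counter reaches
// `insert_batch_commit_size` it is reset and a commit is triggered. `Side` and
// its no-op `commit` are placeholders.
use tokio::sync::RwLock;

struct Side {
    operation_counter: RwLock<u64>,
    insert_batch_commit_size: u64,
}

impl Side {
    async fn commit(&self) {
        // Stand-in for the real (expensive) commit.
    }

    async fn on_operation(&self) {
        let mut counter = self.operation_counter.write().await;
        *counter += 1;
        let should_commit = if *counter >= self.insert_batch_commit_size {
            *counter = 0;
            true
        } else {
            false
        };
        // Release the lock before committing so other operations keep flowing.
        drop(counter);

        if should_commit {
            self.commit().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let side = Side {
        operation_counter: RwLock::new(0),
        insert_batch_commit_size: 3,
    };
    for _ in 0..7 {
        side.on_operation().await;
    }
    // 7 operations with a batch size of 3: two commits, one operation pending.
    assert_eq!(*side.operation_counter.read().await, 1);
}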
WriteOperation}; -use super::{OperationSender, OramaModelSerializable}; +use super::{CollectionsWriterConfig, OperationSender}; pub struct CollectionsWriter { collections: RwLock>, @@ -27,15 +25,6 @@ pub struct CollectionsWriter { embedding_sender: tokio::sync::mpsc::Sender, } -#[derive(Debug, Deserialize, Clone)] -pub struct CollectionsWriterConfig { - pub data_dir: PathBuf, - #[serde(default = "embedding_queue_limit_default")] - pub embedding_queue_limit: usize, - #[serde(default = "embedding_model_default")] - pub default_embedding_model: OramaModelSerializable, -} - impl CollectionsWriter { pub fn new( config: CollectionsWriterConfig, @@ -81,36 +70,41 @@ impl CollectionsWriter { self.embedding_sender.clone(), ); + let typed_fields = if !cfg!(feature = "no_auto_embedding_field_on_creation") { + let model = embeddings + .as_ref() + .and_then(|embeddings| embeddings.model.as_ref()) + .unwrap_or(&self.config.default_embedding_model); + let model = model.0; + let document_fields = embeddings + .map(|embeddings| embeddings.document_fields) + .map(DocumentFields::Properties) + .unwrap_or(DocumentFields::AllStringProperties); + let typed_field = TypedField::Embedding(EmbeddingTypedField { + model, + document_fields, + }); + HashMap::from_iter([("___orama_auto_embedding".to_string(), typed_field)]) + } else { + HashMap::new() + }; + + let mut collections = self.collections.write().await; + if collections.contains_key(&id) { + // This error should be typed. + // TODO: create a custom error type + return Err(anyhow!(format!("Collection \"{}\" already exists", id.0))); + } + + // Send event & Register field should be inside the lock transaction sender .send(WriteOperation::CreateCollection { id: id.clone() }) .context("Cannot send create collection")?; - - let model = embeddings - .as_ref() - .and_then(|embeddings| embeddings.model.as_ref()) - .unwrap_or(&self.config.default_embedding_model); - let model = model.0; - let document_fields = embeddings - .map(|embeddings| embeddings.document_fields) - .map(DocumentFields::Properties) - .unwrap_or(DocumentFields::AllStringProperties); - let typed_field = TypedField::Embedding(EmbeddingTypedField { - model, - document_fields, - }); - let typed_fields = HashMap::from_iter([("aa".to_string(), typed_field)]); - collection .register_fields(typed_fields, sender.clone(), hooks_runtime) .await .context("Cannot register fields")?; - let mut collections = self.collections.write().await; - if collections.contains_key(&id) { - // This error should be typed. 
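// Example sketch (not part of the patch): the compile-time gate used above.
// `cfg!(feature = "...")` expands to a boolean constant, so builds with the
// new `no_auto_embedding_field_on_creation` Cargo feature simply skip creating
// the automatic embedding field. The enum and model name below are
// illustrative; only the feature name and the "___orama_auto_embedding" key
// come from this diff.
use std::collections::HashMap;

#[derive(Debug)]
enum TypedFieldSketch {
    Embedding(&'static str),
}

fn auto_fields() -> HashMap<String, TypedFieldSketch> {
    if !cfg!(feature = "no_auto_embedding_field_on_creation") {
        // Default build: register the hidden auto-embedding field.
        HashMap::from_iter([(
            "___orama_auto_embedding".to_string(),
            TypedFieldSketch::Embedding("BGESmall"),
        )])
    } else {
        // Feature enabled (e.g. for certain tests): no implicit field.
        HashMap::new()
    }
}

fn main() {
    println!("{:?}", auto_fields());
}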
- // todo: create a custom error type - return Err(anyhow!(format!("Collection \"{}\" already exists", id.0))); - } collections.insert(id, collection); drop(collections); @@ -230,14 +224,6 @@ impl Deref for CollectionWriteLock<'_> { } } -fn embedding_queue_limit_default() -> usize { - 50 -} - -fn embedding_model_default() -> OramaModelSerializable { - OramaModelSerializable(crate::ai::OramaModel::BgeSmall) -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/collection_manager/sides/write/mod.rs b/src/collection_manager/sides/write/mod.rs index c9501d4..aa6ec84 100644 --- a/src/collection_manager/sides/write/mod.rs +++ b/src/collection_manager/sides/write/mod.rs @@ -16,10 +16,10 @@ use std::{ use super::hooks::{HookName, HooksRuntime}; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use tracing::{info, warn}; use collections::CollectionsWriter; -pub use collections::CollectionsWriterConfig; use embedding::{start_calculate_embedding_loop, EmbeddingCalculationRequest}; pub use operation::*; @@ -35,6 +35,17 @@ use crate::{ types::{CollectionId, DocumentId, DocumentList}, }; +#[derive(Debug, Deserialize, Clone)] +pub struct CollectionsWriterConfig { + pub data_dir: PathBuf, + #[serde(default = "embedding_queue_limit_default")] + pub embedding_queue_limit: usize, + #[serde(default = "embedding_model_default")] + pub default_embedding_model: OramaModelSerializable, + #[serde(default = "default_insert_batch_commit_size")] + pub insert_batch_commit_size: u64, +} + pub struct WriteSide { sender: OperationSender, collections: CollectionsWriter, @@ -42,6 +53,9 @@ pub struct WriteSide { data_dir: PathBuf, hook_runtime: Arc, nlp_service: Arc, + + operation_counter: RwLock, + insert_batch_commit_size: u64, } impl WriteSide { @@ -54,6 +68,8 @@ impl WriteSide { ) -> WriteSide { let data_dir = config.data_dir.clone(); + let insert_batch_commit_size = config.insert_batch_commit_size; + let (sx, rx) = tokio::sync::mpsc::channel::(config.embedding_queue_limit); @@ -66,6 +82,9 @@ impl WriteSide { data_dir, hook_runtime, nlp_service, + + operation_counter: Default::default(), + insert_batch_commit_size, } } @@ -128,8 +147,12 @@ impl WriteSide { collection_id: CollectionId, document_list: DocumentList, ) -> Result<()> { - info!("Inserting batch of {} documents", document_list.len()); + let document_count = document_list.len(); + info!(?document_count, "Inserting batch of documents"); + // This counter is not atomic with the insert operation. + // This means we increment the counter even if the insert operation fails. 
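// Example sketch (not part of the patch): one way to read the reordering
// above, where the "collection already exists" check now happens while the
// collections write lock is held. Checking and inserting under the same guard
// closes the window in which two concurrent create_collection calls could both
// pass a lock-free contains_key check. Names below are placeholders.
use std::collections::HashMap;
use anyhow::{anyhow, Result};
use tokio::sync::RwLock;

struct Registry {
    collections: RwLock<HashMap<String, ()>>,
}

impl Registry {
    async fn create(&self, id: &str) -> Result<()> {
        let mut collections = self.collections.write().await;
        if collections.contains_key(id) {
            return Err(anyhow!("Collection \"{}\" already exists", id));
        }
        // Still holding the lock, so no other task can insert the same id
        // between the check and this insert.
        collections.insert(id.to_string(), ());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let registry = Registry {
        collections: RwLock::new(HashMap::new()),
    };
    registry.create("test-collection").await?;
    assert!(registry.create("test-collection").await.is_err());
    Ok(())
}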
+ // TODO: think better ADDED_DOCUMENTS_COUNTER .create(AddedDocumentsLabels { collection: collection_id.0.clone(), @@ -175,6 +198,20 @@ impl WriteSide { .context("Cannot process document")?; } + let mut lock = self.operation_counter.write().await; + *lock += document_count as u64; + let should_commit = if *lock >= self.insert_batch_commit_size { + *lock = 0; + true + } else { + false + }; + drop(lock); + + if should_commit { + self.commit().await?; + } + Ok(()) } @@ -247,3 +284,15 @@ struct WriteSideInfoV1 { document_count: u64, offset: Offset, } + +fn embedding_queue_limit_default() -> usize { + 50 +} + +fn embedding_model_default() -> OramaModelSerializable { + OramaModelSerializable(crate::ai::OramaModel::BgeSmall) +} + +fn default_insert_batch_commit_size() -> u64 { + 1_000 +} diff --git a/src/file_utils.rs b/src/file_utils.rs index 0a98bd1..ab6538a 100644 --- a/src/file_utils.rs +++ b/src/file_utils.rs @@ -4,8 +4,34 @@ use std::{ }; use anyhow::{Context, Result}; +use tokio::io::AsyncWriteExt; use tracing::trace; +pub async fn create_if_not_exists_async>(p: P) -> Result<()> { + let p: PathBuf = p.as_ref().to_path_buf(); + + let output = tokio::fs::try_exists(&p).await; + match output { + Err(e) => { + return Err(anyhow::anyhow!( + "Error while checking if the directory exists: {:?}", + e + )); + } + Ok(true) => { + trace!("Directory exists. Skip creation."); + } + Ok(false) => { + trace!("Directory does not exist. Creating it."); + tokio::fs::create_dir_all(p) + .await + .context("Cannot create directory")?; + } + } + + Ok(()) +} + pub fn create_if_not_exists>(p: P) -> Result<()> { let p: PathBuf = p.as_ref().to_path_buf(); @@ -63,6 +89,33 @@ pub fn list_directory_in_path>(p: P) -> Result(path: PathBuf, data: &T) -> Result<()> { + let mut file = tokio::fs::File::create(&path) + .await + .with_context(|| format!("Cannot create file at {:?}", path))?; + let v = serde_json::to_vec(data) + .with_context(|| format!("Cannot write json data to {:?}", path))?; + file.write_all(&v) + .await + .with_context(|| format!("Cannot write json data to {:?}", path))?; + file.flush() + .await + .with_context(|| format!("Cannot flush file {:?}", path))?; + file.sync_all() + .await + .with_context(|| format!("Cannot sync_all file {:?}", path))?; + + Ok(()) +} + +pub async fn read_file(path: PathBuf) -> Result { + let vec = tokio::fs::read(&path) + .await + .with_context(|| format!("Cannot open file at {:?}", path))?; + serde_json::from_slice(&vec) + .with_context(|| format!("Cannot deserialize json data from {:?}", path)) +} + pub struct BufferedFile; impl BufferedFile { pub fn create(path: PathBuf) -> Result { diff --git a/src/indexes/string/merger.rs b/src/indexes/string/merger.rs index dd3f163..6a14d2e 100644 --- a/src/indexes/string/merger.rs +++ b/src/indexes/string/merger.rs @@ -15,7 +15,7 @@ use crate::types::DocumentId; use super::document_lengths::DocumentLengthsPerDocument; use super::posting_storage::PostingIdStorage; use super::uncommitted::{DataToCommit, Positions, TotalDocumentsWithTermInField}; -use super::CommittedStringFieldIndex; +use super::{CommittedStringFieldIndex, StringIndexFieldInfo}; struct FTSIter<'stream> { stream: Option>, @@ -37,18 +37,20 @@ impl Iterator for FTSIter<'_> { pub fn merge( data_to_commit: DataToCommit<'_>, committed: &CommittedStringFieldIndex, - document_length_new_path: PathBuf, - fst_new_path: PathBuf, - posting_new_path: PathBuf, - global_info_new_path: PathBuf, - posting_id_new_path: PathBuf, + string_index_field_info: StringIndexFieldInfo, ) -> Result<()> 
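// Example sketch (not part of the patch): a standalone version of the async
// JSON helpers introduced in file_utils.rs above (`create_or_overwrite` /
// `read_file`): serialize with serde_json, write + flush + sync_all via
// tokio::fs, read back with serde_json::from_slice. Function names here are
// different on purpose; error messages are simplified.
use std::path::PathBuf;
use anyhow::{Context, Result};
use serde::{de::DeserializeOwned, Serialize};
use tokio::io::AsyncWriteExt;

async fn write_json<T: Serialize>(path: PathBuf, data: &T) -> Result<()> {
    let bytes = serde_json::to_vec(data).context("Cannot serialize data")?;
    let mut file = tokio::fs::File::create(&path)
        .await
        .with_context(|| format!("Cannot create file at {:?}", path))?;
    file.write_all(&bytes).await.context("Cannot write data")?;
    file.flush().await.context("Cannot flush file")?;
    // sync_all makes the write durable before the caller moves on.
    file.sync_all().await.context("Cannot sync file")?;
    Ok(())
}

async fn read_json<T: DeserializeOwned>(path: PathBuf) -> Result<T> {
    let bytes = tokio::fs::read(&path)
        .await
        .with_context(|| format!("Cannot open file at {:?}", path))?;
    serde_json::from_slice(&bytes).context("Cannot deserialize data")
}

#[tokio::main]
async fn main() -> Result<()> {
    let path = std::env::temp_dir().join("example-info.json");
    write_json(path.clone(), &vec![1u32, 2, 3]).await?;
    let back: Vec<u32> = read_json(path).await?;
    assert_eq!(back, vec![1, 2, 3]);
    Ok(())
}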
{ + let StringIndexFieldInfo { + document_length_path, + fst_path, + posting_path, + global_info_path, + posting_id_path, + .. + } = string_index_field_info; + committed .document_lengths_per_document - .merge( - data_to_commit.get_document_lengths(), - document_length_new_path, - ) + .merge(data_to_commit.get_document_lengths(), document_length_path) .context("Cannot merge document lengths")?; let max_posting_id = committed.posting_id_generator.load(Ordering::Relaxed); @@ -59,22 +61,22 @@ pub fn merge( posting_id_generator.clone(), uncommitted_iter, committed.get_info().fst_path, - fst_new_path, + fst_path, ) .context("Cannot merge iterators")?; committed .storage - .apply_delta(storage_updates, posting_new_path)?; + .apply_delta(storage_updates, posting_path)?; let global_info = data_to_commit.global_info() + committed.get_global_info(); - BufferedFile::create(global_info_new_path) + BufferedFile::create(global_info_path) .context("Cannot create file for global info")? .write_json_data(&global_info) .context("Cannot serialize global info to file")?; let posting_id = posting_id_generator.load(Ordering::Relaxed); - BufferedFile::create(posting_id_new_path) + BufferedFile::create(posting_id_path) .context("Cannot create file for posting_id")? .write_json_data(&posting_id) .context("Cannot serialize posting_id to file")?; diff --git a/src/indexes/string/mod.rs b/src/indexes/string/mod.rs index 74d1c5e..db93ebc 100644 --- a/src/indexes/string/mod.rs +++ b/src/indexes/string/mod.rs @@ -20,7 +20,7 @@ use crate::{ sides::{InsertStringTerms, Offset}, }, field_id_hashmap::FieldIdHashMap, - file_utils::BufferedFile, + file_utils::{create_if_not_exists, BufferedFile}, types::DocumentId, }; @@ -128,7 +128,7 @@ impl StringIndex { info!("Dumping all fields {:?} at {:?}", all_fields, new_path); - std::fs::create_dir_all(new_path.clone()) + create_if_not_exists(new_path.clone()) .with_context(|| format!("Cannot create directory at {:?}", new_path))?; let mut string_index_info = StringIndexInfoV1 { @@ -169,8 +169,8 @@ impl StringIndex { .join(format!("field-{}", field_id.0)) .join(format!("offset-{}", offset.0)); - std::fs::create_dir_all(field_new_path.clone()) - .with_context(|| format!("Cannot create directory at {:?}", field_new_path))?; + create_if_not_exists(field_new_path.clone()) + .with_context(|| format!("Cannot create directory for field {:?}", field_id))?; let fst_path = field_new_path.join("fst.bin"); let posting_path = field_new_path.join("posting.bin"); @@ -178,46 +178,38 @@ impl StringIndex { let global_info_path = field_new_path.join("global_info.bin"); let posting_id_path = field_new_path.join("posting_id.bin"); + let string_index_field_info = StringIndexFieldInfo { + field_id, + fst_path, + document_length_path, + posting_path, + global_info_path, + posting_id_path, + offset, + }; + + let string_index_field_info_copy = string_index_field_info.clone(); match committed { Some(committed) => { info!("Merging field_id: {:?}", field_id); - - merger::merge( - data_to_commit, - &committed, - document_length_path.clone(), - fst_path.clone(), - posting_path.clone(), - global_info_path.clone(), - posting_id_path.clone(), - ) - .with_context(|| format!("Cannot merge field_id: {:?}", field_id))?; + merger::merge(data_to_commit, &committed, string_index_field_info_copy) + .with_context(|| format!("Cannot merge field_id: {:?}", field_id))?; } None => { info!("Dumping new field_id: {:?}", field_id); merger::create( data_to_commit, - document_length_path.clone(), - fst_path.clone(), - 
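// Example sketch (not part of the patch): the "parameter object" refactor
// applied to merger::merge above, where five separate PathBuf arguments are
// folded into one StringIndexFieldInfo and destructured at the top of the
// function so call sites cannot mix the paths up. Struct and field names below
// are illustrative stand-ins.
use std::path::PathBuf;

struct FieldPathsSketch {
    document_length_path: PathBuf,
    fst_path: PathBuf,
    posting_path: PathBuf,
}

fn merge_sketch(info: FieldPathsSketch) {
    // One pattern binds every path in a single place.
    let FieldPathsSketch {
        document_length_path,
        fst_path,
        posting_path,
    } = info;
    println!("{:?} {:?} {:?}", document_length_path, fst_path, posting_path);
}

fn main() {
    merge_sketch(FieldPathsSketch {
        document_length_path: PathBuf::from("doc_lengths.bin"),
        fst_path: PathBuf::from("fst.bin"),
        posting_path: PathBuf::from("posting.bin"),
    });
}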
posting_path.clone(), - global_info_path.clone(), - posting_id_path.clone(), + string_index_field_info_copy.document_length_path, + string_index_field_info_copy.fst_path, + string_index_field_info_copy.posting_path, + string_index_field_info_copy.global_info_path, + string_index_field_info_copy.posting_id_path, ) .with_context(|| format!("Cannot create field_id: {:?}", field_id))?; } }; - let string_index_field_info = StringIndexFieldInfo { - field_id, - fst_path, - document_length_path, - posting_path, - global_info_path, - posting_id_path, - offset, - }; - let committed = CommittedStringFieldIndex::try_new(string_index_field_info.clone()) .context("Cannot reload committed field")?; @@ -280,7 +272,7 @@ impl StringIndex { Ok(()) } - pub async fn search( + pub fn search( &self, tokens: &[String], search_on: Option<&[FieldId]>, @@ -392,29 +384,25 @@ mod tests { .await?; let mut scorer = BM25Scorer::new(); - string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let before_output = scorer.get_scores(); string_index.commit(generate_new_path())?; let mut scorer = BM25Scorer::new(); - string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let after_output = scorer.get_scores(); assert_approx_eq!( @@ -437,15 +425,13 @@ mod tests { )]), )?; let mut scorer = BM25Scorer::new(); - string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let after_insert_output = scorer.get_scores(); assert_eq!(after_insert_output.len(), 3); @@ -456,15 +442,13 @@ mod tests { string_index.commit(generate_new_path())?; let mut scorer = BM25Scorer::new(); - string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let after_insert_commit_output = scorer.get_scores(); assert_eq!(after_insert_commit_output.len(), 3); @@ -495,15 +479,13 @@ mod tests { .await?; let mut scorer = BM25Scorer::new(); - string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let before_scores = scorer.get_scores(); let new_path = generate_new_path(); @@ -517,15 +499,13 @@ mod tests { }; let mut scorer = BM25Scorer::new(); - new_string_index - .search( - &["hello".to_string()], - None, - &Default::default(), - &mut scorer, - None, - ) - .await?; + new_string_index.search( + &["hello".to_string()], + None, + &Default::default(), + &mut scorer, + None, + )?; let scores = scorer.get_scores(); // Compare scores diff --git a/src/indexes/vector/committed.rs b/src/indexes/vector/committed.rs index 15ab7d4..ccc7310 100644 --- a/src/indexes/vector/committed.rs +++ b/src/indexes/vector/committed.rs @@ -10,7 +10,7 @@ use hora::{ index::{hnsw_idx::HNSWIndex, hnsw_params::HNSWParams}, }; use serde::{Deserialize, Serialize}; -use tracing::warn; +use tracing::{trace, warn}; use crate::{ 
collection_manager::sides::Offset, file_utils::create_if_not_exists, types::DocumentId, @@ -106,10 +106,13 @@ impl CommittedVectorFieldIndex { } pub fn commit(&mut self, data_dir: PathBuf) -> Result<()> { + trace!("Building index"); + self.index .build(Metric::Euclidean) .map_err(|e| anyhow!("Cannot build index: {}", e))?; + trace!("create parent dir"); let parent_dir = data_dir .parent() .ok_or_else(|| anyhow!("Cannot get parent dir"))?; @@ -121,6 +124,7 @@ impl CommittedVectorFieldIndex { return Err(anyhow!("Cannot convert path to string")); } }; + trace!("dumping index"); self.index .dump(data_dir) .map_err(|e| anyhow!("Cannot dump index: {}", e)) diff --git a/src/indexes/vector/mod.rs b/src/indexes/vector/mod.rs index 158d772..a7500ab 100644 --- a/src/indexes/vector/mod.rs +++ b/src/indexes/vector/mod.rs @@ -55,14 +55,12 @@ impl VectorIndex { if vectors.is_empty() { continue; } - let uncommitted = self .uncommitted .entry(field_id) .or_insert_with(|| UncommittedVectorFieldIndex::new(vectors[0].len(), offset)); uncommitted.insert(offset, (doc_id, vectors))?; } - Ok(()) } @@ -92,6 +90,7 @@ impl VectorIndex { info!("Committing vector index with fields: {:?}", all_field_ids); for field_id in all_field_ids { + trace!("Committing field {:?}", field_id); let uncommitted = self.uncommitted.get(&field_id); let uncommitted = match uncommitted { @@ -106,10 +105,10 @@ impl VectorIndex { let offset = taken.current_offset(); - let mut committed = self - .committed - .entry(field_id) - .or_insert_with(|| CommittedVectorFieldIndex::new(taken.dimension, offset)); + let mut committed = self.committed.entry(field_id).or_insert_with(|| { + trace!(dimentsion=?taken.dimension, "Creating new committed index"); + CommittedVectorFieldIndex::new(taken.dimension, offset) + }); for (doc_id, vectors) in taken.data() { for (_, vector) in vectors { diff --git a/src/tests.rs b/src/tests.rs index 31f0d47..3b4fe7e 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,7 +1,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Result; @@ -43,12 +43,18 @@ fn create_oramacore_config() -> OramacoreConfig { data_dir: generate_new_path(), embedding_queue_limit: 50, default_embedding_model: OramaModelSerializable(crate::ai::OramaModel::BgeSmall), + // Lot of tests commit to test it. + // So, we put an high value to avoid problems. + insert_batch_commit_size: 10_000, }, }, reader_side: ReadSideConfig { input: SideChannelType::InMemory, config: IndexesConfig { data_dir: generate_new_path(), + // Lot of tests commit to test it. + // So, we put an high value to avoid problems. 
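// Example sketch (not part of the patch): the structured tracing fields used
// by the new trace! calls above. `trace!` accepts key/value fields next to the
// message, with `?` requesting Debug formatting, which is how the commit path
// records things like the index dimension. The subscriber setup is only for
// this example.
use tracing::trace;

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let dimension = 384usize;
    trace!(?dimension, "Creating new committed index");
    trace!("Building index");
}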
+ insert_batch_commit_size: 10_000, }, }, } @@ -1268,6 +1274,78 @@ async fn test_commit_and_load2() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn test_read_commit_should_not_block_search() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let mut config = create_oramacore_config(); + config.reader_side.config.insert_batch_commit_size = 10; + + let (write_side, read_side) = create(config.clone()).await?; + + let collection_id = CollectionId("test-collection".to_string()); + write_side + .create_collection( + json!({ + "id": collection_id.0.clone(), + "embeddings": { + "model_name": "gte-small", + "document_fields": ["name"], + }, + }) + .try_into()?, + ) + .await?; + + insert_docs( + write_side.clone(), + collection_id.clone(), + (0..1_000).map(|i| { + json!({ + "id": i.to_string(), + "text": "text ".repeat(i + 1), + }) + }), + ) + .await?; + + let commit_future = async { + sleep(Duration::from_millis(5)).await; + let commit_start = Instant::now(); + read_side.commit().await.unwrap(); + let commit_end = Instant::now(); + (commit_start, commit_end) + }; + let search_future = async { + sleep(Duration::from_millis(10)).await; + let search_start = Instant::now(); + read_side + .search( + collection_id.clone(), + json!({ + "term": "text", + }) + .try_into() + .unwrap(), + ) + .await + .unwrap(); + let search_end = Instant::now(); + (search_start, search_end) + }; + + let ((commit_start, commit_end), (search_start, search_end)) = + tokio::join!(commit_future, search_future,); + + // The commit should start before the search start + assert!(commit_start < search_start); + // The commit should end after the search start + assert!(commit_end > search_start); + // The commit should end after the search end + assert!(commit_end > search_end); + + Ok(()) +} + async fn create_collection(write_side: Arc, collection_id: CollectionId) -> Result<()> { write_side .create_collection( diff --git a/tests/search.rs b/tests/search.rs index c0d53a0..550166b 100644 --- a/tests/search.rs +++ b/tests/search.rs @@ -74,12 +74,14 @@ async fn start_server() { data_dir: generate_new_path(), embedding_queue_limit: 50, default_embedding_model: OramaModelSerializable(OramaModel::BgeSmall), + insert_batch_commit_size: 10, }, }, reader_side: ReadSideConfig { input: oramacore::SideChannelType::InMemory, config: IndexesConfig { data_dir: generate_new_path(), + insert_batch_commit_size: 10, }, }, }) diff --git a/tests/test_search_without_webserver.rs b/tests/test_search_without_webserver.rs index 904c0a0..fea2c8e 100644 --- a/tests/test_search_without_webserver.rs +++ b/tests/test_search_without_webserver.rs @@ -44,12 +44,14 @@ async fn start_server() -> Result<(Arc, Arc)> { data_dir: generate_new_path(), embedding_queue_limit: 50, default_embedding_model: OramaModelSerializable(OramaModel::BgeSmall), + insert_batch_commit_size: 10_000, }, }, reader_side: ReadSideConfig { input: oramacore::SideChannelType::InMemory, config: IndexesConfig { data_dir: generate_new_path(), + insert_batch_commit_size: 10_000, }, }, })
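// Example sketch (not part of the patch): the timing pattern used by
// test_read_commit_should_not_block_search above, reduced to two futures run
// with tokio::join!. Instants are recorded around each future and the
// assertions check overlap rather than absolute durations; the sleeps stand in
// for commit() and search().
use std::time::{Duration, Instant};
use tokio::time::sleep;

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    let long_task = async {
        let start = Instant::now();
        sleep(Duration::from_millis(50)).await; // stands in for commit()
        (start, Instant::now())
    };
    let short_task = async {
        sleep(Duration::from_millis(10)).await; // let the long task start first
        let start = Instant::now();
        sleep(Duration::from_millis(10)).await; // stands in for search()
        (start, Instant::now())
    };

    let ((long_start, long_end), (short_start, short_end)) =
        tokio::join!(long_task, short_task);

    // The long task started first and was still running while the short one
    // started and finished, i.e. it did not block it.
    assert!(long_start < short_start);
    assert!(long_end > short_start);
    assert!(long_end > short_end);
}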