Merge pull request #88 from oramasearch/feat/allow-search-during-commit
Some improvements
allevo authored Jan 24, 2025
2 parents fc88de3 + e9095c7 commit 1a4c234
Showing 18 changed files with 500 additions and 334 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -127,7 +127,7 @@ harness = false
[features]
default = ["benchmarking"]
benchmarking = ["tempdir"]
test-python = []
no_auto_embedding_field_on_creation = []

[workspace]
members = []
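The new `no_auto_embedding_field_on_creation` feature is presumably consumed through `cfg` attributes at collection-creation time; its call sites are not part of this excerpt. A generic illustration of how such a Cargo feature gates behaviour (the function and field names are illustrative, not the actual OramaCore code):

```rust
// Illustrative only: how a feature flag of this kind is typically consumed.
pub fn default_fields_for_new_collection() -> Vec<&'static str> {
    let mut fields = vec!["title", "body"];

    // With the default feature set, an automatic embedding field is added;
    // building with `no_auto_embedding_field_on_creation` skips it.
    #[cfg(not(feature = "no_auto_embedding_field_on_creation"))]
    fields.push("auto_embedding"); // hypothetical field name

    fields
}
```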
79 changes: 0 additions & 79 deletions config.jsonc

This file was deleted.

7 changes: 7 additions & 0 deletions config.yaml
@@ -12,11 +12,18 @@ writer_side:
# before the writer starts to be blocked
# NB: the elements are in memory, so be careful with this value
embedding_queue_limit: 50
# The number of document insertions after which the write side will commit the changes
insert_batch_commit_size: 1_000
# The default embedding model used to calculate the embeddings
# if none is specified at collection creation
default_embedding_model: BGESmall

reader_side:
input: in-memory
config:
data_dir: ./.data/reader
# The number of write operations after which the read side will commit the changes
insert_batch_commit_size: 300

ai_server:
scheme: http
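Both `insert_batch_commit_size` values bound how many operations may accumulate before the corresponding side commits. A minimal sketch of how such a threshold is typically applied (illustrative only — the type and the counter logic are assumptions, not the OramaCore implementation):

```rust
/// Illustrative only: counts write operations and signals when a commit
/// should run, mirroring the role of `insert_batch_commit_size`.
struct CommitPolicy {
    insert_batch_commit_size: usize,
    pending_ops: usize,
}

impl CommitPolicy {
    fn new(insert_batch_commit_size: usize) -> Self {
        Self { insert_batch_commit_size, pending_ops: 0 }
    }

    /// Call after every insert/write operation; returns true when the
    /// accumulated batch is large enough to commit.
    fn record_op(&mut self) -> bool {
        self.pending_ops += 1;
        if self.pending_ops >= self.insert_batch_commit_size {
            self.pending_ops = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut policy = CommitPolicy::new(3);
    let commits: Vec<bool> = (0..7).map(|_| policy.record_op()).collect();
    assert_eq!(commits, vec![false, false, true, false, false, true, false]);
}
```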
2 changes: 1 addition & 1 deletion prometheus.yml
@@ -4,4 +4,4 @@ scrape_configs:
- job_name: node
static_configs:
- targets:
- 192.168.1.18:8080
- 192.168.1.14:8080
123 changes: 83 additions & 40 deletions src/collection_manager/sides/read/collection.rs
@@ -24,7 +24,7 @@ use crate::{
CollectionWriteOperation, DocumentFieldIndexOperation, Offset, OramaModelSerializable,
},
},
file_utils::BufferedFile,
file_utils::{create_or_overwrite, BufferedFile},
indexes::{
bool::BoolIndex,
number::{NumberFilter, NumberIndex, NumberIndexConfig},
@@ -53,14 +53,14 @@ pub struct CollectionReader {
fields: DashMap<String, (FieldId, TypedField)>,

// indexes
vector_index: VectorIndex,
vector_index: Arc<VectorIndex>,
fields_per_model: DashMap<OramaModel, Vec<FieldId>>,

string_index: StringIndex,
string_index: Arc<StringIndex>,
text_parser_per_field: DashMap<FieldId, (Locale, Arc<TextParser>)>,

number_index: NumberIndex,
bool_index: BoolIndex,
number_index: Arc<NumberIndex>,
bool_index: Arc<BoolIndex>,
// TODO: textparser -> vec<field_id>
offset_storage: OffsetStorage,
}
@@ -74,13 +74,17 @@ impl CollectionReader {
) -> Result<Self> {
let vector_index = VectorIndex::try_new(VectorIndexConfig {})
.context("Cannot create vector index during collection creation")?;
let vector_index = Arc::new(vector_index);

let string_index = StringIndex::new(StringIndexConfig {});
let string_index = Arc::new(string_index);

let number_index = NumberIndex::try_new(NumberIndexConfig {})
.context("Cannot create number index during collection creation")?;
let number_index = Arc::new(number_index);

let bool_index = BoolIndex::new();
let bool_index = Arc::new(bool_index);

Ok(Self {
id,
Expand Down Expand Up @@ -127,16 +131,19 @@ impl CollectionReader {
}

pub async fn load(&mut self, collection_data_dir: PathBuf) -> Result<()> {
self.string_index
Arc::get_mut(&mut self.string_index)
.expect("string_index is shared")
.load(collection_data_dir.join("strings"))
.context("Cannot load string index")?;
self.number_index
Arc::get_mut(&mut self.number_index)
.expect("number_index is shared")
.load(collection_data_dir.join("numbers"))
.context("Cannot load number index")?;
self.vector_index
.load(collection_data_dir.join("vectors"))
.context("Cannot load vectors index")?;
self.bool_index
Arc::get_mut(&mut self.bool_index)
.expect("bool_index is shared")
.load(collection_data_dir.join("bools"))
.context("Cannot load bool index")?;

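The `load` path above relies on `Arc::get_mut`, which only hands out a mutable reference while the `Arc` has a single strong owner; the `.expect("... is shared")` calls therefore assume — presumably — that `load` runs before the reader is cloned anywhere else. A small standalone illustration of that behaviour (plain `std`, unrelated to the OramaCore index types):

```rust
use std::sync::Arc;

fn main() {
    let mut index = Arc::new(vec![1, 2, 3]);

    // Only one strong reference exists, so exclusive access is granted.
    Arc::get_mut(&mut index).unwrap().push(4);

    // After cloning, `get_mut` returns None: this is the case the
    // `.expect("... is shared")` calls above would turn into a panic.
    let shared = Arc::clone(&index);
    assert!(Arc::get_mut(&mut index).is_none());
    drop(shared);

    // With the clone gone, exclusive access is possible again.
    assert_eq!(Arc::get_mut(&mut index).unwrap().len(), 4);
}
```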
@@ -183,19 +190,58 @@ impl CollectionReader {
Ok(())
}

pub fn commit(&self, data_dir: PathBuf) -> Result<()> {
self.string_index
.commit(data_dir.join("strings"))
.context("Cannot commit string index")?;
self.number_index
.commit(data_dir.join("numbers"))
.context("Cannot commit number index")?;
self.vector_index
.commit(data_dir.join("vectors"))
.context("Cannot commit vectors index")?;
self.bool_index
.commit(data_dir.join("bools"))
.context("Cannot commit bool index")?;
pub async fn commit(&self, data_dir: PathBuf) -> Result<()> {
// The following code commits the indexes to disk.
// Because we are inside an async runtime and the commit operation is blocking,
// we spawn a blocking task for each commit.
// We keep the commits sequential: the order itself is not important, but every
// commit operation allocates memory, so running them one at a time avoids
// allocating too much at once.
// TODO: understand whether we could perform the commits in parallel

let string_index = self.string_index.clone();
let string_dir = data_dir.join("strings");
tokio::task::spawn_blocking(move || {
string_index
.commit(string_dir)
.context("Cannot commit string index")
})
.await
.context("Cannot spawn blocking task")?
.context("Cannot commit string index")?;

let number_index = self.number_index.clone();
let number_dir = data_dir.join("numbers");
tokio::task::spawn_blocking(move || {
number_index
.commit(number_dir)
.context("Cannot commit number index")
})
.await
.context("Cannot spawn blocking task")?
.context("Cannot commit number index")?;

let vector_index = self.vector_index.clone();
let vector_dir = data_dir.join("vectors");
tokio::task::spawn_blocking(move || {
vector_index
.commit(vector_dir)
.context("Cannot commit vector index")
})
.await
.context("Cannot spawn blocking task")?
.context("Cannot commit vector index")?;

let bool_index = self.bool_index.clone();
let bool_dir = data_dir.join("bools");
tokio::task::spawn_blocking(move || {
bool_index
.commit(bool_dir)
.context("Cannot commit bool index")
})
.await
.context("Cannot spawn blocking task")?
.context("Cannot commit bool index")?;

let dump = dump::CollectionInfo::V1(dump::CollectionInfoV1 {
id: self.id.clone(),
@@ -231,10 +277,9 @@ impl CollectionReader {
});

let coll_desc_file_path = data_dir.join("info.json");
BufferedFile::create_or_overwrite(coll_desc_file_path)
.context("Cannot create info.json file")?
.write_json_data(&dump)
.with_context(|| format!("Cannot serialize collection info for {:?}", self.id))?;
create_or_overwrite(coll_desc_file_path, &dump)
.await
.context("Cannot create info.json file")?;

Ok(())
}
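Each index commit above follows the same spawn-then-double-`context` pattern. If the repetition ever becomes bothersome, it could be folded into a small helper along these lines (a sketch only — `commit_blocking` and `DummyIndex` are not part of this PR, and the real index types keep their own `commit` methods):

```rust
use std::{path::PathBuf, sync::Arc};

use anyhow::{Context, Result};

/// Hypothetical helper: run a blocking, per-index commit on the blocking
/// thread pool and flatten the join error and the commit error.
async fn commit_blocking<T, F>(
    index: Arc<T>,
    dir: PathBuf,
    commit: F,
    what: &'static str,
) -> Result<()>
where
    T: Send + Sync + 'static,
    F: FnOnce(Arc<T>, PathBuf) -> Result<()> + Send + 'static,
{
    tokio::task::spawn_blocking(move || commit(index, dir))
        .await
        .context("Cannot spawn blocking task")?
        .with_context(|| format!("Cannot commit {what} index"))
}

struct DummyIndex;

impl DummyIndex {
    fn commit(&self, _dir: PathBuf) -> Result<()> {
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let index = Arc::new(DummyIndex);
    commit_blocking(index, PathBuf::from("./.data/dummy"), |i, d| i.commit(d), "dummy").await
}
```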
@@ -545,21 +590,19 @@ impl CollectionReader {
.entry(locale)
.or_insert_with(|| text_parser.tokenize(term));

self.string_index
.search(
tokens,
// This option is not required.
// It was introduced so that tests can avoid
// passing every property.
// Production code should always pass the properties,
// so we could drop this option.
// TODO: remove this option
Some(&[field_id]),
&boost,
&mut scorer,
filtered_doc_ids.as_ref(),
)
.await?;
self.string_index.search(
tokens,
// This option is not required.
// It was introduced so that tests can avoid
// passing every property.
// Production code should always pass the properties,
// so we could drop this option.
// TODO: remove this option
Some(&[field_id]),
&boost,
&mut scorer,
filtered_doc_ids.as_ref(),
)?;
}

Ok(scorer.get_scores())
32 changes: 16 additions & 16 deletions src/collection_manager/sides/read/collections.rs
@@ -1,14 +1,15 @@
use std::{
collections::{HashMap, HashSet},
ops::Deref,
path::PathBuf,
sync::Arc,
};

use crate::{
ai::AIService,
collection_manager::sides::Offset,
file_utils::{create_if_not_exists, BufferedFile},
file_utils::{
create_if_not_exists, create_if_not_exists_async, create_or_overwrite, BufferedFile,
},
nlp::NLPService,
offset_storage::OffsetStorage,
types::CollectionId,
@@ -19,12 +20,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, RwLockReadGuard};
use tracing::{info, instrument, warn};

use super::collection::CollectionReader;

#[derive(Debug, Deserialize, Clone)]
pub struct IndexesConfig {
pub data_dir: PathBuf,
}
use super::{collection::CollectionReader, IndexesConfig};

#[derive(Debug)]
pub struct CollectionsReader {
@@ -121,13 +117,17 @@ impl CollectionsReader {
pub async fn commit(&self) -> Result<()> {
let data_dir = &self.indexes_config.data_dir;

create_if_not_exists(data_dir).context("Cannot create data directory")?;
create_if_not_exists_async(data_dir)
.await
.context("Cannot create data directory")?;

let col = self.collections.read().await;
let col = &*col;

let collections_dir = data_dir.join("collections");
create_if_not_exists(&collections_dir).context("Cannot create 'collections' directory")?;
create_if_not_exists_async(&collections_dir)
.await
.context("Cannot create 'collections' directory")?;

let collection_ids: Vec<_> = col.keys().cloned().collect();

@@ -136,10 +136,11 @@

let collection_dir = collections_dir.join(&id.0);

create_if_not_exists(&collection_dir)
create_if_not_exists_async(&collection_dir)
.await
.with_context(|| format!("Cannot create directory for collection '{}'", id.0))?;

reader.commit(collection_dir)?;
reader.commit(collection_dir).await?;

info!("Collection {:?} committed", id);
}
@@ -148,10 +149,9 @@
collection_ids: collection_ids.into_iter().collect(),
});

BufferedFile::create_or_overwrite(data_dir.join("info.json"))
.context("Cannot create info.json file")?
.write_json_data(&collections_info)
.context("Cannot serialize info.json file")?;
create_or_overwrite(data_dir.join("info.json"), &collections_info)
.await
.context("Cannot create info.json file")?;

Ok(())
}
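The writes that previously went through `BufferedFile::create_or_overwrite(...).write_json_data(...)` now go through the new async `create_or_overwrite` and `create_if_not_exists_async` helpers in `file_utils`. Their implementation is not shown in this excerpt; a plausible shape (an assumption, not the actual code) is serde serialization followed by tokio's async filesystem calls:

```rust
use std::path::Path;

use anyhow::{Context, Result};
use serde::Serialize;

/// Assumed shape of the helper: serialize `data` as JSON and write it,
/// creating or truncating the file, without blocking the async runtime.
pub async fn create_or_overwrite<T: Serialize>(path: impl AsRef<Path>, data: &T) -> Result<()> {
    let path = path.as_ref();
    let json = serde_json::to_vec(data).context("Cannot serialize data")?;
    tokio::fs::write(path, json)
        .await
        .with_context(|| format!("Cannot write {:?}", path))?;
    Ok(())
}

/// Assumed shape of `create_if_not_exists_async`: create the directory
/// tree if it is missing; `create_dir_all` is a no-op when it already exists.
pub async fn create_if_not_exists_async(path: impl AsRef<Path>) -> Result<()> {
    tokio::fs::create_dir_all(path.as_ref())
        .await
        .context("Cannot create directory")?;
    Ok(())
}
```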
