From 5c3458ce4005cd51067284458ab84df346480756 Mon Sep 17 00:00:00 2001
From: Timon Vonk
Date: Sun, 27 Oct 2024 15:45:40 +0100
Subject: [PATCH] fix(indexing)!: Node ID no longer memoized (#414)

As @shamb0 pointed out in #392, there is a potential issue where Node IDs
get cached before chunking or other transformations, breaking upserts and
potentially resulting in data loss.

BREAKING CHANGE: This PR reworks Nodes with a builder API and a private id.
Hence, manually constructing a Node with a struct literal no longer works.
In the future, all the fields are likely to follow the same pattern, so that
we can decouple the inner fields from the Node's implementation. (A short
migration sketch follows the diff below.)
---
 swiftide-core/src/node.rs | 107 +++++++++++++++---
 swiftide-indexing/src/loaders/file_loader.rs | 24 ++--
 .../src/persist/memory_storage.rs | 15 +--
 swiftide-indexing/src/pipeline.rs | 10 +-
 .../src/transformers/chunk_markdown.rs | 21 ++--
 .../src/transformers/chunk_text.rs | 21 ++--
 swiftide-indexing/src/transformers/embed.rs | 12 +-
 .../src/transformers/sparse_embed.rs | 12 +-
 .../src/qdrant/indexing_node.rs | 52 ++++-----
 swiftide-integrations/src/qdrant/mod.rs | 2 +-
 swiftide-integrations/src/redis/node_cache.rs | 7 +-
 swiftide-integrations/src/redis/persist.rs | 25 +---
 .../scraping/html_to_markdown_transformer.rs | 8 +-
 swiftide-integrations/src/scraping/loader.rs | 17 ++-
 .../src/treesitter/chunk_code.rs | 14 ++-
 swiftide-test-utils/src/test_utils.rs | 2 +-
 16 files changed, 188 insertions(+), 161 deletions(-)

diff --git a/swiftide-core/src/node.rs b/swiftide-core/src/node.rs
index ed740313..9cecde6d 100644
--- a/swiftide-core/src/node.rs
+++ b/swiftide-core/src/node.rs
@@ -25,6 +25,7 @@ use std::{
     path::PathBuf,
 };
 
+use derive_builder::Builder;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
@@ -35,28 +36,52 @@ use crate::{metadata::Metadata, util::debug_long_utf8, Embedding, SparseEmbeddin
 /// `Node` encapsulates all necessary information for a single unit of data being processed
 /// in the indexing pipeline. It includes fields for an identifier, file path, data chunk, optional
 /// vector representation, and metadata.
-#[derive(Default, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Builder)]
+#[builder(setter(into, strip_option), build_fn(error = "anyhow::Error"))]
 pub struct Node {
-    /// Optional identifier for the node.
-    pub id: Option<uuid::Uuid>,
     /// File path associated with the node.
+    #[builder(default)]
     pub path: PathBuf,
     /// Data chunk contained in the node.
     pub chunk: String,
     /// Optional vector representation of embedded data.
+    #[builder(default)]
     pub vectors: Option<HashMap<EmbeddedField, Embedding>>,
     /// Optional sparse vector representation of embedded data.
+    #[builder(default)]
     pub sparse_vectors: Option<HashMap<EmbeddedField, SparseEmbedding>>,
     /// Metadata associated with the node.
+    #[builder(default)]
     pub metadata: Metadata,
     /// Mode of embedding data Chunk and Metadata
+    #[builder(default)]
     pub embed_mode: EmbedMode,
     /// Size of the input this node was originally derived from in bytes
+    #[builder(default)]
     pub original_size: usize,
     /// Offset of the chunk relative to the start of the input this node was originally derived from in bytes
+    #[builder(default)]
     pub offset: usize,
 }
 
+impl NodeBuilder {
+    pub fn maybe_sparse_vectors(
+        &mut self,
+        sparse_vectors: Option<HashMap<EmbeddedField, SparseEmbedding>>,
+    ) -> &mut Self {
+        self.sparse_vectors = Some(sparse_vectors);
+        self
+    }
+
+    pub fn maybe_vectors(
+        &mut self,
+        vectors: Option<HashMap<EmbeddedField, Embedding>>,
+    ) -> &mut Self {
+        self.vectors = Some(vectors);
+        self
+    }
+}
+
 impl Debug for Node {
     /// Formats the node for debugging purposes.
/// @@ -64,7 +89,7 @@ impl Debug for Node { /// The vector field is displayed as the number of elements in the vector if present. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Node") - .field("id", &self.id) + .field("id", &self.id()) .field("path", &self.path) .field("chunk", &debug_long_utf8(&self.chunk, 100)) .field("metadata", &self.metadata) @@ -98,6 +123,26 @@ impl Debug for Node { } impl Node { + /// Builds a new instance of `Node`, returning a `NodeBuilder`. Copies + /// over the fields from the provided `Node`. + pub fn build_from_other(node: &Node) -> NodeBuilder { + NodeBuilder::default() + .path(node.path.clone()) + .chunk(node.chunk.clone()) + .metadata(node.metadata.clone()) + .maybe_vectors(node.vectors.clone()) + .maybe_sparse_vectors(node.sparse_vectors.clone()) + .embed_mode(node.embed_mode) + .original_size(node.original_size) + .offset(node.offset) + .to_owned() + } + + /// Creates a new instance of `NodeBuilder.` + pub fn builder() -> NodeBuilder { + NodeBuilder::default() + } + /// Creates a new instance of `Node` with the specified data chunk. /// /// The other fields are set to their default values. @@ -189,21 +234,11 @@ impl Node { /// Calculates the identifier of the node based on its path and chunk as bytes, returning a /// UUID (v3). /// - /// If previously calculated, the identifier is returned directly. If not, a new identifier is - /// calculated without storing it. + /// WARN: Does not memoize the id. Use sparingly. pub fn id(&self) -> uuid::Uuid { - if let Some(id) = self.id { - return id; - } - let bytes = [self.path.as_os_str().as_bytes(), self.chunk.as_bytes()].concat(); - uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, &bytes) - } - /// Updates the identifier of the node. - pub fn update_id(&mut self) { - self.id = None; - self.id = Some(self.id()); + uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, &bytes) } } @@ -293,4 +328,44 @@ mod tests { Node::new("Jürgen".repeat(100)); let _ = format!("{node:?}"); } + + #[test] + fn test_build_from_other_without_vectors() { + let original_node = Node::new("test_chunk") + .with_metadata(Metadata::default()) + .with_vectors(HashMap::new()) + .with_sparse_vectors(HashMap::new()) + .to_owned(); + + let builder = Node::build_from_other(&original_node); + let new_node = builder.build().unwrap(); + + assert_eq!(original_node, new_node); + } + + #[test] + fn test_build_from_other_with_vectors() { + let mut vectors = HashMap::new(); + vectors.insert(EmbeddedField::Chunk, Embedding::default()); + + let mut sparse_vectors = HashMap::new(); + sparse_vectors.insert( + EmbeddedField::Chunk, + SparseEmbedding { + indices: vec![], + values: vec![], + }, + ); + + let original_node = Node::new("test_chunk") + .with_metadata(Metadata::default()) + .with_vectors(vectors.clone()) + .with_sparse_vectors(sparse_vectors.clone()) + .to_owned(); + + let builder = Node::build_from_other(&original_node); + let new_node = builder.build().unwrap(); + + assert_eq!(original_node, new_node); + } } diff --git a/swiftide-indexing/src/loaders/file_loader.rs b/swiftide-indexing/src/loaders/file_loader.rs index 7198ca9f..f05d4ff1 100644 --- a/swiftide-indexing/src/loaders/file_loader.rs +++ b/swiftide-indexing/src/loaders/file_loader.rs @@ -72,12 +72,12 @@ impl FileLoader { tracing::debug!("Reading file: {:?}", entry); let content = std::fs::read_to_string(&entry).unwrap(); let original_size = content.len(); - Node { - path: entry, - chunk: content, - original_size, - ..Default::default() - } + Node::builder() + 
.path(entry) + .chunk(content) + .original_size(original_size) + .build() + .expect("Failed to build node") }) .collect() } @@ -113,12 +113,12 @@ impl Loader for FileLoader { let content = std::fs::read_to_string(entry.path()).context("Failed to read file")?; let original_size = content.len(); - Ok(Node { - path: entry.path().into(), - chunk: content, - original_size, - ..Default::default() - }) + + Node::builder() + .path(entry.path()) + .chunk(content) + .original_size(original_size) + .build() }); IndexingStream::iter(files) diff --git a/swiftide-indexing/src/persist/memory_storage.rs b/swiftide-indexing/src/persist/memory_storage.rs index 95d3249a..222e6fab 100644 --- a/swiftide-indexing/src/persist/memory_storage.rs +++ b/swiftide-indexing/src/persist/memory_storage.rs @@ -16,7 +16,7 @@ use swiftide_core::{ /// /// Great for experimentation and testing. /// -/// By default the storage will use a zero indexed, incremental counter as the key for each node if the node id +/// The storage will use a zero indexed, incremental counter as the key for each node if the node id /// is not set. pub struct MemoryStorage { data: Arc>>, @@ -27,11 +27,8 @@ pub struct MemoryStorage { } impl MemoryStorage { - async fn key(&self, node: &Node) -> String { - match node.id { - Some(id) => id.to_string(), - None => (*self.node_count.read().await).to_string(), - } + async fn key(&self) -> String { + self.node_count.read().await.to_string() } /// Retrieve a node by its key @@ -65,12 +62,10 @@ impl Persist for MemoryStorage { /// /// If the node does not have an id, a simple counter is used as the key. async fn store(&self, node: Node) -> Result { - let key = self.key(&node).await; + let key = self.key().await; self.data.write().await.insert(key, node.clone()); - if node.id.is_none() { - *self.node_count.write().await += 1; - } + (*self.node_count.write().await) += 1; Ok(node) } diff --git a/swiftide-indexing/src/pipeline.rs b/swiftide-indexing/src/pipeline.rs index d1a9988e..803476bf 100644 --- a/swiftide-indexing/src/pipeline.rs +++ b/swiftide-indexing/src/pipeline.rs @@ -777,10 +777,7 @@ mod tests { .returning(|| { vec![ Ok(Node::default()), - Ok(Node { - chunk: "skip".to_string(), - ..Node::default() - }), + Ok(Node::new("skip")), Ok(Node::default()), ] .into() @@ -808,10 +805,7 @@ mod tests { .returning(|| { vec![ Ok(Node::default()), - Ok(Node { - chunk: "will go left".to_string(), - ..Node::default() - }), + Ok(Node::new("will go left")), Ok(Node::default()), ] .into() diff --git a/swiftide-indexing/src/transformers/chunk_markdown.rs b/swiftide-indexing/src/transformers/chunk_markdown.rs index b82c5636..a03f2edc 100644 --- a/swiftide-indexing/src/transformers/chunk_markdown.rs +++ b/swiftide-indexing/src/transformers/chunk_markdown.rs @@ -128,12 +128,11 @@ impl ChunkerTransformer for ChunkMarkdown { }) .collect::>(); - IndexingStream::iter(chunks.into_iter().map(move |chunk| { - Ok(Node { - chunk, - ..node.clone() - }) - })) + IndexingStream::iter( + chunks + .into_iter() + .map(move |chunk| Node::build_from_other(&node).chunk(chunk).build()), + ) } fn concurrency(&self) -> Option { @@ -164,10 +163,7 @@ mod test { async fn test_transforming_with_max_characters_and_trimming() { let chunker = ChunkMarkdown::from_max_characters(40); - let node = Node { - chunk: MARKDOWN.to_string(), - ..Node::default() - }; + let node = Node::new(MARKDOWN.to_string()); let nodes: Vec = chunker .transform_node(node) @@ -192,10 +188,7 @@ mod test { let ranges = vec![(10..15), (20..25), (30..35), (40..45), (50..55)]; for 
range in ranges { let chunker = ChunkMarkdown::from_chunk_range(range.clone()); - let node = Node { - chunk: MARKDOWN.to_string(), - ..Node::default() - }; + let node = Node::new(MARKDOWN.to_string()); let nodes: Vec = chunker .transform_node(node) .await diff --git a/swiftide-indexing/src/transformers/chunk_text.rs b/swiftide-indexing/src/transformers/chunk_text.rs index 1af33ab7..d1f1d0cc 100644 --- a/swiftide-indexing/src/transformers/chunk_text.rs +++ b/swiftide-indexing/src/transformers/chunk_text.rs @@ -121,12 +121,11 @@ impl ChunkerTransformer for ChunkText { }) .collect::>(); - IndexingStream::iter(chunks.into_iter().map(move |chunk| { - Ok(Node { - chunk, - ..node.clone() - }) - })) + IndexingStream::iter( + chunks + .into_iter() + .map(move |chunk| Node::build_from_other(&node).chunk(chunk).build()), + ) } fn concurrency(&self) -> Option { @@ -151,10 +150,7 @@ mod test { async fn test_transforming_with_max_characters_and_trimming() { let chunker = ChunkText::from_max_characters(40); - let node = Node { - chunk: TEXT.to_string(), - ..Node::default() - }; + let node = Node::new(TEXT.to_string()); let nodes: Vec = chunker .transform_node(node) @@ -175,10 +171,7 @@ mod test { let ranges = vec![(10..15), (20..25), (30..35), (40..45), (50..55)]; for range in ranges { let chunker = ChunkText::from_chunk_range(range.clone()); - let node = Node { - chunk: TEXT.to_string(), - ..Node::default() - }; + let node = Node::new(TEXT.to_string()); let nodes: Vec = chunker .transform_node(node) .await diff --git a/swiftide-indexing/src/transformers/embed.rs b/swiftide-indexing/src/transformers/embed.rs index 003492d7..bd1a29f0 100644 --- a/swiftide-indexing/src/transformers/embed.rs +++ b/swiftide-indexing/src/transformers/embed.rs @@ -254,11 +254,13 @@ mod tests { async fn batch_transform(test_data: Vec>) { let test_nodes: Vec = test_data .iter() - .map(|data| Node { - chunk: data.chunk.into(), - metadata: data.metadata.clone(), - embed_mode: data.embed_mode, - ..Default::default() + .map(|data| { + Node::builder() + .chunk(data.chunk) + .metadata(data.metadata.clone()) + .embed_mode(data.embed_mode) + .build() + .unwrap() }) .collect(); diff --git a/swiftide-indexing/src/transformers/sparse_embed.rs b/swiftide-indexing/src/transformers/sparse_embed.rs index ba8e2934..528a7fbc 100644 --- a/swiftide-indexing/src/transformers/sparse_embed.rs +++ b/swiftide-indexing/src/transformers/sparse_embed.rs @@ -255,11 +255,13 @@ mod tests { async fn batch_transform(test_data: Vec>) { let test_nodes: Vec = test_data .iter() - .map(|data| Node { - chunk: data.chunk.into(), - metadata: data.metadata.clone(), - embed_mode: data.embed_mode, - ..Default::default() + .map(|data| { + Node::builder() + .chunk(data.chunk) + .metadata(data.metadata.clone()) + .embed_mode(data.embed_mode) + .build() + .unwrap() }) .collect(); diff --git a/swiftide-integrations/src/qdrant/indexing_node.rs b/swiftide-integrations/src/qdrant/indexing_node.rs index e58c0c93..48ac54b9 100644 --- a/swiftide-integrations/src/qdrant/indexing_node.rs +++ b/swiftide-integrations/src/qdrant/indexing_node.rs @@ -116,7 +116,7 @@ mod tests { use qdrant_client::qdrant::{ vectors::VectorsOptions, NamedVectors, PointId, PointStruct, Value, Vector, Vectors, }; - use swiftide_core::indexing::{EmbeddedField, Metadata, Node}; + use swiftide_core::indexing::{EmbeddedField, Node}; use test_case::test_case; use crate::qdrant::indexing_node::NodeWithVectors; @@ -124,14 +124,14 @@ mod tests { static EXPECTED_UUID: &str = "d42d252d-671d-37ef-a157-8e85d0710610"; 
#[test_case( - Node { id: None, path: "/path".into(), chunk: "data".into(), - vectors: Some(HashMap::from([(EmbeddedField::Chunk, vec![1.0])])), - original_size: 4, - offset: 0, - metadata: Metadata::from([("m1", "mv1")]), - embed_mode: swiftide_core::indexing::EmbedMode::SingleWithMetadata, - ..Default::default() - }, + Node::builder() + .path("/path") + .chunk("data") + .vectors([(EmbeddedField::Chunk, vec![1.0])]) + .metadata([("m1", "mv1")]) + .embed_mode(swiftide_core::indexing::EmbedMode::SingleWithMetadata) + .build().unwrap() + , HashSet::from([EmbeddedField::Combined]), PointStruct { id: Some(PointId::from(EXPECTED_UUID)), payload: HashMap::from([ ("content".into(), Value::from("data")), @@ -142,17 +142,16 @@ mod tests { "Node with single vector creates struct with unnamed vector" )] #[test_case( - Node { id: None, path: "/path".into(), chunk: "data".into(), - vectors: Some(HashMap::from([ + Node::builder() + .path("/path") + .chunk("data") + .vectors([ (EmbeddedField::Chunk, vec![1.0]), (EmbeddedField::Metadata("m1".into()), vec![2.0]) - ])), - metadata: Metadata::from([("m1", "mv1")]), - embed_mode: swiftide_core::indexing::EmbedMode::PerField, - original_size: 4, - offset: 0, - ..Default::default() - }, + ]) + .metadata([("m1", "mv1")]) + .embed_mode(swiftide_core::indexing::EmbedMode::PerField) + .build().unwrap(), HashSet::from([EmbeddedField::Chunk, EmbeddedField::Metadata("m1".into())]), PointStruct { id: Some(PointId::from(EXPECTED_UUID)), payload: HashMap::from([ ("content".into(), Value::from("data")), @@ -170,19 +169,18 @@ mod tests { "Node with multiple vectors creates struct with named vectors" )] #[test_case( - Node { id: None, path: "/path".into(), chunk: "data".into(), - vectors: Some(HashMap::from([ + Node::builder() + .path("/path") + .chunk("data") + .vectors([ (EmbeddedField::Chunk, vec![1.0]), (EmbeddedField::Combined, vec![1.0]), (EmbeddedField::Metadata("m1".into()), vec![1.0]), (EmbeddedField::Metadata("m2".into()), vec![2.0]) - ])), - metadata: Metadata::from([("m1", "mv1"), ("m2", "mv2")]), - embed_mode: swiftide_core::indexing::EmbedMode::Both, - original_size: 4, - offset: 0, - ..Default::default() - }, + ]) + .metadata([("m1", "mv1"), ("m2", "mv2")]) + .embed_mode(swiftide_core::indexing::EmbedMode::Both) + .build().unwrap(), HashSet::from([EmbeddedField::Combined]), PointStruct { id: Some(PointId::from(EXPECTED_UUID)), payload: HashMap::from([ ("content".into(), Value::from("data")), diff --git a/swiftide-integrations/src/qdrant/mod.rs b/swiftide-integrations/src/qdrant/mod.rs index 0ba0b695..f81ed204 100644 --- a/swiftide-integrations/src/qdrant/mod.rs +++ b/swiftide-integrations/src/qdrant/mod.rs @@ -118,7 +118,7 @@ impl Qdrant { tracing::debug!(?sparse_vectors_config, "Adding sparse vectors config"); collection = collection.sparse_vectors_config(sparse_vectors_config); } - tracing::warn!("Creating collection"); + tracing::warn!("Creating collection {}", &self.collection_name); self.client.create_collection(collection).await?; Ok(()) diff --git a/swiftide-integrations/src/redis/node_cache.rs b/swiftide-integrations/src/redis/node_cache.rs index 716c4c0f..bb7b314c 100644 --- a/swiftide-integrations/src/redis/node_cache.rs +++ b/swiftide-integrations/src/redis/node_cache.rs @@ -118,12 +118,7 @@ mod tests { .expect("Could not build redis client"); cache.reset_cache().await; - let node = Node { - id: None, - path: "test".into(), - chunk: "chunk".into(), - ..Default::default() - }; + let node = Node::new("chunk"); let before_cache = 
cache.get(&node).await; assert!(!before_cache); diff --git a/swiftide-integrations/src/redis/persist.rs b/swiftide-integrations/src/redis/persist.rs index 85bc79fc..891cc907 100644 --- a/swiftide-integrations/src/redis/persist.rs +++ b/swiftide-integrations/src/redis/persist.rs @@ -108,12 +108,7 @@ mod tests { .build() .unwrap(); - let node = Node { - id: None, - path: "test".into(), - chunk: "chunk".into(), - ..Default::default() - }; + let node = Node::new("chunk"); redis.store(node.clone()).await.unwrap(); let stored_node = serde_json::from_str(&redis.get_node(&node).await.unwrap().unwrap()); @@ -132,18 +127,7 @@ mod tests { .batch_size(20) .build() .unwrap(); - let nodes = vec![ - Node { - id: None, - path: "test".into(), - ..Default::default() - }, - Node { - id: None, - path: "other".into(), - ..Default::default() - }, - ]; + let nodes = vec![Node::new("test"), Node::new("other")]; let stream = redis.batch_store(nodes).await; let streamed_nodes: Vec = stream.try_collect().await.unwrap(); @@ -167,10 +151,7 @@ mod tests { .persist_value_fn(|_node| Ok("hello world".to_string())) .build() .unwrap(); - let node = Node { - id: None, - ..Default::default() - }; + let node = Node::default(); redis.store(node.clone()).await.unwrap(); let stored_node = redis.get_node(&node).await.unwrap(); diff --git a/swiftide-integrations/src/scraping/html_to_markdown_transformer.rs b/swiftide-integrations/src/scraping/html_to_markdown_transformer.rs index 0f2d2790..83d74053 100644 --- a/swiftide-integrations/src/scraping/html_to_markdown_transformer.rs +++ b/swiftide-integrations/src/scraping/html_to_markdown_transformer.rs @@ -44,11 +44,9 @@ impl Transformer for HtmlToMarkdownTransformer { /// Will Err the node if the conversion fails. #[tracing::instrument(skip_all, name = "transformer.html_to_markdown")] async fn transform_node(&self, node: Node) -> Result { - let chunk = self.htmd.convert(&node.chunk); - Ok(Node { - chunk: chunk?, - ..node - }) + let chunk = self.htmd.convert(&node.chunk)?; + + Node::build_from_other(&node).chunk(chunk).build() } fn concurrency(&self) -> Option { diff --git a/swiftide-integrations/src/scraping/loader.rs b/swiftide-integrations/src/scraping/loader.rs index 687e0d3e..509c57a1 100644 --- a/swiftide-integrations/src/scraping/loader.rs +++ b/swiftide-integrations/src/scraping/loader.rs @@ -55,15 +55,14 @@ impl Loader for ScrapingLoader { while let Ok(res) = spider_rx.recv().await { let html = res.get_html(); let original_size = html.len(); - let node = Node { - chunk: html, - original_size, - // TODO: Probably not the best way to represent this - // and will fail. Can we add more metadata too? 
- path: res.get_url().into(), - ..Default::default() - }; - if tx.send(Ok(node)).is_err() { + + let node = Node::builder() + .chunk(html) + .original_size(original_size) + .path(res.get_url()) + .build(); + + if tx.send(node).is_err() { break; } } diff --git a/swiftide-integrations/src/treesitter/chunk_code.rs b/swiftide-integrations/src/treesitter/chunk_code.rs index 7b7cd062..5f9fb2f6 100644 --- a/swiftide-integrations/src/treesitter/chunk_code.rs +++ b/swiftide-integrations/src/treesitter/chunk_code.rs @@ -109,13 +109,15 @@ impl ChunkerTransformer for ChunkCode { IndexingStream::iter(split.into_iter().map(move |chunk| { let chunk_size = chunk.len(); - let mut node = Node { - chunk, - ..node.clone() - }; - node.offset = offset; + + let node = Node::build_from_other(&node) + .chunk(chunk) + .offset(offset) + .build(); + offset += chunk_size; - Ok(node) + + node })) } else { // Send the error downstream diff --git a/swiftide-test-utils/src/test_utils.rs b/swiftide-test-utils/src/test_utils.rs index f8644071..54a43caa 100644 --- a/swiftide-test-utils/src/test_utils.rs +++ b/swiftide-test-utils/src/test_utils.rs @@ -36,7 +36,7 @@ pub fn openai_client( /// Setup Qdrant container. /// Returns container server and `server_url`. pub async fn start_qdrant() -> (ContainerAsync, String) { - let qdrant = testcontainers::GenericImage::new("qdrant/qdrant", "v1.11.3") + let qdrant = testcontainers::GenericImage::new("qdrant/qdrant", "v1.12.1") .with_exposed_port(6334.into()) .with_exposed_port(6333.into()) .with_wait_for(testcontainers::core::WaitFor::http(
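
Migration sketch (referenced from the BREAKING CHANGE note above; not part of the diff): downstream code that assembled a `Node` as a struct literal now goes through the generated builder. The values, the `example` function name, and the `anyhow` error handling are illustrative assumptions; `Node::builder`, `Node::new`, and the setter names come from the node.rs hunk in this patch.

use std::path::PathBuf;

use swiftide_core::indexing::Node;

fn example() -> anyhow::Result<()> {
    // Before this change a Node could be assembled as a struct literal:
    //
    //     let node = Node {
    //         path: "README.md".into(),
    //         chunk: "Hello, world".into(),
    //         ..Default::default()
    //     };
    //
    // With `id` private and the struct gated behind `derive_builder`, the
    // equivalent construction goes through `Node::builder()` instead:
    let node = Node::builder()
        .path(PathBuf::from("README.md"))
        .chunk("Hello, world")
        .original_size(12usize)
        .build()?;

    // `Node::new("Hello, world")` is still available for chunk-only nodes;
    // the remaining fields fall back to their defaults.
    assert_eq!(node.chunk, "Hello, world");

    Ok(())
}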
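
On the id change itself: `Node::id()` now derives a UUID v3 from the node's path and chunk on every call instead of returning a memoized value, so nodes produced by chunking no longer carry the id of their unchunked parent. A minimal sketch of that behaviour, assuming the builder API introduced in this patch (function name, paths, and values are illustrative):

use swiftide_core::indexing::Node;

fn id_is_recomputed() -> anyhow::Result<()> {
    let parent = Node::builder()
        .path(std::path::PathBuf::from("doc.md"))
        .chunk("# Title\n\nBody text")
        .build()?;

    // A chunker copies the parent's fields with `build_from_other` and then
    // overrides the chunk; the id is never copied, it is recomputed from
    // (path, chunk) whenever `id()` is called.
    let child = Node::build_from_other(&parent)
        .chunk("Body text")
        .offset(9usize)
        .build()?;

    // Previously a memoized id taken before chunking could leak into the
    // chunked node and collide on upsert; now distinct chunks map to
    // distinct UUIDs, while the same node still hashes deterministically.
    assert_ne!(parent.id(), child.id());
    assert_eq!(parent.id(), parent.id());

    Ok(())
}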