Skip to content

Commit

Permalink
fix(indexing)!: Node ID no longer memoized (bosun-ai#414)
Browse files Browse the repository at this point in the history
As @shamb0 pointed out in bosun-ai#392, there is a potential issue where Node
ids get cached before chunking or other transformations, breaking
upserts and potentially resulting in data loss.

BREAKING CHANGE: This PR reworks Nodes with a builder API and a private
id. Hence, manually creating nodes no longer works. In the future, all
the fields are likely to follow the same pattern, so that we can
decouple the inner fields from the Node's implementation.
  • Loading branch information
timonv authored and shamb0 committed Oct 30, 2024
1 parent 6781ec3 commit 5c3458c
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 161 deletions.
107 changes: 91 additions & 16 deletions swiftide-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
path::PathBuf,
};

use derive_builder::Builder;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

Expand All @@ -35,36 +36,60 @@ use crate::{metadata::Metadata, util::debug_long_utf8, Embedding, SparseEmbeddin
/// `Node` encapsulates all necessary information for a single unit of data being processed
/// in the indexing pipeline. It includes fields for an identifier, file path, data chunk, optional
/// vector representation, and metadata.
#[derive(Default, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Builder)]
#[builder(setter(into, strip_option), build_fn(error = "anyhow::Error"))]
pub struct Node {
/// Optional identifier for the node.
pub id: Option<uuid::Uuid>,
/// File path associated with the node.
#[builder(default)]
pub path: PathBuf,
/// Data chunk contained in the node.
pub chunk: String,
/// Optional vector representation of embedded data.
#[builder(default)]
pub vectors: Option<HashMap<EmbeddedField, Embedding>>,
/// Optional sparse vector representation of embedded data.
#[builder(default)]
pub sparse_vectors: Option<HashMap<EmbeddedField, SparseEmbedding>>,
/// Metadata associated with the node.
#[builder(default)]
pub metadata: Metadata,
/// Mode of embedding data Chunk and Metadata
#[builder(default)]
pub embed_mode: EmbedMode,
/// Size of the input this node was originally derived from in bytes
#[builder(default)]
pub original_size: usize,
/// Offset of the chunk relative to the start of the input this node was originally derived from in bytes
#[builder(default)]
pub offset: usize,
}

impl NodeBuilder {
    /// Sets the dense vectors on the builder from an `Option`, so callers can
    /// forward an `Option` directly instead of branching before calling the
    /// generated `vectors` setter.
    pub fn maybe_vectors(
        &mut self,
        vectors: Option<HashMap<EmbeddedField, Embedding>>,
    ) -> &mut Self {
        // `Option::from` wraps the value in `Some`, marking the field as set.
        self.vectors = vectors.into();
        self
    }

    /// Sets the sparse vectors on the builder from an `Option`, mirroring
    /// [`Self::maybe_vectors`] for the sparse embedding field.
    pub fn maybe_sparse_vectors(
        &mut self,
        sparse_vectors: Option<HashMap<EmbeddedField, SparseEmbedding>>,
    ) -> &mut Self {
        self.sparse_vectors = sparse_vectors.into();
        self
    }
}

impl Debug for Node {
/// Formats the node for debugging purposes.
///
/// This method is used to provide a human-readable representation of the node when debugging.
/// The vector field is displayed as the number of elements in the vector if present.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Node")
.field("id", &self.id)
.field("id", &self.id())
.field("path", &self.path)
.field("chunk", &debug_long_utf8(&self.chunk, 100))
.field("metadata", &self.metadata)
Expand Down Expand Up @@ -98,6 +123,26 @@ impl Debug for Node {
}

impl Node {
/// Builds a new instance of `Node`, returning a `NodeBuilder`. Copies
/// over the fields from the provided `Node`.
pub fn build_from_other(node: &Node) -> NodeBuilder {
NodeBuilder::default()
.path(node.path.clone())
.chunk(node.chunk.clone())
.metadata(node.metadata.clone())
.maybe_vectors(node.vectors.clone())
.maybe_sparse_vectors(node.sparse_vectors.clone())
.embed_mode(node.embed_mode)
.original_size(node.original_size)
.offset(node.offset)
.to_owned()
}

/// Creates a new instance of `NodeBuilder.`
pub fn builder() -> NodeBuilder {
NodeBuilder::default()
}

/// Creates a new instance of `Node` with the specified data chunk.
///
/// The other fields are set to their default values.
Expand Down Expand Up @@ -189,21 +234,11 @@ impl Node {
/// Calculates the identifier of the node based on its path and chunk as bytes, returning a
/// UUID (v3).
///
/// If previously calculated, the identifier is returned directly. If not, a new identifier is
/// calculated without storing it.
/// WARN: Does not memoize the id. Use sparingly.
pub fn id(&self) -> uuid::Uuid {
if let Some(id) = self.id {
return id;
}

let bytes = [self.path.as_os_str().as_bytes(), self.chunk.as_bytes()].concat();
uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, &bytes)
}

/// Updates the identifier of the node.
pub fn update_id(&mut self) {
self.id = None;
self.id = Some(self.id());
uuid::Uuid::new_v3(&uuid::Uuid::NAMESPACE_OID, &bytes)
}
}

Expand Down Expand Up @@ -293,4 +328,44 @@ mod tests {
Node::new("Jürgen".repeat(100));
let _ = format!("{node:?}");
}

#[test]
fn test_build_from_other_without_vectors() {
let original_node = Node::new("test_chunk")
.with_metadata(Metadata::default())
.with_vectors(HashMap::new())
.with_sparse_vectors(HashMap::new())
.to_owned();

let builder = Node::build_from_other(&original_node);
let new_node = builder.build().unwrap();

assert_eq!(original_node, new_node);
}

#[test]
fn test_build_from_other_with_vectors() {
let mut vectors = HashMap::new();
vectors.insert(EmbeddedField::Chunk, Embedding::default());

let mut sparse_vectors = HashMap::new();
sparse_vectors.insert(
EmbeddedField::Chunk,
SparseEmbedding {
indices: vec![],
values: vec![],
},
);

let original_node = Node::new("test_chunk")
.with_metadata(Metadata::default())
.with_vectors(vectors.clone())
.with_sparse_vectors(sparse_vectors.clone())
.to_owned();

let builder = Node::build_from_other(&original_node);
let new_node = builder.build().unwrap();

assert_eq!(original_node, new_node);
}
}
24 changes: 12 additions & 12 deletions swiftide-indexing/src/loaders/file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl FileLoader {
tracing::debug!("Reading file: {:?}", entry);
let content = std::fs::read_to_string(&entry).unwrap();
let original_size = content.len();
Node {
path: entry,
chunk: content,
original_size,
..Default::default()
}
Node::builder()
.path(entry)
.chunk(content)
.original_size(original_size)
.build()
.expect("Failed to build node")
})
.collect()
}
Expand Down Expand Up @@ -113,12 +113,12 @@ impl Loader for FileLoader {
let content =
std::fs::read_to_string(entry.path()).context("Failed to read file")?;
let original_size = content.len();
Ok(Node {
path: entry.path().into(),
chunk: content,
original_size,
..Default::default()
})

Node::builder()
.path(entry.path())
.chunk(content)
.original_size(original_size)
.build()
});

IndexingStream::iter(files)
Expand Down
15 changes: 5 additions & 10 deletions swiftide-indexing/src/persist/memory_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use swiftide_core::{
///
/// Great for experimentation and testing.
///
/// By default the storage will use a zero indexed, incremental counter as the key for each node if the node id
/// The storage will use a zero indexed, incremental counter as the key for each node if the node id
/// is not set.
pub struct MemoryStorage {
data: Arc<RwLock<HashMap<String, Node>>>,
Expand All @@ -27,11 +27,8 @@ pub struct MemoryStorage {
}

impl MemoryStorage {
async fn key(&self, node: &Node) -> String {
match node.id {
Some(id) => id.to_string(),
None => (*self.node_count.read().await).to_string(),
}
async fn key(&self) -> String {
self.node_count.read().await.to_string()
}

/// Retrieve a node by its key
Expand Down Expand Up @@ -65,12 +62,10 @@ impl Persist for MemoryStorage {
///
/// A simple incremental counter is used as the key for each stored node.
async fn store(&self, node: Node) -> Result<Node> {
let key = self.key(&node).await;
let key = self.key().await;
self.data.write().await.insert(key, node.clone());

if node.id.is_none() {
*self.node_count.write().await += 1;
}
(*self.node_count.write().await) += 1;
Ok(node)
}

Expand Down
10 changes: 2 additions & 8 deletions swiftide-indexing/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,10 +777,7 @@ mod tests {
.returning(|| {
vec![
Ok(Node::default()),
Ok(Node {
chunk: "skip".to_string(),
..Node::default()
}),
Ok(Node::new("skip")),
Ok(Node::default()),
]
.into()
Expand Down Expand Up @@ -808,10 +805,7 @@ mod tests {
.returning(|| {
vec![
Ok(Node::default()),
Ok(Node {
chunk: "will go left".to_string(),
..Node::default()
}),
Ok(Node::new("will go left")),
Ok(Node::default()),
]
.into()
Expand Down
21 changes: 7 additions & 14 deletions swiftide-indexing/src/transformers/chunk_markdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ impl ChunkerTransformer for ChunkMarkdown {
})
.collect::<Vec<String>>();

IndexingStream::iter(chunks.into_iter().map(move |chunk| {
Ok(Node {
chunk,
..node.clone()
})
}))
IndexingStream::iter(
chunks
.into_iter()
.map(move |chunk| Node::build_from_other(&node).chunk(chunk).build()),
)
}

fn concurrency(&self) -> Option<usize> {
Expand Down Expand Up @@ -164,10 +163,7 @@ mod test {
async fn test_transforming_with_max_characters_and_trimming() {
let chunker = ChunkMarkdown::from_max_characters(40);

let node = Node {
chunk: MARKDOWN.to_string(),
..Node::default()
};
let node = Node::new(MARKDOWN.to_string());

let nodes: Vec<Node> = chunker
.transform_node(node)
Expand All @@ -192,10 +188,7 @@ mod test {
let ranges = vec![(10..15), (20..25), (30..35), (40..45), (50..55)];
for range in ranges {
let chunker = ChunkMarkdown::from_chunk_range(range.clone());
let node = Node {
chunk: MARKDOWN.to_string(),
..Node::default()
};
let node = Node::new(MARKDOWN.to_string());
let nodes: Vec<Node> = chunker
.transform_node(node)
.await
Expand Down
21 changes: 7 additions & 14 deletions swiftide-indexing/src/transformers/chunk_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ impl ChunkerTransformer for ChunkText {
})
.collect::<Vec<String>>();

IndexingStream::iter(chunks.into_iter().map(move |chunk| {
Ok(Node {
chunk,
..node.clone()
})
}))
IndexingStream::iter(
chunks
.into_iter()
.map(move |chunk| Node::build_from_other(&node).chunk(chunk).build()),
)
}

fn concurrency(&self) -> Option<usize> {
Expand All @@ -151,10 +150,7 @@ mod test {
async fn test_transforming_with_max_characters_and_trimming() {
let chunker = ChunkText::from_max_characters(40);

let node = Node {
chunk: TEXT.to_string(),
..Node::default()
};
let node = Node::new(TEXT.to_string());

let nodes: Vec<Node> = chunker
.transform_node(node)
Expand All @@ -175,10 +171,7 @@ mod test {
let ranges = vec![(10..15), (20..25), (30..35), (40..45), (50..55)];
for range in ranges {
let chunker = ChunkText::from_chunk_range(range.clone());
let node = Node {
chunk: TEXT.to_string(),
..Node::default()
};
let node = Node::new(TEXT.to_string());
let nodes: Vec<Node> = chunker
.transform_node(node)
.await
Expand Down
12 changes: 7 additions & 5 deletions swiftide-indexing/src/transformers/embed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,13 @@ mod tests {
async fn batch_transform(test_data: Vec<TestData<'_>>) {
let test_nodes: Vec<Node> = test_data
.iter()
.map(|data| Node {
chunk: data.chunk.into(),
metadata: data.metadata.clone(),
embed_mode: data.embed_mode,
..Default::default()
.map(|data| {
Node::builder()
.chunk(data.chunk)
.metadata(data.metadata.clone())
.embed_mode(data.embed_mode)
.build()
.unwrap()
})
.collect();

Expand Down
12 changes: 7 additions & 5 deletions swiftide-indexing/src/transformers/sparse_embed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,13 @@ mod tests {
async fn batch_transform(test_data: Vec<TestData<'_>>) {
let test_nodes: Vec<Node> = test_data
.iter()
.map(|data| Node {
chunk: data.chunk.into(),
metadata: data.metadata.clone(),
embed_mode: data.embed_mode,
..Default::default()
.map(|data| {
Node::builder()
.chunk(data.chunk)
.metadata(data.metadata.clone())
.embed_mode(data.embed_mode)
.build()
.unwrap()
})
.collect();

Expand Down
Loading

0 comments on commit 5c3458c

Please sign in to comment.