From 9c03a8450882468ecc4e094ddc7694f5f8b32f96 Mon Sep 17 00:00:00 2001 From: rsdy Date: Thu, 7 Sep 2023 16:24:39 +0200 Subject: [PATCH] Ensure consistency of semantic & tantivy & caches (#896) * Simplify caching dependencies * Moving closer to the parent obj * Synchronize caches & all databases * Comments & renamings * Address review comments --- server/bleep/src/cache.rs | 321 +++++++++++++++++++++++++------ server/bleep/src/indexes/file.rs | 249 ++++++++++-------------- server/bleep/src/semantic.rs | 47 ++--- 3 files changed, 379 insertions(+), 238 deletions(-) diff --git a/server/bleep/src/cache.rs b/server/bleep/src/cache.rs index 5acd714871..3a6512a3d1 100644 --- a/server/bleep/src/cache.rs +++ b/server/bleep/src/cache.rs @@ -1,11 +1,15 @@ use std::{ + collections::HashSet, + ops::Deref, sync::{Arc, RwLock}, time::Instant, }; use qdrant_client::qdrant::{PointId, PointStruct}; +use rayon::prelude::ParallelIterator; +use scc::hash_map::Entry; use sqlx::Sqlite; -use tracing::{error, trace}; +use tracing::{error, info, trace, warn}; use uuid::Uuid; use crate::{ @@ -19,12 +23,21 @@ use crate::{ use super::db::SqlDb; #[derive(serde::Serialize, serde::Deserialize, Eq)] -pub(crate) struct FreshValue { +pub struct FreshValue { // default value is `false` on deserialize pub(crate) fresh: bool, pub(crate) value: T, } +impl FreshValue { + fn fresh_default() -> Self { + Self { + fresh: true, + value: Default::default(), + } + } +} + impl PartialEq for FreshValue where T: PartialEq, @@ -52,7 +65,75 @@ impl From for FreshValue { /// Snapshot of the current state of a FileCache /// Since it's atomically (as in ACID) read from SQLite, this will be /// representative at a single point in time -pub(crate) type FileCacheSnapshot = Arc>>; +pub struct FileCacheSnapshot<'a> { + snapshot: Arc>>, + parent: &'a FileCache<'a>, +} + +/// CacheKeys unifies the different keys to different databases. +/// +/// Different layers of cache use different keys. +/// +/// Tantivy keys are more specific. Since in Tantivy we can't update +/// an existing record, all cache keys identify a record in the +/// database universally (as in, in space & time). +/// +/// In QDrant, however, it is possible to update existing records, +/// therefore the cache key is less strong. We use the weaker key to +/// identify existing, similar records, and update them with a +/// refreshed property set. +/// +/// For the specific calculation of what goes into these keys, take a +/// look at +/// [`Workload::cache_keys`][crate::indexes::file::Workload::cache_keys] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct CacheKeys(String, String); + +impl CacheKeys { + pub fn new(semantic: impl Into, tantivy: impl Into) -> Self { + Self(semantic.into(), tantivy.into()) + } + + pub fn tantivy(&self) -> &str { + &self.1 + } + + pub fn semantic(&self) -> &str { + &self.0 + } +} + +impl<'a> FileCacheSnapshot<'a> { + pub(crate) fn parent(&'a self) -> &'a FileCache<'a> { + self.parent + } + + #[tracing::instrument(skip(self))] + pub(crate) fn is_fresh(&self, keys: &CacheKeys) -> bool { + match self.snapshot.entry(keys.clone()) { + Entry::Occupied(mut val) => { + val.get_mut().fresh = true; + + trace!("cache hit"); + true + } + Entry::Vacant(val) => { + _ = val.insert_entry(FreshValue::fresh_default()); + + trace!("cache miss"); + false + } + } + } +} + +impl<'a> Deref for FileCacheSnapshot<'a> { + type Target = scc::HashMap>; + + fn deref(&self) -> &Self::Target { + &self.snapshot + } +} /// Manage the SQL cache for a repository, establishing a /// content-addressed space for files in it. @@ -82,7 +163,8 @@ impl<'a> FileCache<'a> { } } - pub(crate) async fn retrieve(&self) -> FileCacheSnapshot { + /// Retrieve a file-level snapshot of the cache for the repository in scope. + pub(crate) async fn retrieve(&'a self) -> FileCacheSnapshot<'a> { let repo_str = self.reporef.to_string(); let rows = sqlx::query! { "SELECT cache_hash FROM file_cache \ @@ -94,40 +176,107 @@ impl<'a> FileCache<'a> { let output = scc::HashMap::default(); for row in rows.into_iter().flatten() { - _ = output.insert(row.cache_hash, FreshValue::stale(())); + let (semantic_hash, tantivy_hash) = row.cache_hash.split_at(64); + _ = output.insert( + CacheKeys::new(semantic_hash, tantivy_hash), + FreshValue::stale(()), + ); } - output.into() + FileCacheSnapshot { + parent: self, + snapshot: output.into(), + } } - pub(crate) async fn persist(&self, cache: FileCacheSnapshot) -> anyhow::Result<()> { + /// Synchronize the cache and DBs. + /// + /// `delete_tantivy` is a callback that takes a single key and + /// records the delete operation in a Tantivy writer. + /// + /// Semantic deletions are handled internally. + pub(crate) async fn synchronize( + &'a self, + cache: FileCacheSnapshot<'a>, + delete_tantivy: impl Fn(&str), + ) -> anyhow::Result<()> { let mut tx = self.db.begin().await?; self.delete_files(&mut tx).await?; - let keys = { - let mut keys = vec![]; - cache.scan_async(|k, _v| keys.push(k.clone())).await; - keys + // files that are no longer tracked by the git index are to be removed + // from the tantivy & qdrant indices + let qdrant_stale = { + let mut semantic_fresh = HashSet::new(); + let mut semantic_all = HashSet::new(); + + cache.retain(|k, v| { + // check if it's already in to avoid unnecessary copies + if v.fresh && !semantic_fresh.contains(k.semantic()) { + semantic_fresh.insert(k.semantic().to_string()); + } + + if !semantic_all.contains(k.semantic()) { + semantic_all.insert(k.semantic().to_string()); + } + + // just call the passed closure for tantivy + if !v.fresh { + delete_tantivy(k.tantivy()) + } + + v.fresh + }); + + semantic_all + .difference(&semantic_fresh) + .cloned() + .collect::>() }; - for hash in keys { - let repo_str = self.reporef.to_string(); - sqlx::query!( - "INSERT INTO file_cache \ + // generate a transaction to push the remaining entries + // into the sql cache + { + let mut next = cache.first_occupied_entry_async().await; + while let Some(entry) = next { + let key = entry.key(); + let repo_str = self.reporef.to_string(); + let hash = format!("{}{}", key.0, key.1); + sqlx::query!( + "INSERT INTO file_cache \ (repo_ref, cache_hash) \ VALUES (?, ?)", - repo_str, - hash, - ) - .execute(&mut tx) - .await?; + repo_str, + hash, + ) + .execute(&mut tx) + .await?; + + next = entry.next(); + } + + tx.commit().await?; } - tx.commit().await?; + // batch-delete points from qdrant index + if !qdrant_stale.is_empty() { + if let Some(semantic) = self.semantic { + let semantic = semantic.clone(); + let reporef = self.reporef.to_string(); + tokio::spawn(async move { + semantic + .delete_points_for_hash(reporef.as_str(), qdrant_stale.into_iter()) + .await; + }); + } + } + + // make sure we generate & commit all remaining embeddings + self.batched_embed_or_flush_queue(true).await?; Ok(()) } + /// Delete all caches for the repository in scope. pub(crate) async fn delete(&self) -> anyhow::Result<()> { let mut tx = self.db.begin().await?; self.delete_files(&mut tx).await?; @@ -137,30 +286,12 @@ impl<'a> FileCache<'a> { Ok(()) } - async fn delete_files(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> anyhow::Result<()> { - let repo_str = self.reporef.to_string(); - sqlx::query! { - "DELETE FROM file_cache \ - WHERE repo_ref = ?", - repo_str - } - .execute(&mut *tx) - .await?; - - Ok(()) - } - - async fn delete_chunks(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> anyhow::Result<()> { - let repo_str = self.reporef.to_string(); - sqlx::query! { - "DELETE FROM chunk_cache \ - WHERE repo_ref = ?", - repo_str - } - .execute(&mut *tx) - .await?; - - Ok(()) + /// Process the next chunk from the embedding queue if the batch size is met. + pub fn process_embedding_queue(&self) -> anyhow::Result<()> { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(async { self.batched_embed_or_flush_queue(false).await }) + }) } /// Commit the embed log, invoking the embedder if batch size is met. @@ -168,7 +299,7 @@ impl<'a> FileCache<'a> { /// If `flush == true`, drain the log, send the entire batch to /// the embedder, and commit the results, disregarding the internal /// batch sizing. - pub async fn batched_process_embed_queue(&self, flush: bool) -> anyhow::Result<()> { + async fn batched_embed_or_flush_queue(&self, flush: bool) -> anyhow::Result<()> { let Some(semantic) = self.semantic else { return Ok(()); @@ -188,6 +319,8 @@ impl<'a> FileCache<'a> { Ok(()) } + /// Empty the queue in batches, and generate embeddings using the + /// configured embedder async fn embed_queued_points( &self, semantic: &Semantic, @@ -249,14 +382,89 @@ impl<'a> FileCache<'a> { } } - pub async fn chunks_for_file(&'a self, key: &'a str) -> ChunkCache<'a> { + /// Chunks and inserts the buffer content into the semantic db. + /// + /// Assumes that the semantic db is initialized and usable, otherwise panics. + #[allow(clippy::too_many_arguments)] + pub(crate) async fn process_semantic( + &self, + cache_keys: &CacheKeys, + repo_name: &str, + repo_ref: &str, + relative_path: &str, + buffer: &str, + lang_str: &str, + branches: &[String], + ) { + let chunk_cache = self.chunks_for_file(cache_keys).await; + let semantic = self.semantic.expect("uninitialized semantic db"); + + semantic + .chunks_for_buffer( + cache_keys.semantic().into(), + repo_name, + repo_ref, + relative_path, + buffer, + lang_str, + branches, + ) + .for_each(|(data, payload)| { + let cached = chunk_cache.update_or_embed(&data, payload); + if let Err(err) = cached { + warn!(?err, %repo_name, %relative_path, "embedding failed"); + } + }); + + match chunk_cache.commit().await { + Ok((new, updated, deleted)) => { + info!( + repo_name, + relative_path, new, updated, deleted, "Successful commit" + ) + } + Err(err) => { + warn!(repo_name, relative_path, ?err, "Failed to upsert vectors") + } + } + } + + /// Delete all files in the `file_cache` table for the repository in scope. + async fn delete_files(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> anyhow::Result<()> { + let repo_str = self.reporef.to_string(); + sqlx::query! { + "DELETE FROM file_cache \ + WHERE repo_ref = ?", + repo_str + } + .execute(&mut *tx) + .await?; + + Ok(()) + } + + /// Delete all chunks in the `chunk_cache` table for the repository in scope. + async fn delete_chunks(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> anyhow::Result<()> { + let repo_str = self.reporef.to_string(); + sqlx::query! { + "DELETE FROM chunk_cache \ + WHERE repo_ref = ?", + repo_str + } + .execute(&mut *tx) + .await?; + + Ok(()) + } + + async fn chunks_for_file(&'a self, key: &'a CacheKeys) -> ChunkCache<'a> { ChunkCache::for_file( self.db, self.semantic .expect("we shouldn't get here without semantic db configured"), self.reporef, &self.embed_queue, - key, + key.semantic(), ) .await } @@ -310,8 +518,12 @@ impl<'a> ChunkCache<'a> { } } - pub fn update_or_embed(&self, data: &'a str, payload: Payload) -> anyhow::Result<()> { - let id = self.cache_key(data, &payload); + /// Update a cache entry with the details from `payload`, or create a new embedding. + /// + /// New insertions are queued, and stored on the repository-level + /// `FileCache` instance that created this. + fn update_or_embed(&self, data: &'a str, payload: Payload) -> anyhow::Result<()> { + let id = self.derive_chunk_uuid(data, &payload); let branches_hash = blake3::hash(payload.branches.join("\n").as_ref()).to_string(); match self.cache.entry(id) { @@ -378,7 +590,7 @@ impl<'a> ChunkCache<'a> { /// actually written to qdrant in batches. /// /// All qdrant operations are executed in batches through a call - /// to [`FileCache::commit_chunks`]. + /// to [`FileCache::commit_embed_log`]. async fn commit_inserts( &self, tx: &mut sqlx::Transaction<'_, Sqlite>, @@ -502,14 +714,9 @@ impl<'a> ChunkCache<'a> { Ok(update_size) } - /// Return the cache key for the file that contains these chunks - pub fn file_hash(&self) -> String { - self.file_cache_key.to_string() - } - /// Generate a content hash from the embedding data, and pin it to /// the containing file's content id. - fn cache_key(&self, data: &str, payload: &Payload) -> String { + fn derive_chunk_uuid(&self, data: &str, payload: &Payload) -> String { let id = { let mut bytes = [0; 16]; let mut hasher = blake3::Hasher::new(); diff --git a/server/bleep/src/indexes/file.rs b/server/bleep/src/indexes/file.rs index 332554f8c0..6592a22010 100644 --- a/server/bleep/src/indexes/file.rs +++ b/server/bleep/src/indexes/file.rs @@ -10,7 +10,6 @@ use std::{ use anyhow::{bail, Result}; use async_trait::async_trait; use rayon::prelude::*; -use scc::hash_map::Entry; use tantivy::{ collector::TopDocs, doc, @@ -33,7 +32,7 @@ use super::{ }; use crate::{ background::SyncPipes, - cache::{FileCache, FileCacheSnapshot}, + cache::{CacheKeys, FileCache, FileCacheSnapshot}, intelligence::TreeSitterFile, query::compiler::{case_permutations, trigrams}, repo::{iterator::*, RepoMetadata, RepoRef, Repository}, @@ -41,13 +40,36 @@ use crate::{ }; struct Workload<'a> { + cache: &'a FileCacheSnapshot<'a>, repo_disk_path: &'a Path, - repo_ref: String, repo_name: &'a str, repo_metadata: &'a RepoMetadata, - file_cache: &'a FileCache<'a>, - cache_snapshot: &'a FileCacheSnapshot, - dir_entry: RepoDirEntry, + repo_ref: String, + relative_path: PathBuf, + normalized_path: PathBuf, +} + +impl<'a> Workload<'a> { + fn cache_keys(&self, dir_entry: &RepoDirEntry) -> CacheKeys { + let semantic_hash = { + let mut hash = blake3::Hasher::new(); + hash.update(crate::state::SCHEMA_VERSION.as_bytes()); + hash.update(self.relative_path.to_string_lossy().as_ref().as_ref()); + hash.update(self.repo_ref.as_bytes()); + hash.update(dir_entry.buffer().unwrap_or_default().as_bytes()); + hash.finalize().to_hex().to_string() + }; + + let tantivy_hash = { + let branch_list = dir_entry.branches().unwrap_or_default(); + let mut hash = blake3::Hasher::new(); + hash.update(semantic_hash.as_ref()); + hash.update(branch_list.join("\n").as_bytes()); + hash.finalize().to_hex().to_string() + }; + + CacheKeys::new(semantic_hash, tantivy_hash) + } } #[async_trait] @@ -65,38 +87,42 @@ impl Indexable for File { self.semantic.as_ref(), reporef, )); - let cache_snapshot = file_cache.retrieve().await; + let cache = file_cache.retrieve().await; let repo_name = reporef.indexed_name(); let processed = &AtomicU64::new(0); let file_worker = |count: usize| { - let cache_snapshot = cache_snapshot.clone(); - let file_cache = file_cache.clone(); + let cache = &cache; move |dir_entry: RepoDirEntry| { let completed = processed.fetch_add(1, Ordering::Relaxed); pipes.index_percent(((completed as f32 / count as f32) * 100f32) as u8); - let entry_disk_path = dir_entry.path().unwrap_or_default().to_owned(); + let entry_disk_path = dir_entry.path().unwrap().to_owned(); + let relative_path = { + let entry_srcpath = PathBuf::from(&entry_disk_path); + entry_srcpath + .strip_prefix(&repo.disk_path) + .map(ToOwned::to_owned) + .unwrap_or(entry_srcpath) + }; + let normalized_path = repo.disk_path.join(&relative_path); + let workload = Workload { repo_disk_path: &repo.disk_path, repo_ref: reporef.to_string(), repo_name: &repo_name, - file_cache: &file_cache, - cache_snapshot: &cache_snapshot, + relative_path, + normalized_path, repo_metadata, - dir_entry, + cache, }; trace!(entry_disk_path, "queueing entry"); - if let Err(err) = self.worker(workload, writer) { + if let Err(err) = self.worker(dir_entry, workload, writer) { warn!(%err, entry_disk_path, "indexing failed; skipping"); } - let commit_embeddings = tokio::task::block_in_place(|| { - Handle::current() - .block_on(async { file_cache.batched_process_embed_queue(false).await }) - }); - if let Err(err) = commit_embeddings { + if let Err(err) = cache.parent().process_embedding_queue() { warn!(?err, "failed to commit embeddings"); } } @@ -126,34 +152,13 @@ impl Indexable for File { info!(?repo.disk_path, "repo file indexing finished, took {:?}", start.elapsed()); - // files that are no longer tracked by the git index are to be removed - // from the tantivy & qdrant indices - let mut qdrant_remove_list = vec![]; - cache_snapshot.retain(|k, v| { - if !v.fresh { - writer.delete_term(Term::from_field_text(self.unique_hash, k)); - qdrant_remove_list.push(k.to_string()); - } - - v.fresh - }); - - // batch-delete points from qdrant index - if !qdrant_remove_list.is_empty() { - if let Some(semantic) = &self.semantic { - let semantic = semantic.clone(); - let reporef = reporef.to_string(); - tokio::spawn(async move { - semantic - .delete_points_for_hash(reporef.as_str(), qdrant_remove_list.into_iter()) - .await; - }); - } - } + file_cache + .synchronize(cache, |key| { + writer.delete_term(Term::from_field_text(self.unique_hash, key)); + }) + .await?; pipes.index_percent(100); - file_cache.persist(cache_snapshot).await?; - file_cache.batched_process_embed_queue(true).await?; Ok(()) } @@ -420,68 +425,28 @@ impl Indexer { } impl File { - #[tracing::instrument(fields(repo=%workload.repo_ref, entry_disk_path=?workload.dir_entry.path()), skip_all)] - fn worker(&self, workload: Workload<'_>, writer: &IndexWriter) -> Result<()> { - let Workload { - repo_ref, - repo_disk_path, - repo_name, - repo_metadata, - file_cache, - cache_snapshot, - dir_entry, - } = workload; - + #[tracing::instrument(fields(repo=%workload.repo_ref, entry_disk_path=?dir_entry.path()), skip_all)] + fn worker( + &self, + dir_entry: RepoDirEntry, + workload: Workload<'_>, + writer: &IndexWriter, + ) -> Result<()> { #[cfg(feature = "debug")] let start = Instant::now(); trace!("processing file"); - let relative_path = { - let entry_srcpath = PathBuf::from(dir_entry.path().ok_or(anyhow::anyhow!( - "dir entry is not a valid file or directory" - ))?); - entry_srcpath - .strip_prefix(repo_disk_path) - .map(ToOwned::to_owned) - .unwrap_or(entry_srcpath) - }; - let entry_pathbuf = repo_disk_path.join(&relative_path); - - let semantic_hash = { - let mut hash = blake3::Hasher::new(); - hash.update(crate::state::SCHEMA_VERSION.as_bytes()); - hash.update(relative_path.to_string_lossy().as_ref().as_ref()); - hash.update(repo_ref.as_bytes()); - hash.update(dir_entry.buffer().unwrap_or_default().as_bytes()); - hash.finalize().to_hex().to_string() - }; - - let tantivy_hash = { - let branch_list = dir_entry.branches().unwrap_or_default(); - let mut hash = blake3::Hasher::new(); - hash.update(semantic_hash.as_ref()); - hash.update(branch_list.join("\n").as_bytes()); - hash.finalize().to_hex().to_string() - }; - - let last_commit = repo_metadata.last_commit_unix_secs.unwrap_or(0); + let cache_keys = workload.cache_keys(&dir_entry); + let last_commit = workload.repo_metadata.last_commit_unix_secs.unwrap_or(0); match dir_entry { - _ if is_cache_fresh(cache_snapshot, &tantivy_hash, &entry_pathbuf) => { + _ if workload.cache.is_fresh(&cache_keys) => { info!("fresh; skipping"); return Ok(()); } RepoDirEntry::Dir(dir) => { trace!("writing dir document"); - let doc = dir.build_document( - self, - repo_name, - relative_path.as_path(), - repo_disk_path, - repo_ref.as_str(), - last_commit, - tantivy_hash, - ); + let doc = dir.build_document(self, &workload, last_commit, &cache_keys); writer.add_document(doc)?; trace!("dir document written"); } @@ -490,16 +455,10 @@ impl File { let doc = file .build_document( self, - repo_name, - relative_path.as_path(), - repo_disk_path, - semantic_hash, - tantivy_hash, - entry_pathbuf.as_path(), - repo_ref.as_str(), + &workload, + &cache_keys, last_commit, - repo_metadata, - file_cache, + workload.cache.parent(), ) .ok_or(anyhow::anyhow!("failed to build document"))?; writer.add_document(doc)?; @@ -541,13 +500,18 @@ impl RepoDir { fn build_document( self, schema: &File, - repo_name: &str, - relative_path: &Path, - repo_disk_path: &Path, - repo_ref: &str, + workload: &Workload<'_>, last_commit: u64, - tantivy_cache_key: String, + cache_keys: &CacheKeys, ) -> tantivy::schema::Document { + let Workload { + relative_path, + repo_name, + repo_disk_path, + repo_ref, + .. + } = workload; + let relative_path_str = format!("{}/", relative_path.to_string_lossy()); #[cfg(windows)] let relative_path_str = relative_path_str.replace('\\', "/"); @@ -559,12 +523,12 @@ impl RepoDir { schema.raw_relative_path => relative_path_str.as_bytes(), schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref, - schema.repo_name => repo_name, + schema.repo_ref => repo_ref.as_str(), + schema.repo_name => *repo_name, schema.last_commit_unix_seconds => last_commit, schema.branches => branches, schema.is_directory => true, - schema.unique_hash => tantivy_cache_key, + schema.unique_hash => cache_keys.tantivy(), // nulls schema.raw_content => Vec::::default(), @@ -583,17 +547,21 @@ impl RepoFile { fn build_document( mut self, schema: &File, - repo_name: &str, - relative_path: &Path, - repo_disk_path: &Path, - semantic_cache_key: String, - tantivy_cache_key: String, - entry_pathbuf: &Path, - repo_ref: &str, + workload: &Workload<'_>, + cache_keys: &CacheKeys, last_commit: u64, - repo_metadata: &RepoMetadata, file_cache: &FileCache, ) -> Option { + let Workload { + relative_path, + repo_name, + repo_disk_path, + repo_ref, + repo_metadata, + normalized_path, + .. + } = workload; + let relative_path_str = relative_path.to_string_lossy().to_string(); #[cfg(windows)] let relative_path_str = relative_path_str.replace('\\', "/"); @@ -601,9 +569,9 @@ impl RepoFile { let branches = self.branches.join("\n"); let lang_str = repo_metadata .langs - .get(entry_pathbuf, self.buffer.as_ref()) + .get(normalized_path, self.buffer.as_ref()) .unwrap_or_else(|| { - warn!(?entry_pathbuf, "Path not found in language map"); + warn!(?normalized_path, "Path not found in language map"); "" }); @@ -649,20 +617,20 @@ impl RepoFile { let lines_avg = self.buffer.len() as f64 / self.buffer.lines().count() as f64; - if let Some(semantic) = &schema.semantic { + if schema.semantic.is_some() { tokio::task::block_in_place(|| { Handle::current().block_on(async { - semantic - .insert_points_for_buffer( + file_cache + .process_semantic( + cache_keys, repo_name, repo_ref, &relative_path_str, &self.buffer, lang_str, &self.branches, - file_cache.chunks_for_file(&semantic_cache_key).await, ) - .await + .await; }) }); } @@ -671,11 +639,11 @@ impl RepoFile { schema.raw_content => self.buffer.as_bytes(), schema.raw_repo_name => repo_name.as_bytes(), schema.raw_relative_path => relative_path_str.as_bytes(), - schema.unique_hash => tantivy_cache_key, + schema.unique_hash => cache_keys.tantivy(), schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref, - schema.repo_name => repo_name, + schema.repo_ref => repo_ref.as_str(), + schema.repo_name => *repo_name, schema.content => self.buffer, schema.line_end_indices => line_end_indices, schema.lang => lang_str.to_ascii_lowercase().as_bytes(), @@ -689,25 +657,6 @@ impl RepoFile { } } -#[tracing::instrument(skip(cache))] -fn is_cache_fresh(cache: &FileCacheSnapshot, unique_hash: &str, entry_pathbuf: &PathBuf) -> bool { - match cache.entry(unique_hash.into()) { - Entry::Occupied(mut val) => { - // skip processing if contents are up-to-date in the cache - val.get_mut().fresh = true; - - trace!("cache hit"); - return true; - } - Entry::Vacant(val) => { - _ = val.insert_entry(().into()); - } - } - - trace!("cache miss"); - false -} - fn build_fuzzy_regex_filter(query_str: &str) -> Option { fn additions(s: &str, i: usize, j: usize) -> String { if i > j { diff --git a/server/bleep/src/semantic.rs b/server/bleep/src/semantic.rs index 26286fb5e6..bb78565567 100644 --- a/server/bleep/src/semantic.rs +++ b/server/bleep/src/semantic.rs @@ -424,17 +424,17 @@ impl Semantic { } #[allow(clippy::too_many_arguments)] - #[tracing::instrument(skip(self, repo_name, buffer, chunk_cache))] - pub async fn insert_points_for_buffer( - &self, - repo_name: &str, - repo_ref: &str, - relative_path: &str, - buffer: &str, - lang_str: &str, - branches: &[String], - chunk_cache: crate::cache::ChunkCache<'_>, - ) { + #[tracing::instrument(skip(self, repo_name, buffer))] + pub fn chunks_for_buffer<'a>( + &'a self, + file_cache_key: String, + repo_name: &'a str, + repo_ref: &'a str, + relative_path: &'a str, + buffer: &'a str, + lang_str: &'a str, + branches: &'a [String], + ) -> impl ParallelIterator + 'a { const MIN_CHUNK_TOKENS: usize = 50; let chunks = chunk::by_tokens( @@ -447,13 +447,13 @@ impl Semantic { ); debug!(chunk_count = chunks.len(), "found chunks"); - chunks.par_iter().for_each(|chunk| { - let data = format!("{repo_name}\t{relative_path}\n{}", chunk.data,); + chunks.into_par_iter().map(move |chunk| { + let data = format!("{repo_name}\t{relative_path}\n{}", chunk.data); let payload = Payload { repo_name: repo_name.to_owned(), repo_ref: repo_ref.to_owned(), relative_path: relative_path.to_owned(), - content_hash: chunk_cache.file_hash(), + content_hash: file_cache_key.to_string(), text: chunk.data.to_owned(), lang: lang_str.to_ascii_lowercase(), branches: branches.to_owned(), @@ -464,23 +464,8 @@ impl Semantic { ..Default::default() }; - let cached = chunk_cache.update_or_embed(&data, payload); - if let Err(err) = cached { - warn!(?err, %repo_name, %relative_path, "embedding failed"); - } - }); - - match chunk_cache.commit().await { - Ok((new, updated, deleted)) => { - info!( - repo_name, - relative_path, new, updated, deleted, "Successful commit" - ) - } - Err(err) => { - warn!(repo_name, relative_path, ?err, "Failed to upsert vectors") - } - } + (data, payload) + }) } pub async fn delete_points_for_hash(