feat: Index markdown in pgvector #392

Open · shamb0 wants to merge 35 commits into master from feat/indexing-into-pgvector

Changes from 16 commits

Commits (35)
bfa44b5  feat: Index markdown in pgvector (shamb0, Oct 16, 2024)
3243fd5  chore(ci): Switch to dependabot for better grouping (#398) (timonv, Oct 20, 2024)
e914cba  chore(deps): bump SethCohen/github-releases-to-discord from 1.15.1 to… (dependabot[bot], Oct 20, 2024)
f305ef8  chore(ci): Explicit allow all for dependabot (timonv, Oct 20, 2024)
b3b3175  fix(ci): Update dependabot.yml via ui (#402) (timonv, Oct 21, 2024)
fe25b17  fix(indexing): Improve splitters consistency and provide defaults (#403) (timonv, Oct 21, 2024)
b531bdd  fix(indexing): Visibility of ChunkMarkdown builder should be public (timonv, Oct 21, 2024)
2a43a75  chore: Improve workspace configuration (#404) (timonv, Oct 21, 2024)
c17e9a9  chore: release v0.13.4 (#400) (SwabbieBosun, Oct 21, 2024)
b6fa280  fix(ci): Remove explicit 'all' from dependabot config (timonv, Oct 21, 2024)
c08658f  chore: Soft update deps (timonv, Oct 21, 2024)
5c3aff8  fix(ci): Add zlib to allowed licenses (timonv, Oct 21, 2024)
57014d2  fix(ci): Add back allow all in dependabot and fix aws pattern (timonv, Oct 21, 2024)
f60d009  feat: Index markdown in pgvector (shamb0, Oct 16, 2024)
4266bbe  Addressed review comments: (shamb0, Oct 22, 2024)
9a32436  Addressed review comments: (shamb0, Oct 22, 2024)
95e925a  Update examples/index_md_into_pgvector.rs (shamb0, Oct 30, 2024)
72ba300  fix(ci): Remove cache fixing ci disk limits (#408) (timonv, Oct 22, 2024)
6781ec3  chore(deps): bump the minor group across 1 directory with 12 updates… (dependabot[bot], Oct 23, 2024)
5c3458c  fix(indexing)!: Node ID no longer memoized (#414) (timonv, Oct 27, 2024)
40709be  fix(indexing): Use atomics for key generation in memory storage (#415) (timonv, Oct 27, 2024)
7fba78d  feat(integrations): Support in process hugging face models via mistra… (timonv, Oct 27, 2024)
ce3945b  chore(deps): bump the minor group across 1 directory with 16 updates… (dependabot[bot], Oct 27, 2024)
ae7718d  chore: release v0.14.0 (#416) (SwabbieBosun, Oct 27, 2024)
3c74464  fix: Revert 0.14 release as mistralrs is unpublished (#417) (timonv, Oct 27, 2024)
e32f721  fix(integrations): Revert mistralrs support (#418) (timonv, Oct 27, 2024)
30c2d01  chore: Re-release 0.14 without mistralrs (#419) (timonv, Oct 27, 2024)
fade2fe  chore: release v0.14.1 (#420) (SwabbieBosun, Oct 27, 2024)
acb34af  feat: Index markdown in pgvector (shamb0, Oct 16, 2024)
b7aa295  chore: release v0.13.4 (#400) (SwabbieBosun, Oct 21, 2024)
bd0b265  Completed release v0.14.1 intake (shamb0, Oct 30, 2024)
3eb579f  Merge branch 'master' into feat/indexing-into-pgvector (shamb0, Oct 30, 2024)
6ad22f1  merge to upstream master (shamb0, Oct 30, 2024)
15b2909  Address review feedback: (shamb0, Nov 1, 2024)
6817d4b  Merge remote-tracking branch 'upstream/master' into feat/indexing-int… (shamb0, Nov 1, 2024)
467 changes: 467 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -52,6 +52,8 @@ arrow-array = { version = "52.0", default-features = false }
arrow = { version = "52.2" }
parquet = { version = "52.2", default-features = false, features = ["async"] }
redb = { version = "2.1" }
sqlx = { version = "0.8.2", features = ["postgres", "uuid"] }
pgvector = { version = "0.4.0", features = ["sqlx"] }

# Testing
test-log = "0.2.16"
@@ -61,6 +63,8 @@ temp-dir = "0.1.13"
wiremock = "0.6.0"
test-case = "3.3.1"
insta = { version = "1.39.0", features = ["yaml"] }
tempfile = "3.10.1"
portpicker = "0.1.1"

[workspace.lints.rust]
unsafe_code = "forbid"
8 changes: 8 additions & 0 deletions examples/Cargo.toml
@@ -21,13 +21,17 @@ swiftide = { path = "../swiftide/", features = [
    "ollama",
    "fluvio",
    "lancedb",
    "pgvector",
] }
tracing-subscriber = "0.3"
tracing = { workspace = true }
serde_json = { workspace = true }
spider = { workspace = true }
qdrant-client = { workspace = true }
fluvio = { workspace = true }
temp-dir = { workspace = true }
sqlx = { workspace = true }
swiftide-test-utils = { path = "../swiftide-test-utils" }

[[example]]
doc-scrape-examples = true
@@ -91,3 +95,7 @@ path = "fluvio.rs"
[[example]]
name = "lancedb"
path = "lancedb.rs"

[[example]]
name = "index-md-pgvector"
path = "index_md_into_pgvector.rs"
77 changes: 77 additions & 0 deletions examples/index_md_into_pgvector.rs
@@ -0,0 +1,77 @@
/**
 * This example demonstrates how to use the Pgvector integration with Swiftide.
 */
use std::path::PathBuf;
use swiftide::{
    indexing::{
        self,
        loaders::FileLoader,
        transformers::{
            metadata_qa_text::NAME as METADATA_QA_TEXT_NAME, ChunkMarkdown, Embed, MetadataQAText,
        },
        EmbeddedField,
    },
    integrations::{self, pgvector::PgVector},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    tracing::info!("Starting PgVector indexing test");

    // Get the manifest directory path
    let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set");

    // Create a PathBuf to the test dataset from the manifest directory
    let test_dataset_path = PathBuf::from(manifest_dir).join("../README.md");

    tracing::info!("Test dataset path: {:?}", test_dataset_path);

    let (_pgv_db_container, pgv_db_url, _temp_dir) = swiftide_test_utils::start_postgres().await;

    tracing::info!("pgv_db_url :: {:#?}", pgv_db_url);

    let llm_client = integrations::ollama::Ollama::default()
        .with_default_prompt_model("llama3.2:latest")
        .to_owned();

    let fastembed =
        integrations::fastembed::FastEmbed::try_default().expect("Could not create FastEmbed");

    // Configure Pgvector with a default vector size and a single embedding;
    // in addition to embedding the text, also store its metadata in a field
    let pgv_storage = PgVector::builder()
        .try_connect_to_pool(pgv_db_url, Some(10))
        .await
        .expect("Failed to connect to postgres server")
        .vector_size(384)
        .with_vector(EmbeddedField::Combined)
        .with_metadata(METADATA_QA_TEXT_NAME)
        .table_name("swiftide_pgvector_test".to_string())
        .build()
        .unwrap();
timonv (Member) commented:

Almost exactly right! I prefer it if builders do not do IO if they can avoid it, for multiple reasons. In this case, that also has the benefit of being able to connect lazily and hiding the details of the connection pool.

i.e. a builder API like:

    let pgv_storage = PgVector::builder()
        .database_url(pgv_db_url)
        .pool_size(10) // With a sane default if omitted
        .vector_size(384)
        .with_vector(EmbeddedField::Combined)
        .with_metadata(METADATA_QA_TEXT_NAME)
        .table_name("swiftide_pgvector_test".to_string())
        .build()
        .unwrap();

And then in PgVector::setup (which is only called once):

    async fn setup(&self) -> Result<()> {
        self.try_connect_to_pool(self.database_url, self.pool_size).await?;
        ...

shamb0 (Author) replied:

@timonv, I'm looking for your input on a design choice here.

If we decide to handle the database connection pool setup within fn setup(&self) instead of PgVectorBuilder, we'll need to mutate PgVector within fn setup(). This change would mean updating the function signature in trait Persist to:

    async fn setup(&mut self) -> Result<()>

For example:

    async fn setup(&mut self) -> Result<()> {
        self.connection_pool = self.try_connect_to_pool(self.database_url, self.pool_size).await?;
        ...
    }

This adjustment would introduce breaking changes across the stack, particularly impacting:

  • swiftide-indexing/src/persist/memory_storage.rs
  • swiftide-integrations/src/lancedb/persist.rs
  • swiftide-integrations/src/qdrant/persist.rs
  • swiftide-integrations/src/redis/persist.rs

Would you prefer moving the IO operations into Persist::setup() for these components? If so, we could handle this as a separate PR to streamline the updates.

Looking forward to your thoughts!
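
An editorial aside: the tension above can also be resolved without changing Persist::setup(&self) to &mut self, by giving the lazily-created pool interior mutability. A minimal sketch, assuming database_url and pool_size fields on the struct and tokio's OnceCell; this is an illustration, not the API shipped in this PR:

    use anyhow::Result;
    use sqlx::postgres::PgPoolOptions;
    use sqlx::PgPool;
    use tokio::sync::OnceCell;

    struct PgVector {
        database_url: String,
        pool_size: u32,
        // Lazily-initialized pool; wrap in Arc if the struct must stay Clone.
        connection_pool: OnceCell<PgPool>,
    }

    impl PgVector {
        // Callable through &self; the first call connects, and any later
        // call (setup is only invoked once per pipeline anyway) reuses it.
        async fn setup(&self) -> Result<()> {
            self.connection_pool
                .get_or_try_init(|| async {
                    PgPoolOptions::new()
                        .max_connections(self.pool_size)
                        .connect(&self.database_url)
                        .await
                })
                .await?;
            Ok(())
        }
    }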


    // Drop the existing test table before running the test
    tracing::info!("Dropping existing test table & index if it exists");
    let drop_table_sql = "DROP TABLE IF EXISTS swiftide_pgvector_test";
    let drop_index_sql = "DROP INDEX IF EXISTS swiftide_pgvector_test_embedding_idx";

    if let Ok(pool) = pgv_storage.get_pool() {
        sqlx::query(drop_table_sql).execute(&pool).await?;
        sqlx::query(drop_index_sql).execute(&pool).await?;
    } else {
        return Err("Failed to get database connection pool".into());
    }

    tracing::info!("Starting indexing pipeline");
    indexing::Pipeline::from_loader(FileLoader::new(test_dataset_path).with_extensions(&["md"]))
        .then_chunk(ChunkMarkdown::from_chunk_range(10..2048))
        .then(MetadataQAText::new(llm_client.clone()))
        .then_in_batch(Embed::new(fastembed.clone()).with_batch_size(100))
        .then_store_with(pgv_storage.clone())
        .run()
        .await?;

    tracing::info!("PgVector Indexing test completed successfully");
    Ok(())
}
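
Editor's note: assuming Docker is available for the throwaway Postgres container and a local Ollama is serving llama3.2, the example registered above should be runnable from the examples crate with:

    cargo run --example index-md-pgvector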
9 changes: 9 additions & 0 deletions swiftide-integrations/Cargo.toml
@@ -34,6 +34,13 @@ async-openai = { workspace = true, optional = true }
qdrant-client = { workspace = true, optional = true, default-features = false, features = [
    "serde",
] }
sqlx = { workspace = true, optional = true, features = [
    "postgres",
    "runtime-tokio",
    "chrono",
    "uuid",
] }
pgvector = { workspace = true, optional = true, features = ["sqlx"] }
redis = { version = "0.27", features = [
    "aio",
    "tokio-comp",
@@ -102,6 +109,8 @@ default = ["rustls"]
rustls = ["reqwest/rustls-tls-native-roots"]
# Qdrant for storage
qdrant = ["dep:qdrant-client", "swiftide-core/qdrant"]
# PgVector for storage
pgvector = ["dep:sqlx", "dep:pgvector"]
# Redis for caching and storage
redis = ["dep:redis"]
# Tree-sitter for code operations and chunking
2 changes: 2 additions & 0 deletions swiftide-integrations/src/lib.rs
@@ -16,6 +16,8 @@ pub mod ollama;
pub mod openai;
#[cfg(feature = "parquet")]
pub mod parquet;
#[cfg(feature = "pgvector")]
pub mod pgvector;
#[cfg(feature = "qdrant")]
pub mod qdrant;
#[cfg(feature = "redb")]
163 changes: 163 additions & 0 deletions swiftide-integrations/src/pgvector/mod.rs
@@ -0,0 +1,163 @@
//! This module integrates with the pgvector database, providing functionality to create and manage vector collections,
//! store data, and optimize indexing for efficient searches.
//!
//! pgvector is utilized in both the `indexing::Pipeline` and `query::Pipeline` modules.
mod persist;
mod pgv_table_types;
use anyhow::Result;
use derive_builder::Builder;
use sqlx::PgPool;
use std::fmt;

use pgv_table_types::{FieldConfig, MetadataConfig, PgDBConnectionPool, VectorConfig};

const DEFAULT_BATCH_SIZE: usize = 50;

/// Represents a Pgvector client with configuration options.
///
/// This struct is used to interact with the Pgvector vector database, providing methods to manage vector collections,
/// store data, and ensure efficient searches. The client can be cloned at low cost as it shares connections.
#[derive(Builder, Clone)]
#[builder(setter(into, strip_option), build_fn(error = "anyhow::Error"))]
pub struct PgVector {
    /// Database connection pool.
    #[builder(default = "PgDBConnectionPool::default()")]
    connection_pool: PgDBConnectionPool,

    /// Table name to store vectors in.
    #[builder(default = "String::from(\"swiftide_pgv_store\")")]
    table_name: String,

    /// Default size of vectors. Vectors can also be of different
    /// sizes by specifying the size in the vector configuration.
    vector_size: Option<i32>,
timonv (Member) commented:

This value isn't optional, is it? What happens if it is None?

shamb0 (Author) replied:

Just to clarify: this parameter can't be None, because if it is, the user won't be able to build the PgVector::fields parameter, and PgVectorBuilder::with_vector() would fail. We're ensuring that users configure this parameter correctly before launching the indexing pipeline.

I'd love to hear your thoughts and any suggestions for enhancing this approach.
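
An editorial illustration of that guarantee: because vector_size carries no #[builder(default)], derive_builder makes build() fail when the field is never set, so the misconfiguration surfaces before the pipeline starts. A sketch of the expected behavior, not a test from this PR:

    use swiftide::indexing::EmbeddedField;
    use swiftide::integrations::pgvector::PgVector;

    fn main() {
        // vector_size is never set here, so build() returns an
        // uninitialized-field error instead of an unsized vector column.
        let result = PgVector::builder()
            .table_name("swiftide_pgvector_test".to_string())
            .with_vector(EmbeddedField::Combined)
            .build();
        assert!(result.is_err());
    }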


    /// Batch size for storing nodes.
    #[builder(default = "Some(DEFAULT_BATCH_SIZE)")]
    batch_size: Option<usize>,
timonv (Member) commented:

Is it intentional that this value is optional and can be None?

shamb0 (Author) replied:

For the batch_size configuration, I decided to keep it optional, drawing inspiration from the reference Qdrant implementation. By default, the batch size for PgVector is set to 50, though we still plan to fine-tune this for optimal performance.

To answer your question, "Is it intentional that this value is optional and can be None?": yes, it's designed that way. In the indexing pipeline, Pipeline::then_store_with() functions as an adapter, routing node streams to backend storage depending on the batch setting. Here's the approach (see the sketch after this list):

  • When Persist::batch_size() returns None, each node is processed individually, with Persist::store() sending each chunk to the backend.
  • If Persist::batch_size() returns Some(), batch processing is enabled. The stream of nodes is grouped into chunks based on the batch size, and Persist::batch_store() sends these batches to storage.

Does this implementation align with the requirements? Let me know if there's anything specific you'd like adjusted.
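
A simplified editorial model of that routing; the real swiftide Pipeline operates over async node streams, so the trait, node type, and function below are stand-ins rather than the actual API:

    // Simplified stand-ins for swiftide's Persist trait and node type.
    trait SimplePersist {
        fn batch_size(&self) -> Option<usize>;
        fn store(&self, node: String);
        fn batch_store(&self, nodes: &[String]);
    }

    // Batch when batch_size() is Some, store one-by-one when it is None,
    // mirroring what Pipeline::then_store_with() is described to do above.
    fn route_nodes<P: SimplePersist>(storage: &P, nodes: &[String]) {
        match storage.batch_size() {
            Some(n) => {
                for batch in nodes.chunks(n) {
                    storage.batch_store(batch);
                }
            }
            None => {
                for node in nodes {
                    storage.store(node.clone());
                }
            }
        }
    }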


    /// Field configuration for the Pgvector table, determining the eventual table schema.
    ///
    /// Supports multiple field types; see [`FieldConfig`] for details.
    #[builder(default)]
    fields: Vec<FieldConfig>,
}

impl fmt::Debug for PgVector {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Access the connection pool synchronously and determine the status.
        let connection_status = self.connection_pool.connection_status();
timonv (Member) commented on the line above:

Does this do a query? Debug is called extensively with tracing, which would mean (I think?) a query on every log / trace statement.


        f.debug_struct("PgVector")
            .field("table_name", &self.table_name)
            .field("vector_size", &self.vector_size)
            .field("batch_size", &self.batch_size)
            .field("connection_status", &connection_status)
            .finish()
    }
}
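
If connection_status() does issue a query, one query-free alternative is to derive the status from local pool state alone. A sketch, under the assumption that the pool is held internally as an Option<PgPool>:

    use sqlx::PgPool;

    // Synchronous and query-free, so it is safe for Debug to call
    // on every log / trace statement.
    fn connection_status(pool: Option<&PgPool>) -> &'static str {
        match pool {
            Some(p) if !p.is_closed() => "connected",
            Some(_) => "closed",
            None => "not connected",
        }
    }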

impl PgVector {
    /// Creates a new instance of `PgVectorBuilder` with default settings.
    ///
    /// # Returns
    ///
    /// A new `PgVectorBuilder`.
    pub fn builder() -> PgVectorBuilder {
        PgVectorBuilder::default()
    }

    /// Retrieves the connection pool for `PostgreSQL`.
    ///
    /// This function returns the connection pool used for interacting with the `PostgreSQL` database.
    /// It fetches the pool from the `PgDBConnectionPool` struct.
    ///
    /// # Returns
    ///
    /// A `Result` that, on success, contains the `PgPool` representing the database connection pool.
    /// On failure, an error is returned.
    ///
    /// # Errors
    ///
    /// This function will return an error if it fails to retrieve the connection pool, which could occur
    /// if the underlying connection to `PostgreSQL` has not been properly established.
    pub fn get_pool(&self) -> Result<PgPool> {
        self.connection_pool.get_pool()
    }
}

impl PgVectorBuilder {
    /// Tries to asynchronously connect to a `Postgres` server and initialize a connection pool.
    ///
    /// This function attempts to establish a connection to the specified `Postgres` server and
    /// sets up a connection pool with an optional maximum number of connections.
    ///
    /// # Arguments
    ///
    /// * `url` - A string reference representing the URL of the `Postgres` server to connect to.
    /// * `connection_max` - An optional value specifying the maximum number of connections in the pool.
    ///
    /// # Returns
    ///
    /// A `Result` that contains an updated `PgVector` instance with the new connection pool on success.
    /// On failure, an error is returned.
    ///
    /// # Errors
    ///
    /// This function returns an error if the connection to the database fails or if retries are exhausted.
    /// Possible reasons include invalid database URLs, unreachable servers, or exceeded retry limits.
    pub async fn try_connect_to_pool(
        mut self,
        url: impl AsRef<str>,
        connection_max: Option<u32>,
    ) -> Result<Self> {
        let pool = self.connection_pool.clone().unwrap_or_default();

        self.connection_pool = Some(pool.try_connect_to_url(url, connection_max).await?);
timonv (Member) commented:

See the other comment. Changing that and moving it to the main struct will also remove the need to clone, unwrap, and Option.

        Ok(self)
    }

    /// Adds a vector configuration to the builder.
    ///
    /// # Arguments
    ///
    /// * `config` - The vector configuration to add, which can be converted into a `VectorConfig`.
    ///
    /// # Returns
    ///
    /// A mutable reference to the builder with the new vector configuration added.
    pub fn with_vector(&mut self, config: impl Into<VectorConfig>) -> &mut Self {
        // Use `get_or_insert_with` to initialize `fields` if it's `None`
        self.fields
            .get_or_insert_with(Self::default_fields)
            .push(FieldConfig::Vector(config.into()));

        self
    }

    /// Sets the metadata configuration for the vector similarity search.
    ///
    /// This method allows you to specify metadata configurations for vector similarity search using `MetadataConfig`.
    /// The provided configuration will be added as a new field in the builder.
    ///
    /// # Arguments
    ///
    /// * `config` - The metadata configuration to use.
    ///
    /// # Returns
    ///
    /// * Returns a mutable reference to `self` for method chaining.
    pub fn with_metadata(&mut self, config: impl Into<MetadataConfig>) -> &mut Self {
        // Use `get_or_insert_with` to initialize `fields` if it's `None`
        self.fields
            .get_or_insert_with(Self::default_fields)
            .push(FieldConfig::Metadata(config.into()));

        self
    }

    fn default_fields() -> Vec<FieldConfig> {
        vec![FieldConfig::ID, FieldConfig::Chunk]
    }
}