feat(indexing): Parquet loader (#279)
Ingest and index data from parquet files.
timonv authored Sep 8, 2024
1 parent 873795b commit bdf17ad
Showing 9 changed files with 223 additions and 1 deletion.
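For orientation, a minimal sketch of how the new loader could be wired into an indexing pipeline. The facade imports, the file path, and the pipeline steps are illustrative assumptions, not part of this commit:

```rust
use swiftide::indexing::Pipeline;
use swiftide::integrations::parquet::Parquet;

async fn index_chunks() -> anyhow::Result<()> {
    // Each value of the "chunk" column becomes one Node in the indexing stream.
    let loader = Parquet::builder()
        .path("data/chunks.parquet") // hypothetical path
        .column_name("chunk")
        .build()?;

    Pipeline::from_loader(loader)
        // ...chain transformers, an embedder and storage here before running...
        .run()
        .await
}
```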
71 changes: 71 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -57,6 +57,8 @@ qdrant-client = { version = "1.10", default-features = false, features = [
fluvio = { version = "0.23", default-features = false }
lancedb = { version = "0.9", default-features = false }
arrow-array = { version = "52.2", default-features = false }
arrow = { version = "52.2" }
parquet = { version = "52.2", default-features = false, features = ["async"] }

# Testing
test-log = "0.2.16"
2 changes: 1 addition & 1 deletion README.md
@@ -180,7 +180,7 @@ Our goal is to create a fast, extendable platform for Retrieval Augmented Genera
| **Feature** | **Details** |
| -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Supported Large Language Model providers** | OpenAI (and Azure) - All models and embeddings <br> AWS Bedrock - Anthropic and Titan <br> Groq - All models <br> Ollama - All models |
| **Loading data** | Files <br> Scraping <br> Fluvio <br> Other pipelines and streams |
| **Loading data** | Files <br> Scraping <br> Fluvio <br> Parquet <br> Other pipelines and streams |
| **Transformers and metadata generation**     | Generate questions and answers for both text and code (Hyde) <br> Summaries, titles and queries via an LLM <br> Extract definitions and references with tree-sitter  |
| **Splitting and chunking** | Markdown <br> Code (with tree-sitter) |
| **Storage** | Qdrant <br> Redis <br> LanceDB |
6 changes: 6 additions & 0 deletions swiftide-core/src/node.rs
@@ -196,6 +196,12 @@ impl Hash for Node {
}
}

impl<T: Into<String>> From<T> for Node {
fn from(value: T) -> Self {
Node::new(value)
}
}

/// Embed mode of the pipeline.
///
/// See also [`super::pipeline::Pipeline::with_embed_mode`].
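With the new blanket `From` impl above, anything convertible into a `String` converts directly into a `Node`, which the parquet loader relies on via `.map(Node::from)`. A small sketch:

```rust
use swiftide_core::indexing::Node;

fn main() {
    // Any Into<String> now converts directly into a Node whose chunk is that text.
    let node: Node = "hello".into();
    assert_eq!(node, Node::new("hello"));
}
```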
10 changes: 10 additions & 0 deletions swiftide-integrations/Cargo.toml
@@ -68,10 +68,18 @@ deadpool = { version = "0.12", optional = true, features = [
fluvio = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true }
lancedb = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, features = [
"async",
"arrow",
"snap",
] }
arrow = { workspace = true, optional = true }

[dev-dependencies]
swiftide-core = { path = "../swiftide-core", features = ["test-utils"] }

arrow = { workspace = true, features = ["test_utils"] }

# Used for hacking fluvio to play nice
flv-util = "0.5.2"

@@ -118,6 +126,8 @@ aws-bedrock = [
lancedb = ["dep:lancedb", "dep:deadpool", "dep:arrow-array"]
# Fluvio loader
fluvio = ["dep:fluvio"]
# Parquet loader
parquet = ["dep:arrow-array", "dep:parquet", "dep:arrow"]

[lints]
workspace = true
2 changes: 2 additions & 0 deletions swiftide-integrations/src/lib.rs
@@ -14,6 +14,8 @@ pub mod lancedb;
pub mod ollama;
#[cfg(feature = "openai")]
pub mod openai;
#[cfg(feature = "parquet")]
pub mod parquet;
#[cfg(feature = "qdrant")]
pub mod qdrant;
#[cfg(feature = "redis")]
101 changes: 101 additions & 0 deletions swiftide-integrations/src/parquet/loader.rs
@@ -0,0 +1,101 @@
use anyhow::{Context as _, Result};
use arrow_array::StringArray;
use futures_util::StreamExt as _;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use swiftide_core::{
indexing::{IndexingStream, Node},
Loader,
};
use tokio::{fs::File, runtime::Handle};

use super::Parquet;

impl Loader for Parquet {
fn into_stream(self) -> IndexingStream {
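// `into_stream` is synchronous, but the parquet stream builder must be awaited;
// block in place on the current (multi-threaded) runtime to construct it.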
let mut builder = tokio::task::block_in_place(|| {
Handle::current().block_on(async {
let file = File::open(self.path).await.expect("Failed to open file");

ParquetRecordBatchStreamBuilder::new(file)
.await
.context("Failed to load builder")
.unwrap()
.with_batch_size(self.batch_size)
})
});

let file_metadata = builder.metadata().file_metadata().clone();
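// Find the positional index of the requested column in the file schema.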
let column_idx = file_metadata
.schema()
.get_fields()
.iter()
.enumerate()
.find_map(|(pos, column)| {
if self.column_name == column.name() {
Some(pos)
} else {
None
}
})
.unwrap_or_else(|| panic!("Column {} not found in dataset", &self.column_name));

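// Project only the requested column so the reader decodes nothing else.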
let mask = ProjectionMask::roots(file_metadata.schema_descr(), [column_idx]);
builder = builder.with_projection(mask);

let stream = builder.build().expect("Failed to build parquet builder");

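// Fan each record batch out into per-row Nodes; a failed batch yields a single Err in the stream.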
let swiftide_stream = stream.flat_map_unordered(None, move |result_batch| {
let Ok(batch) = result_batch else {
let new_result: Result<Node> = Err(anyhow::anyhow!(result_batch.unwrap_err()));

return vec![new_result].into();
};
assert!(batch.num_columns() == 1, "Number of columns _must_ be 1");

let node_values = batch
.column(0) // Should only have one column at this point
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.into_iter()
.flatten()
.map(Node::from)
.map(Ok)
.collect::<Vec<_>>();

IndexingStream::iter(node_values)
});

swiftide_stream.boxed().into()

}
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use futures_util::TryStreamExt as _;

use super::*;

#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_parquet_loader() {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("src/parquet/test.parquet");
dbg!(&path);

let loader = Parquet::builder()
.path(path)
.column_name("chunk")
.build()
.unwrap();

let result = loader.into_stream().try_collect::<Vec<_>>().await.unwrap();

let expected = [Node::new("hello"), Node::new("world")];
assert_eq!(result, expected);
}
}
30 changes: 30 additions & 0 deletions swiftide-integrations/src/parquet/mod.rs
@@ -0,0 +1,30 @@
//! Stream data from parquet files
use std::path::PathBuf;

use derive_builder::Builder;

pub mod loader;

/// Stream data from a single column of a parquet file
///
/// Provide a path, a column name and an optional batch size. The column must be of type
/// `StringArray`; each value in the column becomes the chunk of a `Node`.
///
/// # Panics
///
/// The loader can panic during initialization if anything with parquet or arrow fails before
/// starting the stream.
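///
/// # Example
///
/// A minimal sketch; the path is hypothetical.
///
/// ```no_run
/// # use swiftide_integrations::parquet::Parquet;
/// let loader = Parquet::builder()
///     .path("data/chunks.parquet")
///     .column_name("chunk")
///     .build()
///     .unwrap();
/// ```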
#[derive(Debug, Clone, Builder)]
#[builder(setter(into, strip_option))]
pub struct Parquet {
path: PathBuf,
column_name: String,
#[builder(default = "1024")]
batch_size: usize,
}

impl Parquet {
pub fn builder() -> ParquetBuilder {
ParquetBuilder::default()
}
}
Binary file added swiftide-integrations/src/parquet/test.parquet
Binary file not shown.
