diff --git a/moss/src/cli/info.rs b/moss/src/cli/info.rs index e6e720d3..aca824bf 100644 --- a/moss/src/cli/info.rs +++ b/moss/src/cli/info.rs @@ -38,7 +38,7 @@ pub async fn handle(args: &ArgMatches) -> Result<(), Error> { .by_name(&nom, Flags::AVAILABLE) .collect::>() .await; - if resolved.len() == 0 { + if resolved.is_empty() { return Err(Error::NotFound(pkg)); } for candidate in resolved { diff --git a/moss/src/db.rs b/moss/src/db.rs index 03c2d68a..6283ff52 100644 --- a/moss/src/db.rs +++ b/moss/src/db.rs @@ -20,18 +20,18 @@ mod encoding { pub struct Decoder(pub T); /// A trait to define an encoding between a sql type and rust type - pub trait Encoding: Sized { - type Encoded; + pub trait Encoding<'a>: Sized { + type Encoded: ToOwned; type Error; fn decode(encoded: Self::Encoded) -> Result; - fn encode(self) -> Self::Encoded; + fn encode(&'a self) -> Self::Encoded; } impl<'r, T, U, E> sqlx::Decode<'r, Sqlite> for Decoder where - T: Encoding, - U: sqlx::Decode<'r, Sqlite>, + T: Encoding<'r, Encoded = U, Error = E>, + U: sqlx::Decode<'r, Sqlite> + ToOwned, E: std::error::Error + Send + Sync + 'static, { fn decode( @@ -43,8 +43,8 @@ mod encoding { impl Type for Decoder where - T: Encoding, - U: Type, + T: Encoding<'static, Encoded = U, Error = E>, + U: ToOwned + Type, { fn type_info() -> ::TypeInfo { U::type_info() @@ -58,57 +58,57 @@ mod encoding { /** Encoding on external types */ /// Encoding of package identity (String) - impl Encoding for package::Id { - type Encoded = String; + impl<'a> Encoding<'a> for package::Id { + type Encoded = &'a str; type Error = Infallible; - fn decode(encoded: Self::Encoded) -> Result { - Ok(package::Id::from(encoded)) + fn decode(encoded: &'a str) -> Result { + Ok(package::Id::from(encoded.to_owned())) } - fn encode(self) -> Self::Encoded { - String::from(self) + fn encode(&'a self) -> &'a str { + self.as_ref() } } /// Encoding of package name (String) - impl Encoding for package::Name { - type Encoded = String; + impl<'a> Encoding<'a> for package::Name { + type Encoded = &'a str; type Error = Infallible; - fn decode(encoded: Self::Encoded) -> Result { - Ok(package::Name::from(encoded)) + fn decode(encoded: &'a str) -> Result { + Ok(package::Name::from(encoded.to_owned())) } - fn encode(self) -> Self::Encoded { - String::from(self) + fn encode(&'a self) -> &'a str { + self.as_ref() } } /// Encoding of Dependency type - impl Encoding for Dependency { + impl<'a> Encoding<'a> for Dependency { type Encoded = String; type Error = dependency::ParseError; - fn decode(encoded: Self::Encoded) -> Result { + fn decode(encoded: String) -> Result { encoded.parse() } - fn encode(self) -> Self::Encoded { + fn encode(&self) -> String { self.to_string() } } /// Encoding of Provider type - impl Encoding for Provider { + impl<'a> Encoding<'a> for Provider { type Encoded = String; type Error = dependency::ParseError; - fn decode(encoded: Self::Encoded) -> Result { + fn decode(encoded: String) -> Result { encoded.parse() } - fn encode(self) -> Self::Encoded { + fn encode(&self) -> String { self.to_string() } } diff --git a/moss/src/db/meta.rs b/moss/src/db/meta.rs index 9dd741a2..b3019473 100644 --- a/moss/src/db/meta.rs +++ b/moss/src/db/meta.rs @@ -149,7 +149,7 @@ impl Database { WHERE package = ?; ", ) - .bind(package.clone().encode()); + .bind(package.encode()); let licenses_query = sqlx::query_as::<_, encoding::License>( " @@ -158,7 +158,7 @@ impl Database { WHERE package = ?; ", ) - .bind(package.clone().encode()); + .bind(package.encode()); let dependencies_query = sqlx::query_as::<_, encoding::Dependency>( " @@ -167,7 +167,7 @@ impl Database { WHERE package = ?; ", ) - .bind(package.clone().encode()); + .bind(package.encode()); let providers_query = sqlx::query_as::<_, encoding::Provider>( " @@ -176,7 +176,7 @@ impl Database { WHERE package = ?; ", ) - .bind(package.clone().encode()); + .bind(package.encode()); let (entry, licenses, dependencies, providers) = futures::try_join!( entry_query.fetch_one(&self.pool), @@ -205,31 +205,21 @@ impl Database { } pub async fn add(&self, id: package::Id, meta: Meta) -> Result<(), Error> { - let Meta { - name, - version_identifier, - source_release, - build_release, - architecture, - summary, - description, - source_id, - homepage, - licenses, - dependencies, - providers, - uri, - hash, - download_size, - } = meta; + self.batch_add(vec![(id, meta)]).await + } + pub async fn batch_add(&self, packages: Vec<(package::Id, Meta)>) -> Result<(), Error> { let mut transaction = self.pool.begin().await?; // Remove package (other tables cascade) - remove(id.clone(), transaction.acquire().await?).await?; + batch_remove( + packages.iter().map(|(id, _)| id), + transaction.acquire().await?, + ) + .await?; // Create entry - sqlx::query( + sqlx::QueryBuilder::new( " INSERT INTO meta ( package, @@ -246,34 +236,56 @@ impl Database { hash, download_size ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ", ) - .bind(id.clone().encode()) - .bind(name.encode()) - .bind(version_identifier) - .bind(source_release as i64) - .bind(build_release as i64) - .bind(architecture) - .bind(summary) - .bind(description) - .bind(source_id) - .bind(homepage) - .bind(uri) - .bind(hash) - .bind(download_size.map(|i| i as i64)) + .push_values(&packages, |mut b, (id, meta)| { + let Meta { + name, + version_identifier, + source_release, + build_release, + architecture, + summary, + description, + source_id, + homepage, + uri, + hash, + download_size, + .. + } = meta; + + b.push_bind(id.encode()) + .push_bind(name.encode()) + .push_bind(version_identifier) + .push_bind(*source_release as i64) + .push_bind(*build_release as i64) + .push_bind(architecture) + .push_bind(summary) + .push_bind(description) + .push_bind(source_id) + .push_bind(homepage) + .push_bind(uri) + .push_bind(hash) + .push_bind(download_size.map(|i| i as i64)); + }) + .build() .execute(transaction.acquire().await?) .await?; // Licenses + let licenses = packages + .iter() + .flat_map(|(id, meta)| meta.licenses.iter().map(move |license| (id, license))) + .collect::>(); if !licenses.is_empty() { sqlx::QueryBuilder::new( " INSERT INTO meta_licenses (package, license) ", ) - .push_values(licenses, |mut b, license| { - b.push_bind(id.clone().encode()).push_bind(license); + .push_values(licenses, |mut b, (id, license)| { + b.push_bind(id.encode()).push_bind(license); }) .build() .execute(transaction.acquire().await?) @@ -281,15 +293,22 @@ impl Database { } // Dependencies + let dependencies = packages + .iter() + .flat_map(|(id, meta)| { + meta.dependencies + .iter() + .map(move |dependency| (id, dependency)) + }) + .collect::>(); if !dependencies.is_empty() { sqlx::QueryBuilder::new( " INSERT INTO meta_dependencies (package, dependency) ", ) - .push_values(dependencies, |mut b, dependency| { - b.push_bind(id.clone().encode()) - .push_bind(dependency.encode()); + .push_values(dependencies, |mut b, (id, dependency)| { + b.push_bind(id.encode()).push_bind(dependency.encode()); }) .build() .execute(transaction.acquire().await?) @@ -297,15 +316,18 @@ impl Database { } // Providers + let providers = packages + .iter() + .flat_map(|(id, meta)| meta.providers.iter().map(move |provider| (id, provider))) + .collect::>(); if !providers.is_empty() { sqlx::QueryBuilder::new( " INSERT INTO meta_providers (package, provider) ", ) - .push_values(providers, |mut b, provider| { - b.push_bind(id.clone().encode()) - .push_bind(provider.encode()); + .push_values(providers, |mut b, (id, provider)| { + b.push_bind(id.encode()).push_bind(provider.encode()); }) .build() .execute(transaction.acquire().await?) @@ -318,16 +340,24 @@ impl Database { } } -async fn remove(package: package::Id, connection: &mut SqliteConnection) -> Result<(), Error> { - sqlx::query( +async fn batch_remove( + packages: impl IntoIterator, + connection: &mut SqliteConnection, +) -> Result<(), Error> { + let mut query_builder = sqlx::QueryBuilder::new( " DELETE FROM meta - WHERE package = ?; + WHERE package IN ( ", - ) - .bind(package.encode()) - .execute(connection) - .await?; + ); + + let mut separated = query_builder.separated(", "); + packages.into_iter().for_each(|package| { + separated.push_bind(package.encode()); + }); + separated.push_unseparated(");"); + + query_builder.build().execute(connection).await?; Ok(()) } @@ -435,7 +465,7 @@ mod test { assert_eq!(&meta.name, &"bash-completion".to_string().into()); - remove(id.clone(), &mut database.pool.acquire().await.unwrap()) + batch_remove([&id], &mut database.pool.acquire().await.unwrap()) .await .unwrap(); diff --git a/moss/src/db/state.rs b/moss/src/db/state.rs index 68f5a1b0..04950210 100644 --- a/moss/src/db/state.rs +++ b/moss/src/db/state.rs @@ -157,7 +157,7 @@ impl Database { ) .push_values(packages, |mut b, package| { b.push_bind(id.0.encode()) - .push_bind(package.clone().encode()) + .push_bind(package.encode()) .push_bind(Option::::None); }) .build(), @@ -213,7 +213,7 @@ mod encoding { pub package_id: Decoder, } - impl Encoding for Id { + impl<'a> Encoding<'a> for Id { type Encoded = i64; type Error = Infallible; @@ -221,25 +221,25 @@ mod encoding { Ok(Self(value)) } - fn encode(self) -> i64 { + fn encode(&self) -> i64 { self.0 } } - impl Encoding for Kind { - type Encoded = String; + impl<'a> Encoding<'a> for Kind { + type Encoded = &'a str; type Error = DecodeKindError; - fn decode(value: String) -> Result { - match value.as_str() { + fn decode(value: &'a str) -> Result { + match value { "transaction" => Ok(Self::Transaction), - _ => Err(DecodeKindError(value)), + _ => Err(DecodeKindError(value.to_string())), } } - fn encode(self) -> Self::Encoded { + fn encode(&self) -> Self::Encoded { match self { - Kind::Transaction => "transaction".into(), + Kind::Transaction => "transaction", } } } diff --git a/moss/src/package.rs b/moss/src/package.rs index 9b18c453..54abe32f 100644 --- a/moss/src/package.rs +++ b/moss/src/package.rs @@ -25,6 +25,12 @@ impl From for String { } } +impl AsRef for Id { + fn as_ref(&self) -> &str { + self.0.as_str() + } +} + impl From for meta::Id { fn from(id: Id) -> Self { meta::Id(id.0) diff --git a/moss/src/package/meta.rs b/moss/src/package/meta.rs index 33c3ee7f..7eb4f91c 100644 --- a/moss/src/package/meta.rs +++ b/moss/src/package/meta.rs @@ -35,6 +35,12 @@ impl From for String { } } +impl AsRef for Name { + fn as_ref(&self) -> &str { + &self.0 + } +} + impl fmt::Display for Name { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) diff --git a/moss/src/repository/manager.rs b/moss/src/repository/manager.rs index 443f887c..6c828aed 100644 --- a/moss/src/repository/manager.rs +++ b/moss/src/repository/manager.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; -use futures::{future, TryStreamExt}; +use futures::{future, StreamExt, TryStreamExt}; use thiserror::Error; use tokio::{fs, io}; @@ -14,6 +14,8 @@ use crate::{config, package, Installation}; use crate::repository::{self, Repository}; +const DB_BATCH_SIZE: usize = 1_000; + /// Manage a bunch of repositories pub struct Manager { installation: Installation, @@ -151,24 +153,41 @@ async fn refresh_index( // Update each payload into the meta db payloads .map_err(Error::ReadStone) - .try_for_each(|payload| async { - // We only care about meta payloads for index files - let stone::read::Payload::Meta(payload) = payload else { - return Ok(()); - }; - - let meta = package::Meta::from_stone_payload(&payload)?; - - // Create id from hash of meta - let hash = meta.hash.clone().ok_or(Error::MissingMetaField( - stone::payload::meta::Tag::PackageHash, - ))?; - let id = package::Id::from(hash); - - // Update db - state.db.add(id, meta).await?; - - Ok(()) + // Batch up to `DB_BATCH_SIZE` payloads + .chunks(DB_BATCH_SIZE) + // Transpose error for early bail + .map(|results| results.into_iter().collect::, _>>()) + .try_for_each(|payloads| async { + // Construct Meta for each payload + let packages = payloads + .into_iter() + .filter_map(|payload| { + if let stone::read::Payload::Meta(meta) = payload { + Some(meta) + } else { + None + } + }) + .map(|payload| { + let meta = package::Meta::from_stone_payload(&payload)?; + + // Create id from hash of meta + let hash = meta.hash.clone().ok_or(Error::MissingMetaField( + stone::payload::meta::Tag::PackageHash, + ))?; + let id = package::Id::from(hash); + + Ok((id, meta)) + }) + .collect::, Error>>()?; + + // Batch add to db + // + // Sqlite supports up to 32k parametized query binds. Adding a + // package has 13 binds x 1k batch size = 17k. This leaves us + // overhead to add more binds in the future, otherwise we can + // lower the `DB_BATCH_SIZE`. + state.db.batch_add(packages).await.map_err(Error::Database) }) .await?;