Skip to content

Commit

Permalink
Merge pull request #30 from serpent-os/feat/batch-add
Browse files Browse the repository at this point in the history
Feat/batch add
  • Loading branch information
ikeycode authored Sep 19, 2023
2 parents e824269 + 572977e commit 6f06312
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 108 deletions.
2 changes: 1 addition & 1 deletion moss/src/cli/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn handle(args: &ArgMatches) -> Result<(), Error> {
.by_name(&nom, Flags::AVAILABLE)
.collect::<Vec<_>>()
.await;
if resolved.len() == 0 {
if resolved.is_empty() {
return Err(Error::NotFound(pkg));
}
for candidate in resolved {
Expand Down
50 changes: 25 additions & 25 deletions moss/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ mod encoding {
pub struct Decoder<T>(pub T);

/// A trait to define an encoding between a sql type and rust type
pub trait Encoding: Sized {
type Encoded;
pub trait Encoding<'a>: Sized {
type Encoded: ToOwned;
type Error;

fn decode(encoded: Self::Encoded) -> Result<Self, Self::Error>;
fn encode(self) -> Self::Encoded;
fn encode(&'a self) -> Self::Encoded;
}

impl<'r, T, U, E> sqlx::Decode<'r, Sqlite> for Decoder<T>
where
T: Encoding<Encoded = U, Error = E>,
U: sqlx::Decode<'r, Sqlite>,
T: Encoding<'r, Encoded = U, Error = E>,
U: sqlx::Decode<'r, Sqlite> + ToOwned,
E: std::error::Error + Send + Sync + 'static,
{
fn decode(
Expand All @@ -43,8 +43,8 @@ mod encoding {

impl<T, U, E> Type<Sqlite> for Decoder<T>
where
T: Encoding<Encoded = U, Error = E>,
U: Type<Sqlite>,
T: Encoding<'static, Encoded = U, Error = E>,
U: ToOwned + Type<Sqlite>,
{
fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
U::type_info()
Expand All @@ -58,57 +58,57 @@ mod encoding {
/** Encoding on external types */

/// Encoding of package identity (String)
impl Encoding for package::Id {
type Encoded = String;
impl<'a> Encoding<'a> for package::Id {
type Encoded = &'a str;
type Error = Infallible;

fn decode(encoded: Self::Encoded) -> Result<Self, Self::Error> {
Ok(package::Id::from(encoded))
fn decode(encoded: &'a str) -> Result<Self, Self::Error> {
Ok(package::Id::from(encoded.to_owned()))
}

fn encode(self) -> Self::Encoded {
String::from(self)
fn encode(&'a self) -> &'a str {
self.as_ref()
}
}

/// Encoding of package name (String)
impl Encoding for package::Name {
type Encoded = String;
impl<'a> Encoding<'a> for package::Name {
type Encoded = &'a str;
type Error = Infallible;

fn decode(encoded: Self::Encoded) -> Result<Self, Self::Error> {
Ok(package::Name::from(encoded))
fn decode(encoded: &'a str) -> Result<Self, Self::Error> {
Ok(package::Name::from(encoded.to_owned()))
}

fn encode(self) -> Self::Encoded {
String::from(self)
fn encode(&'a self) -> &'a str {
self.as_ref()
}
}

/// Encoding of Dependency type
impl Encoding for Dependency {
impl<'a> Encoding<'a> for Dependency {
type Encoded = String;
type Error = dependency::ParseError;

fn decode(encoded: Self::Encoded) -> Result<Self, Self::Error> {
fn decode(encoded: String) -> Result<Self, Self::Error> {
encoded.parse()
}

fn encode(self) -> Self::Encoded {
fn encode(&self) -> String {
self.to_string()
}
}

/// Encoding of Provider type
impl Encoding for Provider {
impl<'a> Encoding<'a> for Provider {
type Encoded = String;
type Error = dependency::ParseError;

fn decode(encoded: Self::Encoded) -> Result<Self, Self::Error> {
fn decode(encoded: String) -> Result<Self, Self::Error> {
encoded.parse()
}

fn encode(self) -> Self::Encoded {
fn encode(&self) -> String {
self.to_string()
}
}
Expand Down
136 changes: 83 additions & 53 deletions moss/src/db/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Database {
WHERE package = ?;
",
)
.bind(package.clone().encode());
.bind(package.encode());

let licenses_query = sqlx::query_as::<_, encoding::License>(
"
Expand All @@ -158,7 +158,7 @@ impl Database {
WHERE package = ?;
",
)
.bind(package.clone().encode());
.bind(package.encode());

let dependencies_query = sqlx::query_as::<_, encoding::Dependency>(
"
Expand All @@ -167,7 +167,7 @@ impl Database {
WHERE package = ?;
",
)
.bind(package.clone().encode());
.bind(package.encode());

let providers_query = sqlx::query_as::<_, encoding::Provider>(
"
Expand All @@ -176,7 +176,7 @@ impl Database {
WHERE package = ?;
",
)
.bind(package.clone().encode());
.bind(package.encode());

let (entry, licenses, dependencies, providers) = futures::try_join!(
entry_query.fetch_one(&self.pool),
Expand Down Expand Up @@ -205,31 +205,21 @@ impl Database {
}

pub async fn add(&self, id: package::Id, meta: Meta) -> Result<(), Error> {
let Meta {
name,
version_identifier,
source_release,
build_release,
architecture,
summary,
description,
source_id,
homepage,
licenses,
dependencies,
providers,
uri,
hash,
download_size,
} = meta;
self.batch_add(vec![(id, meta)]).await
}

pub async fn batch_add(&self, packages: Vec<(package::Id, Meta)>) -> Result<(), Error> {
let mut transaction = self.pool.begin().await?;

// Remove package (other tables cascade)
remove(id.clone(), transaction.acquire().await?).await?;
batch_remove(
packages.iter().map(|(id, _)| id),
transaction.acquire().await?,
)
.await?;

// Create entry
sqlx::query(
sqlx::QueryBuilder::new(
"
INSERT INTO meta (
package,
Expand All @@ -246,66 +236,98 @@ impl Database {
hash,
download_size
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
",
)
.bind(id.clone().encode())
.bind(name.encode())
.bind(version_identifier)
.bind(source_release as i64)
.bind(build_release as i64)
.bind(architecture)
.bind(summary)
.bind(description)
.bind(source_id)
.bind(homepage)
.bind(uri)
.bind(hash)
.bind(download_size.map(|i| i as i64))
.push_values(&packages, |mut b, (id, meta)| {
let Meta {
name,
version_identifier,
source_release,
build_release,
architecture,
summary,
description,
source_id,
homepage,
uri,
hash,
download_size,
..
} = meta;

b.push_bind(id.encode())
.push_bind(name.encode())
.push_bind(version_identifier)
.push_bind(*source_release as i64)
.push_bind(*build_release as i64)
.push_bind(architecture)
.push_bind(summary)
.push_bind(description)
.push_bind(source_id)
.push_bind(homepage)
.push_bind(uri)
.push_bind(hash)
.push_bind(download_size.map(|i| i as i64));
})
.build()
.execute(transaction.acquire().await?)
.await?;

// Licenses
let licenses = packages
.iter()
.flat_map(|(id, meta)| meta.licenses.iter().map(move |license| (id, license)))
.collect::<Vec<_>>();
if !licenses.is_empty() {
sqlx::QueryBuilder::new(
"
INSERT INTO meta_licenses (package, license)
",
)
.push_values(licenses, |mut b, license| {
b.push_bind(id.clone().encode()).push_bind(license);
.push_values(licenses, |mut b, (id, license)| {
b.push_bind(id.encode()).push_bind(license);
})
.build()
.execute(transaction.acquire().await?)
.await?;
}

// Dependencies
let dependencies = packages
.iter()
.flat_map(|(id, meta)| {
meta.dependencies
.iter()
.map(move |dependency| (id, dependency))
})
.collect::<Vec<_>>();
if !dependencies.is_empty() {
sqlx::QueryBuilder::new(
"
INSERT INTO meta_dependencies (package, dependency)
",
)
.push_values(dependencies, |mut b, dependency| {
b.push_bind(id.clone().encode())
.push_bind(dependency.encode());
.push_values(dependencies, |mut b, (id, dependency)| {
b.push_bind(id.encode()).push_bind(dependency.encode());
})
.build()
.execute(transaction.acquire().await?)
.await?;
}

// Providers
let providers = packages
.iter()
.flat_map(|(id, meta)| meta.providers.iter().map(move |provider| (id, provider)))
.collect::<Vec<_>>();
if !providers.is_empty() {
sqlx::QueryBuilder::new(
"
INSERT INTO meta_providers (package, provider)
",
)
.push_values(providers, |mut b, provider| {
b.push_bind(id.clone().encode())
.push_bind(provider.encode());
.push_values(providers, |mut b, (id, provider)| {
b.push_bind(id.encode()).push_bind(provider.encode());
})
.build()
.execute(transaction.acquire().await?)
Expand All @@ -318,16 +340,24 @@ impl Database {
}
}

async fn remove(package: package::Id, connection: &mut SqliteConnection) -> Result<(), Error> {
sqlx::query(
async fn batch_remove(
packages: impl IntoIterator<Item = &package::Id>,
connection: &mut SqliteConnection,
) -> Result<(), Error> {
let mut query_builder = sqlx::QueryBuilder::new(
"
DELETE FROM meta
WHERE package = ?;
WHERE package IN (
",
)
.bind(package.encode())
.execute(connection)
.await?;
);

let mut separated = query_builder.separated(", ");
packages.into_iter().for_each(|package| {
separated.push_bind(package.encode());
});
separated.push_unseparated(");");

query_builder.build().execute(connection).await?;

Ok(())
}
Expand Down Expand Up @@ -435,7 +465,7 @@ mod test {

assert_eq!(&meta.name, &"bash-completion".to_string().into());

remove(id.clone(), &mut database.pool.acquire().await.unwrap())
batch_remove([&id], &mut database.pool.acquire().await.unwrap())
.await
.unwrap();

Expand Down
Loading

0 comments on commit 6f06312

Please sign in to comment.