From d6cf90b5eead010f5305673b656a8d4dde2587b1 Mon Sep 17 00:00:00 2001 From: Cory Forsstrom Date: Wed, 18 Oct 2023 14:57:56 -0700 Subject: [PATCH 1/2] Prune skeleton Need to commit "remove" transaction before prune will pickup any files to garbage collect --- moss/src/cli/mod.rs | 8 +- moss/src/cli/remove.rs | 2 +- moss/src/cli/state.rs | 71 +++++++++++++++ moss/src/client/mod.rs | 15 +++- moss/src/client/prune.rs | 136 +++++++++++++++++++++++++++++ moss/src/db/mod.rs | 38 +++++++- moss/src/db/state/mod.rs | 132 +++++++--------------------- moss/src/installation.rs | 12 +-- moss/src/lib.rs | 2 + moss/src/registry/mod.rs | 4 +- moss/src/registry/plugin/active.rs | 5 +- moss/src/registry/transaction.rs | 2 +- moss/src/state.rs | 79 +++++++++++++++++ 13 files changed, 387 insertions(+), 119 deletions(-) create mode 100644 moss/src/cli/state.rs create mode 100644 moss/src/client/prune.rs create mode 100644 moss/src/state.rs diff --git a/moss/src/cli/mod.rs b/moss/src/cli/mod.rs index a0a2d22f..9feba49c 100644 --- a/moss/src/cli/mod.rs +++ b/moss/src/cli/mod.rs @@ -15,6 +15,7 @@ mod install; mod list; mod remove; mod repo; +mod state; mod version; /// Convert the name to a lookup provider @@ -64,8 +65,9 @@ fn command() -> Command { .subcommand(install::command()) .subcommand(list::command()) .subcommand(remove::command()) - .subcommand(version::command()) .subcommand(repo::command()) + .subcommand(state::command()) + .subcommand(version::command()) } /// Process all CLI arguments @@ -90,6 +92,7 @@ pub async fn process() -> Result<(), Error> { Some(("list", args)) => list::handle(args).await.map_err(Error::List), Some(("remove", args)) => remove::handle(args, root).await.map_err(Error::Remove), Some(("repo", args)) => repo::handle(args, root).await.map_err(Error::Repo), + Some(("state", args)) => state::handle(args, root).await.map_err(Error::State), _ => unreachable!(), } } @@ -116,4 +119,7 @@ pub enum Error { #[error("error handling repo: {0}")] Repo(#[from] 
repo::Error), + + #[error("error handling state: {0}")] + State(#[from] state::Error), } diff --git a/moss/src/cli/remove.rs b/moss/src/cli/remove.rs index 16ee21b3..651cd712 100644 --- a/moss/src/cli/remove.rs +++ b/moss/src/cli/remove.rs @@ -61,7 +61,7 @@ pub async fn handle(args: &ArgMatches, root: &Path) -> Result<(), Error> { // Add all installed packages to transaction let mut transaction = client .registry - .transaction_with_packages(installed_ids.iter().cloned().cloned().collect()) + .transaction_with_installed(installed_ids.iter().cloned().cloned().collect()) .await?; // Remove all pkgs for removal diff --git a/moss/src/cli/state.rs b/moss/src/cli/state.rs new file mode 100644 index 00000000..f76a3165 --- /dev/null +++ b/moss/src/cli/state.rs @@ -0,0 +1,71 @@ +// SPDX-FileCopyrightText: Copyright © 2020-2023 Serpent OS Developers +// +// SPDX-License-Identifier: MPL-2.0 + +use std::path::Path; + +use clap::{arg, ArgAction, ArgMatches, Command}; +use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; +use moss::{ + client::{self, prune, Client}, + state, +}; +use thiserror::Error; +use tui::pretty::print_to_columns; + +pub fn command() -> Command { + Command::new("state") + .about("Manage state") + .long_about("Manage state ...") + .subcommand_required(true) + .subcommand(Command::new("list").about("List all states")) + .subcommand( + Command::new("prune").about("Prune old states").arg( + arg!(-k --keep "Keep this many states") + .action(ArgAction::Set) + .default_value("10") + .value_parser(clap::value_parser!(u64).range(1..)), + ), + ) +} + +pub async fn handle(args: &ArgMatches, root: &Path) -> Result<(), Error> { + match args.subcommand() { + Some(("list", _)) => list(root).await, + Some(("prune", args)) => prune(args, root).await, + _ => unreachable!(), + } +} + +pub async fn list(root: &Path) -> Result<(), Error> { + let client = Client::new_for_root(root).await?; + + let state_ids = client.state_db.list_ids().await?; + + let states = 
stream::iter(state_ids.iter().map(|(id, _)| id)) + .then(|id| client.state_db.get(id).map_err(Error::StateDB)) + .try_collect::>() + .await?; + + print_to_columns(&states.iter().map(state::ColumnDisplay).collect::>()); + + Ok(()) +} + +pub async fn prune(args: &ArgMatches, root: &Path) -> Result<(), Error> { + let keep = *args.get_one::("keep").unwrap(); + + let client = Client::new_for_root(root).await?; + client.prune(prune::Strategy::KeepRecent(keep)).await?; + + Ok(()) +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("client error: {0}")] + Client(#[from] client::Error), + + #[error("statedb error: {0}")] + StateDB(#[from] moss::db::state::Error), +} diff --git a/moss/src/client/mod.rs b/moss/src/client/mod.rs index 50c2dffa..8024d9a3 100644 --- a/moss/src/client/mod.rs +++ b/moss/src/client/mod.rs @@ -6,12 +6,15 @@ use std::path::PathBuf; use thiserror::Error; +use self::prune::prune; use crate::{ db, registry::plugin::{self, Plugin}, - repository, Installation, Registry, + repository, Installation, Registry, State, }; +pub mod prune; + #[derive(Debug, Error)] pub enum Error { #[error("Root is invalid")] @@ -25,6 +28,9 @@ pub enum Error { Layout(#[from] db::layout::Error), #[error("state: {0}")] State(#[from] db::state::Error), + + #[error("prune: {0}")] + Prune(#[from] prune::Error), } /// A Client is a connection to the underlying package management systems @@ -96,12 +102,17 @@ impl Client { Ok(()) } + + pub async fn prune(&self, strategy: prune::Strategy) -> Result<(), Error> { + prune(strategy, &self.state_db, &self.install_db, &self.layout_db).await?; + Ok(()) + } } async fn build_registry( repositories: &repository::Manager, installdb: &db::meta::Database, - state: Option, + state: Option, ) -> Result { let mut registry = Registry::default(); diff --git a/moss/src/client/prune.rs b/moss/src/client/prune.rs new file mode 100644 index 00000000..b05e089e --- /dev/null +++ b/moss/src/client/prune.rs @@ -0,0 +1,136 @@ +// SPDX-FileCopyrightText: 
Copyright © 2020-2023 Serpent OS Developers +// +// SPDX-License-Identifier: MPL-2.0 + +use std::collections::HashMap; + +use itertools::Itertools; +use thiserror::Error; +use tui::pretty::print_to_columns; + +use crate::{db, package, state}; + +/// The prune strategy for removing old states +#[derive(Debug, Clone, Copy)] +pub enum Strategy { + /// Keep the most recent N states, remove the rest + KeepRecent(u64), + /// Removes a specific state + Remove(state::Id), +} + +/// The status of a state +enum Status { + /// Keep the state + Keep(state::Id), + /// Remove the state + Remove(state::Id), +} + +impl Status { + fn id(&self) -> &state::Id { + match self { + Status::Keep(id) => id, + Status::Remove(id) => id, + } + } + + fn is_removal(&self) -> bool { + matches!(self, Self::Remove(_)) + } +} + +/// Prune old states using [`Strategy`] and garbage collect +/// all cached data related to those states being removed +/// +/// TODO: Add indicatif / CLI output +pub async fn prune( + strategy: Strategy, + state_db: &db::state::Database, + install_db: &db::meta::Database, + layout_db: &db::layout::Database, +) -> Result<(), Error> { + let state_ids = state_db.list_ids().await?; + + // Define each state as either Keep or Remove + let states_by_status = match strategy { + Strategy::KeepRecent(keep) => { + // Calculate how many states over the limit we are + let num_to_remove = state_ids.len().saturating_sub(keep as usize); + + state_ids + .into_iter() + .sorted_by_key(|(_, created)| *created) + .enumerate() + .map(|(idx, (id, _))| { + if idx < num_to_remove { + Status::Remove(id) + } else { + Status::Keep(id) + } + }) + .collect::>() + } + Strategy::Remove(remove) => state_ids + .iter() + .find_map(|(id, _)| (*id == remove).then_some(Status::Remove(remove))) + .into_iter() + .collect(), + }; + + if !states_by_status.iter().any(Status::is_removal) { + // TODO: Print no states to be removed + return Ok(()); + } + + // Keep track of how many active states are using a package + 
let mut packages_counts = HashMap::::new(); + let mut removals = vec![]; + + // Add each package and get net count + for status in states_by_status { + // Get metadata + let state = state_db.get(status.id()).await?; + + dbg!(&state.packages.len()); + + // Increment each package + state.packages.iter().for_each(|pkg| { + *packages_counts.entry(pkg.clone()).or_default() += 1; + }); + + // Decrement if removal + if status.is_removal() { + state.packages.iter().for_each(|pkg| { + *packages_counts.entry(pkg.clone()).or_default() -= 1; + }); + removals.push(state); + } + } + + println!("The following state(s) will be removed:"); + println!(); + print_to_columns( + &removals + .iter() + .map(state::ColumnDisplay) + .collect::>(), + ); + println!(); + + // Get all packages which were decremented to 0 + let package_removals = packages_counts + .into_iter() + .filter_map(|(pkg, count)| (count == 0).then_some(pkg)) + .collect::>(); + + dbg!(package_removals); + + Ok(()) +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("state db: {0}")] + StateDB(#[from] db::state::Error), +} diff --git a/moss/src/db/mod.rs b/moss/src/db/mod.rs index 29bbe9b1..05762dfb 100644 --- a/moss/src/db/mod.rs +++ b/moss/src/db/mod.rs @@ -13,8 +13,9 @@ mod encoding { use std::convert::Infallible; use sqlx::{Sqlite, Type}; + use thiserror::Error; - use crate::{dependency, package, Dependency, Provider}; + use crate::{dependency, package, state, Dependency, Provider}; /// Decode from a database type using [`Encoding::decode`] #[derive(Debug, Clone, Copy)] @@ -113,4 +114,39 @@ mod encoding { self.to_string() } } + + impl<'a> Encoding<'a> for state::Id { + type Encoded = i64; + type Error = Infallible; + + fn decode(value: i64) -> Result { + Ok(Self::from(value)) + } + + fn encode(&self) -> i64 { + (*self).into() + } + } + + impl<'a> Encoding<'a> for state::Kind { + type Encoded = &'a str; + type Error = DecodeStateKindError; + + fn decode(value: &'a str) -> Result { + match value { + "transaction" 
=> Ok(Self::Transaction), + _ => Err(DecodeStateKindError(value.to_string())), + } + } + + fn encode(&self) -> Self::Encoded { + match self { + state::Kind::Transaction => "transaction", + } + } + } + + #[derive(Debug, Error)] + #[error("Invalid state type: {0}")] + pub struct DecodeStateKindError(String); } diff --git a/moss/src/db/state/mod.rs b/moss/src/db/state/mod.rs index 04950210..150b7e00 100644 --- a/moss/src/db/state/mod.rs +++ b/moss/src/db/state/mod.rs @@ -2,66 +2,15 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::fmt; - use chrono::{DateTime, Utc}; use sqlx::sqlite::SqliteConnectOptions; use sqlx::{Acquire, Executor, Pool, Sqlite}; use thiserror::Error; use crate::db::Encoding; -use crate::package; +use crate::state::{self, Id}; use crate::Installation; - -/// Unique identifier for [`State`] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct Id(i64); - -impl From for Id { - fn from(id: i64) -> Self { - Id(id) - } -} - -impl From for i64 { - fn from(id: Id) -> Self { - id.0 - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -/// State types -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u8)] -pub enum Kind { - /// Automatically constructed state - Transaction, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct State { - /// Unique identifier for this state - pub id: Id, - /// Quick summary for the state (optional) - pub summary: Option, - /// Description for the state (optional) - pub description: Option, - /// Package IDs / selections in this state - pub packages: Vec, - /// Creation timestamp - pub created: Timestamp, - /// Relevant type for this State - pub kind: Kind, -} - -// TODO: Add crate timestamp type that can be reused -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct Timestamp(DateTime); +use crate::{package, State}; #[derive(Debug)] pub struct Database { @@ -89,6 +38,22 @@ impl Database { Ok(Self { pool }) } + pub async fn list_ids(&self) 
-> Result)>, Error> { + let states = sqlx::query_as::<_, encoding::Created>( + " + SELECT id, created + FROM state; + ", + ) + .fetch_all(&self.pool) + .await?; + + Ok(states + .into_iter() + .map(|state| (state.id.0, state.created)) + .collect()) + } + pub async fn get(&self, id: &Id) -> Result { let state_query = sqlx::query_as::<_, encoding::State>( " @@ -122,7 +87,7 @@ impl Database { summary: state.summary, description: state.description, packages, - created: Timestamp(state.created), + created: state.created, kind: state.kind.0, }) } @@ -142,7 +107,7 @@ impl Database { RETURNING id; ", ) - .bind(Kind::Transaction.encode()) + .bind(state::Kind::Transaction.encode()) .bind(summary) .bind(description) .fetch_one(transaction.acquire().await?) @@ -181,23 +146,23 @@ pub enum Error { } mod encoding { - use std::convert::Infallible; - use chrono::{DateTime, Utc}; use sqlx::FromRow; - use thiserror::Error; - use super::{Id, Kind}; - use crate::{ - db::{Decoder, Encoding}, - package, - }; + use super::{state, Id}; + use crate::{db::Decoder, package}; + + #[derive(FromRow)] + pub struct Created { + pub id: Decoder, + pub created: DateTime, + } #[derive(FromRow)] pub struct State { pub id: Decoder, #[sqlx(rename = "type")] - pub kind: Decoder, + pub kind: Decoder, pub created: DateTime, pub summary: Option, pub description: Option, @@ -212,41 +177,6 @@ mod encoding { pub struct Package { pub package_id: Decoder, } - - impl<'a> Encoding<'a> for Id { - type Encoded = i64; - type Error = Infallible; - - fn decode(value: i64) -> Result { - Ok(Self(value)) - } - - fn encode(&self) -> i64 { - self.0 - } - } - - impl<'a> Encoding<'a> for Kind { - type Encoded = &'a str; - type Error = DecodeKindError; - - fn decode(value: &'a str) -> Result { - match value { - "transaction" => Ok(Self::Transaction), - _ => Err(DecodeKindError(value.to_string())), - } - } - - fn encode(&self) -> Self::Encoded { - match self { - Kind::Transaction => "transaction", - } - } - } - - #[derive(Debug, 
Error)] - #[error("Invalid state type: {0}")] - pub struct DecodeKindError(String); } #[cfg(test)] @@ -280,10 +210,10 @@ mod test { .unwrap(); // First record - assert_eq!(state.id.0, 1); + assert_eq!(i64::from(state.id), 1); // Check created - let elapsed = Utc::now().signed_duration_since(state.created.0); + let elapsed = Utc::now().signed_duration_since(state.created); assert!(elapsed.num_seconds() == 0); assert!(!elapsed.is_zero()); diff --git a/moss/src/installation.rs b/moss/src/installation.rs index 28e4e539..4f69fa32 100644 --- a/moss/src/installation.rs +++ b/moss/src/installation.rs @@ -10,7 +10,7 @@ use std::{ use log::{trace, warn}; use nix::unistd::{access, AccessFlags, Uid}; -use crate::db; +use crate::state; /// System mutability - do we have readwrite? #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -43,7 +43,7 @@ pub struct Installation { pub mutability: Mutability, /// Detected currently active state (optional) - pub active_state: Option, + pub active_state: Option, } impl Installation { @@ -135,7 +135,7 @@ impl Installation { /// In older versions of moss, the `/usr` entry was a symlink /// to an active state. In newer versions, the state is recorded /// within the installation tree. 
(`/usr/.stateID`) -fn read_state_id(root: &Path) -> Option { +fn read_state_id(root: &Path) -> Option { let usr_path = root.join("usr"); let state_path = root.join("usr").join(".stateID"); @@ -143,7 +143,7 @@ fn read_state_id(root: &Path) -> Option { .ok() .and_then(|s| s.parse::().ok()) { - return Some(db::state::Id::from(id)); + return Some(state::Id::from(id)); } else if let Ok(usr_target) = usr_path.read_link() { return read_legacy_state_id(&usr_target); } @@ -152,13 +152,13 @@ fn read_state_id(root: &Path) -> Option { } // Legacy `/usr` link support -fn read_legacy_state_id(usr_target: &Path) -> Option { +fn read_legacy_state_id(usr_target: &Path) -> Option { if usr_target.ends_with("usr") { let parent = usr_target.parent()?; let base = parent.file_name()?; let id = base.to_str()?.parse::().ok()?; - return Some(db::state::Id::from(id)); + return Some(state::Id::from(id)); } None diff --git a/moss/src/lib.rs b/moss/src/lib.rs index 1a520688..d1c300d5 100644 --- a/moss/src/lib.rs +++ b/moss/src/lib.rs @@ -11,6 +11,7 @@ pub use self::installation::Installation; pub use self::package::Package; pub use self::registry::Registry; pub use self::repository::Repository; +pub use self::state::State; pub mod client; pub mod config; @@ -21,4 +22,5 @@ pub mod package; pub mod registry; pub mod repository; mod request; +pub mod state; pub mod stone; diff --git a/moss/src/registry/mod.rs b/moss/src/registry/mod.rs index a2cfea52..5b72b682 100644 --- a/moss/src/registry/mod.rs +++ b/moss/src/registry/mod.rs @@ -102,11 +102,11 @@ impl Registry { } /// Return a new transaction for this registry initialised with the incoming package set as installed - pub async fn transaction_with_packages( + pub async fn transaction_with_installed( &self, incoming: Vec, ) -> Result, transaction::Error> { - transaction::new_with_packages(self, incoming).await + transaction::new_with_installed(self, incoming).await } } diff --git a/moss/src/registry/plugin/active.rs 
b/moss/src/registry/plugin/active.rs index 63d6f6c6..b1ed5b22 100644 --- a/moss/src/registry/plugin/active.rs +++ b/moss/src/registry/plugin/active.rs @@ -4,10 +4,7 @@ use log::warn; -use crate::{ - db::{self, state::State}, - package, Package, Provider, -}; +use crate::{db, package, Package, Provider, State}; // TODO: #[derive(Debug, Clone)] diff --git a/moss/src/registry/transaction.rs b/moss/src/registry/transaction.rs index 489980ba..14f4386e 100644 --- a/moss/src/registry/transaction.rs +++ b/moss/src/registry/transaction.rs @@ -53,7 +53,7 @@ pub(super) fn new(registry: &Registry) -> Result, Error> { } /// Populate the transaction on initialisation -pub(super) async fn new_with_packages( +pub(super) async fn new_with_installed( registry: &Registry, incoming: Vec, ) -> Result, Error> { diff --git a/moss/src/state.rs b/moss/src/state.rs new file mode 100644 index 00000000..102b9012 --- /dev/null +++ b/moss/src/state.rs @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: Copyright © 2020-2023 Serpent OS Developers +// +// SPDX-License-Identifier: MPL-2.0 + +use std::{fmt, io::Write}; + +use chrono::{DateTime, Utc}; +use tui::{pretty, Stylize}; + +use crate::package; + +/// Unique identifier for [`State`] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Id(i64); + +impl From for Id { + fn from(id: i64) -> Self { + Id(id) + } +} + +impl From for i64 { + fn from(id: Id) -> Self { + id.0 + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// State types +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum Kind { + /// Automatically constructed state + Transaction, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct State { + /// Unique identifier for this state + pub id: Id, + /// Quick summary for the state (optional) + pub summary: Option, + /// Description for the state (optional) + pub description: Option, + /// Package IDs / selections in this state + pub packages: 
Vec, + /// Creation timestamp + pub created: DateTime, + /// Relevant type for this State + pub kind: Kind, +} + +pub struct ColumnDisplay<'a>(pub &'a State); + +impl<'a> pretty::ColumnDisplay for ColumnDisplay<'a> { + fn get_display_width(&self) -> usize { + const WHITESPACE: usize = 1; + + "State ".len() + self.0.id.to_string().len() + } + + fn display_column(&self, writer: &mut impl Write, col: pretty::Column, width: usize) { + let right_gap = matches!(col, pretty::Column::Last) + .then_some(" ") + .unwrap_or_default(); + + let _ = write!( + writer, + "State {}{:width$}", + self.0.id.to_string().bold(), + " ", + ); + } +} From 5b189dd53bbfe78c23af9f2a08a4a2344f99e164 Mon Sep 17 00:00:00 2001 From: Cory Forsstrom Date: Thu, 19 Oct 2023 10:56:56 -0700 Subject: [PATCH 2/2] Finish prune --- moss/src/client/mod.rs | 9 ++- moss/src/client/prune.rs | 117 +++++++++++++++++++++++++++++++++++--- moss/src/db/layout/mod.rs | 53 +++++++++++++---- moss/src/db/meta/mod.rs | 23 +++++--- moss/src/db/state/mod.rs | 52 +++++++++++++---- moss/src/package/fetch.rs | 14 +++-- 6 files changed, 226 insertions(+), 42 deletions(-) diff --git a/moss/src/client/mod.rs b/moss/src/client/mod.rs index 8024d9a3..268cbd1d 100644 --- a/moss/src/client/mod.rs +++ b/moss/src/client/mod.rs @@ -104,7 +104,14 @@ impl Client { } pub async fn prune(&self, strategy: prune::Strategy) -> Result<(), Error> { - prune(strategy, &self.state_db, &self.install_db, &self.layout_db).await?; + prune( + strategy, + &self.state_db, + &self.install_db, + &self.layout_db, + &self.installation, + ) + .await?; Ok(()) } } diff --git a/moss/src/client/prune.rs b/moss/src/client/prune.rs index b05e089e..c1ba23fc 100644 --- a/moss/src/client/prune.rs +++ b/moss/src/client/prune.rs @@ -2,13 +2,21 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::collections::HashMap; +use std::{ + collections::{HashMap, HashSet}, + io, + path::Path, +}; +use futures::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; use 
thiserror::Error; +use tokio::fs; use tui::pretty::print_to_columns; -use crate::{db, package, state}; +use crate::{db, package, state, Installation}; + +const CONCURRENT_REMOVALS: usize = 16; /// The prune strategy for removing old states #[derive(Debug, Clone, Copy)] @@ -42,13 +50,12 @@ impl Status { /// Prune old states using [`Strategy`] and garbage collect /// all cached data related to those states being removed -/// -/// TODO: Add indicatif / CLI output pub async fn prune( strategy: Strategy, state_db: &db::state::Database, install_db: &db::meta::Database, layout_db: &db::layout::Database, + installation: &Installation, ) -> Result<(), Error> { let state_ids = state_db.list_ids().await?; @@ -92,8 +99,6 @@ pub async fn prune( // Get metadata let state = state_db.get(status.id()).await?; - dbg!(&state.packages.len()); - // Increment each package state.packages.iter().for_each(|pkg| { *packages_counts.entry(pkg.clone()).or_default() += 1; @@ -124,13 +129,111 @@ pub async fn prune( .filter_map(|(pkg, count)| (count == 0).then_some(pkg)) .collect::>(); - dbg!(package_removals); + let download_hashes = stream::iter(package_removals.iter()) + .then(|id| install_db.get(id)) + .try_collect::>() + .await? + .into_iter() + .filter_map(|meta| meta.hash) + .collect::>(); + + // Remove states + state_db + .batch_remove(removals.iter().map(|state| &state.id)) + .await?; + + // Remove metadata + install_db.batch_remove(&package_removals).await?; + + // Remove layouts and compute change in file hashes + let pre_hashes = layout_db.file_hashes().await?; + layout_db.batch_remove(&package_removals).await?; + let post_hashes = layout_db.file_hashes().await?; + let asset_hashes = pre_hashes.difference(&post_hashes); + + // Remove cached assets + stream::iter(asset_hashes) + .map(|hash| async { + let hash = format!("{:02x}", *hash); + let Ok(asset) = package::fetch::asset_path(installation, &hash).await else { + return Ok(()); + }; + if fs::try_exists(&asset).await? 
{ + fs::remove_file(&asset).await?; + } + + if let Some(parent) = asset.parent() { + remove_empty_dirs(parent, &installation.assets_path("v2")).await?; + } + + Ok(()) as Result<(), Error> + }) + .buffer_unordered(CONCURRENT_REMOVALS) + .try_collect::<()>() + .await?; + + // Remove cached downloads + stream::iter(&download_hashes) + .map(|hash| async { + let Ok(download) = package::fetch::download_path(installation, hash).await else { + return Ok(()); + }; + if fs::try_exists(&download).await? { + fs::remove_file(&download).await?; + } + + if let Some(parent) = download.parent() { + remove_empty_dirs(parent, &installation.cache_path("downloads").join("v1")).await?; + } + + Ok(()) as Result<(), Error> + }) + .buffer_unordered(CONCURRENT_REMOVALS) + .try_collect::<()>() + .await?; + + Ok(()) +} + +/// Remove all empty folders from `starting` and moving up until `root` +/// +/// `root` must be a prefix / ancestor of `starting` +async fn remove_empty_dirs(starting: &Path, root: &Path) -> Result<(), io::Error> { + if !starting.starts_with(root) || !starting.is_dir() || !root.is_dir() { + return Ok(()); + } + + let mut current = Some(starting); + + while let Some(dir) = current.take() { + if fs::try_exists(dir).await? 
{ + let is_empty = fs::read_dir(&dir).await?.next_entry().await?.is_none(); + + if !is_empty { + return Ok(()); + } + + fs::remove_dir(&dir).await?; + } + + if let Some(parent) = dir.parent() { + if parent != root { + current = Some(parent); + } + } + } Ok(()) } #[derive(Debug, Error)] pub enum Error { + #[error("layout db: {0}")] + LayoutDB(#[from] db::layout::Error), + #[error("meta db: {0}")] + MetaDB(#[from] db::meta::Error), #[error("state db: {0}")] StateDB(#[from] db::state::Error), + #[error("io error: {0}")] + Io(#[from] io::Error), } diff --git a/moss/src/db/layout/mod.rs b/moss/src/db/layout/mod.rs index 6f4d33c9..3cd66120 100644 --- a/moss/src/db/layout/mod.rs +++ b/moss/src/db/layout/mod.rs @@ -2,6 +2,8 @@ // // SPDX-License-Identifier: MPL-2.0 +use std::collections::HashSet; + use sqlx::sqlite::SqliteConnectOptions; use sqlx::{Pool, Sqlite}; use stone::payload; @@ -87,22 +89,25 @@ impl Database { .collect()) } - pub async fn add(&self, package: package::Id, layout: payload::Layout) -> Result<(), Error> { - self.batch_add(vec![(package, layout)]).await - } - - pub async fn remove(&self, package: &package::Id) -> Result<(), Error> { - sqlx::query( + pub async fn file_hashes(&self) -> Result, Error> { + let layouts = sqlx::query_as::<_, (String,)>( " - DELETE FROM layout - WHERE package_id = ?; + SELECT DISTINCT entry_value1 + FROM layout + WHERE entry_type = 'regular'; ", ) - .bind(package.encode()) - .execute(&self.pool) + .fetch_all(&self.pool) .await?; - Ok(()) + Ok(layouts + .into_iter() + .filter_map(|(hash,)| hash.parse::().ok()) + .collect()) + } + + pub async fn add(&self, package: package::Id, layout: payload::Layout) -> Result<(), Error> { + self.batch_add(vec![(package, layout)]).await } pub async fn batch_add( @@ -150,6 +155,32 @@ impl Database { Ok(()) } + + pub async fn remove(&self, package: &package::Id) -> Result<(), Error> { + self.batch_remove(Some(package)).await + } + + pub async fn batch_remove( + &self, + packages: impl 
IntoIterator, + ) -> Result<(), Error> { + let mut query = sqlx::QueryBuilder::new( + " + DELETE FROM layout + WHERE package_id IN ( + ", + ); + + let mut separated = query.separated(", "); + packages.into_iter().for_each(|pkg| { + separated.push_bind(pkg.encode()); + }); + separated.push_unseparated(");"); + + query.build().execute(&self.pool).await?; + + Ok(()) + } } #[derive(Debug, Error)] diff --git a/moss/src/db/meta/mod.rs b/moss/src/db/meta/mod.rs index aee80710..50b3459c 100644 --- a/moss/src/db/meta/mod.rs +++ b/moss/src/db/meta/mod.rs @@ -5,7 +5,7 @@ use std::path::Path; use sqlx::{sqlite::SqliteConnectOptions, Acquire, Pool, Sqlite}; -use sqlx::{QueryBuilder, SqliteConnection}; +use sqlx::{Executor, QueryBuilder}; use thiserror::Error; use crate::db::Encoding; @@ -312,7 +312,7 @@ impl Database { let mut transaction = self.pool.begin().await?; // Remove package (other tables cascade) - batch_remove( + batch_remove_impl( packages.iter().map(|(id, _)| id), transaction.acquire().await?, ) @@ -438,11 +438,22 @@ impl Database { Ok(()) } + + pub async fn remove(&self, package: &package::Id) -> Result<(), Error> { + self.batch_remove(Some(package)).await + } + + pub async fn batch_remove( + &self, + packages: impl IntoIterator, + ) -> Result<(), Error> { + batch_remove_impl(packages, &self.pool).await + } } -async fn batch_remove( +async fn batch_remove_impl<'a>( packages: impl IntoIterator, - connection: &mut SqliteConnection, + connection: impl Executor<'a, Database = Sqlite>, ) -> Result<(), Error> { let mut query_builder = sqlx::QueryBuilder::new( " @@ -576,9 +587,7 @@ mod test { let fetched = database.query(Some(lookup)).await.unwrap(); assert_eq!(fetched.len(), 1); - batch_remove([&id], &mut database.pool.acquire().await.unwrap()) - .await - .unwrap(); + batch_remove_impl([&id], &database.pool).await.unwrap(); let result = database.get(&id).await; diff --git a/moss/src/db/state/mod.rs b/moss/src/db/state/mod.rs index 150b7e00..56a7abb5 100644 --- 
a/moss/src/db/state/mod.rs +++ b/moss/src/db/state/mod.rs @@ -113,21 +113,23 @@ impl Database { .fetch_one(transaction.acquire().await?) .await?; - transaction - .execute( - sqlx::QueryBuilder::new( - " + if !packages.is_empty() { + transaction + .execute( + sqlx::QueryBuilder::new( + " INSERT INTO state_packages (state_id, package_id, reason) ", + ) + .push_values(packages, |mut b, package| { + b.push_bind(id.0.encode()) + .push_bind(package.encode()) + .push_bind(Option::::None); + }) + .build(), ) - .push_values(packages, |mut b, package| { - b.push_bind(id.0.encode()) - .push_bind(package.encode()) - .push_bind(Option::::None); - }) - .build(), - ) - .await?; + .await?; + } transaction.commit().await?; @@ -135,6 +137,32 @@ impl Database { Ok(state) } + + pub async fn remove(&self, state: &state::Id) -> Result<(), Error> { + self.batch_remove(Some(state)).await + } + + pub async fn batch_remove( + &self, + states: impl IntoIterator, + ) -> Result<(), Error> { + let mut query = sqlx::QueryBuilder::new( + " + DELETE FROM state + WHERE id IN ( + ", + ); + + let mut separated = query.separated(", "); + states.into_iter().for_each(|id| { + separated.push_bind(id.encode()); + }); + separated.push_unseparated(");"); + + query.build().execute(&self.pool).await?; + + Ok(()) + } } #[derive(Debug, Error)] diff --git a/moss/src/package/fetch.rs b/moss/src/package/fetch.rs index e7e9555c..447381ed 100644 --- a/moss/src/package/fetch.rs +++ b/moss/src/package/fetch.rs @@ -11,6 +11,7 @@ use thiserror::Error; use tokio::{ fs::{self, File}, io::AsyncWriteExt, + runtime::Handle, task, }; use url::Url; @@ -133,6 +134,8 @@ impl Download { } } + let rt = Handle::current(); + task::spawn_blocking(move || { let content_dir = self.installation.cache_path("content"); let content_path = content_dir.join(self.id.as_ref()); @@ -168,7 +171,10 @@ impl Download { file.seek(SeekFrom::Start(idx.start))?; let mut split_file = (&mut file).take(idx.end - idx.start); - let path = 
asset_path(&self.installation, &format!("{:02x}", idx.digest))?; + let path = rt.block_on(asset_path( + &self.installation, + &format!("{:02x}", idx.digest), + ))?; let mut output = File::create(path)?; @@ -190,7 +196,7 @@ impl Download { } } -async fn download_path(installation: &Installation, hash: &str) -> Result { +pub async fn download_path(installation: &Installation, hash: &str) -> Result { if hash.len() < 5 { return Err(Error::MalformedHash(hash.to_string())); } @@ -208,7 +214,7 @@ async fn download_path(installation: &Installation, hash: &str) -> Result Result { +pub async fn asset_path(installation: &Installation, hash: &str) -> Result { let directory = if hash.len() >= 10 { installation .assets_path("v2") @@ -220,7 +226,7 @@ fn asset_path(installation: &Installation, hash: &str) -> Result }; if !directory.exists() { - std::fs::create_dir_all(&directory)?; + fs::create_dir_all(&directory).await?; } Ok(directory.join(hash))