Skip to content

Commit

Permalink
feat: support batch sizes in batch commands (#173)
Browse files Browse the repository at this point in the history
* feat: support batch sizes in batch commands

* refactor: change to progressbar
  • Loading branch information
samuelvanderwaal authored Aug 10, 2022
1 parent f177bf6 commit 4391b8b
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct BurnAllArgs {
pub keypair: Option<String>,
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -100,6 +101,7 @@ pub async fn burn_all(args: BurnAllArgs) -> AnyResult<()> {
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: String::new(),
batch_size: args.batch_size,
retries: args.retries,
};
BurnAll::run(args).await?;
Expand Down
44 changes: 32 additions & 12 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Result as AnyResult};
use async_trait::async_trait;
use indexmap::IndexMap;
use indicatif::ProgressBar;
use log::info;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
Expand All @@ -9,9 +10,9 @@ use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
use std::{io::Write, ops::Deref};
use tokio::sync::Semaphore;

use crate::errors::ActionError;
use crate::spinner::create_alt_spinner;

#[derive(Debug, Deserialize, Serialize)]
pub struct Cache(pub IndexMap<String, CacheItem>);
Expand Down Expand Up @@ -73,6 +74,7 @@ pub struct BatchActionArgs {
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub new_value: String,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -135,37 +137,55 @@ pub trait Action {
} else {
keypair.clone()
};
let semaphore = Arc::new(Semaphore::new(args.batch_size as usize));

loop {
let remaining_mints = mint_list.clone();

info!("Sending network requests...");
let spinner = create_alt_spinner("Sending network requests....");
let mut update_tasks = Vec::new();
println!("Updating NFTs. . .");
let pb = ProgressBar::new(remaining_mints.len() as u64);

// Create a vector of futures to execute.
let update_tasks: Vec<_> = remaining_mints
.into_iter()
.map(|mint_address| {
tokio::spawn(Self::action(RunActionArgs {
for mint_address in remaining_mints {
// Limit total number of concurrent requests to args.batch_size.
let permit = semaphore.clone().acquire_owned().await.unwrap();

// Create task to run the action in a separate thread.
let task = tokio::spawn({
let fut = Self::action(RunActionArgs {
client: client.clone(),
keypair: keypair.clone(),
payer: payer.clone(),
mint_account: mint_address,
new_value: args.new_value.clone(),
}))
})
.collect();
spinner.finish();
});

// Increment the counter and update the progress bar.
// pb.inc(1);

// Move the permit into the thread to take ownership of it and then drop it
// when the future is complete.
drop(permit);

fut
});

// Collect all the tasks in our futures vector.
update_tasks.push(task);
}

let update_tasks_len = update_tasks.len();

// Wait for all the tasks to resolve and push the results to our results vector
let mut update_results = Vec::new();
let spinner = create_alt_spinner("Awaiting results...");
for task in update_tasks {
update_results.push(task.await.unwrap());
// Increment the counter and update the progress bar.
pb.inc(1);
}
spinner.finish();
pb.finish_and_clear();

// Partition migration results.
let (_update_successful, update_failed): (CacheResults, CacheResults) =
Expand Down
3 changes: 3 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub const PUBLIC_RPC_URLS: &[&str] = &[

pub const DEFAULT_RPC_DELAY_MS: u32 = 200;

// This is a str so it can be used in Structopt arguments
pub const DEFAULT_BATCH_SIZE: &str = "1000";

pub const ERROR_FILE_BEGIN: &str = r#"#![allow(unused)]
use phf::phf_map;
Expand Down
32 changes: 31 additions & 1 deletion src/opt.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use structopt::StructOpt;

use crate::{collections::GetCollectionItemsMethods, data::Indexers};
use crate::{
collections::GetCollectionItemsMethods, constants::DEFAULT_BATCH_SIZE, data::Indexers,
};

#[derive(Debug, StructOpt)]
#[structopt(name = "Metaboss", about = "Metaplex NFT 'Swiss Army Knife' tool.")]
Expand Down Expand Up @@ -124,6 +126,10 @@ pub enum BurnSubcommands {
#[structopt(short, long)]
cache_file: Option<String>,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -583,6 +589,10 @@ pub enum SetSubcommands {
#[structopt(short, long)]
cache_file: Option<String>,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -629,6 +639,10 @@ pub enum SetSubcommands {
#[structopt(short, long)]
cache_file: Option<String>,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -657,6 +671,10 @@ pub enum SetSubcommands {
#[structopt(short, long)]
cache_file: Option<String>,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -840,6 +858,10 @@ pub enum UpdateSubcommands {
#[structopt(short, long)]
new_sfbp: u16,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -892,6 +914,10 @@ pub enum UpdateSubcommands {
#[structopt(short, long)]
new_symbol: String,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down Expand Up @@ -938,6 +964,10 @@ pub enum UpdateSubcommands {
#[structopt(short = "A", long = "append")]
append: bool,

/// Maximum number of concurrent requests
#[structopt(short, long, default_value = DEFAULT_BATCH_SIZE)]
batch_size: usize,

/// Maximum retries: retry failed items up to this many times.
#[structopt(long, default_value = "1")]
retries: u8,
Expand Down
14 changes: 14 additions & 0 deletions src/process_subcommands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,15 @@ pub async fn process_burn(client: RpcClient, commands: BurnSubcommands) -> Resul
keypair,
mint_list,
cache_file,
batch_size,
retries,
} => {
burn_all(BurnAllArgs {
client,
keypair,
mint_list,
cache_file,
batch_size,
retries,
})
.await
Expand Down Expand Up @@ -293,13 +295,15 @@ pub async fn process_set(client: RpcClient, commands: SetSubcommands) -> Result<
keypair,
mint_list,
cache_file,
batch_size,
retries,
} => {
set_primary_sale_happened_all(SetPrimarySaleHappenedAllArgs {
client,
keypair,
mint_list,
cache_file,
batch_size,
retries,
})
.await
Expand All @@ -322,6 +326,7 @@ pub async fn process_set(client: RpcClient, commands: SetSubcommands) -> Result<
mint_list,
new_authority,
cache_file,
batch_size,
retries,
} => {
set_update_authority_all(SetUpdateAuthorityAllArgs {
Expand All @@ -331,6 +336,7 @@ pub async fn process_set(client: RpcClient, commands: SetSubcommands) -> Result<
mint_list,
new_authority,
cache_file,
batch_size,
retries,
})
.await
Expand All @@ -342,13 +348,15 @@ pub async fn process_set(client: RpcClient, commands: SetSubcommands) -> Result<
keypair,
mint_list,
cache_file,
batch_size,
retries,
} => {
set_immutable_all(SetImmutableAllArgs {
client,
keypair,
mint_list,
cache_file,
batch_size,
retries,
})
.await
Expand Down Expand Up @@ -439,6 +447,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
mint_list,
cache_file,
new_sfbp,
batch_size,
retries,
} => {
update_seller_fee_basis_points_all(UpdateSellerFeeBasisPointsAllArgs {
Expand All @@ -447,6 +456,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
mint_list,
cache_file,
new_sfbp,
batch_size,
retries,
})
.await
Expand All @@ -466,6 +476,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
mint_list,
cache_file,
new_symbol,
batch_size,
retries,
} => {
update_symbol_all(UpdateSymbolAllArgs {
Expand All @@ -474,6 +485,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
mint_list,
cache_file,
new_symbol,
batch_size,
retries,
})
.await
Expand All @@ -490,6 +502,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
cache_file,
new_creators,
append,
batch_size,
retries,
} => {
update_creator_all(UpdateCreatorAllArgs {
Expand All @@ -499,6 +512,7 @@ pub async fn process_update(client: RpcClient, commands: UpdateSubcommands) -> R
cache_file,
new_creators,
should_append: append,
batch_size,
retries,
})
.await
Expand Down
2 changes: 2 additions & 0 deletions src/update/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub struct UpdateCreatorAllArgs {
pub cache_file: Option<String>,
pub new_creators: String,
pub should_append: bool,
pub batch_size: usize,
pub retries: u8,
}

Expand All @@ -143,6 +144,7 @@ pub async fn update_creator_all(args: UpdateCreatorAllArgs) -> AnyResult<()> {
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: args.new_creators,
batch_size: args.batch_size,
retries: args.retries,
};
UpdateCreatorAll::run(args).await?;
Expand Down
2 changes: 2 additions & 0 deletions src/update/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct SetImmutableAllArgs {
pub keypair: Option<String>,
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -117,6 +118,7 @@ pub async fn set_immutable_all(args: SetImmutableAllArgs) -> AnyResult<()> {
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: "".to_string(),
batch_size: args.batch_size,
retries: args.retries,
};
SetImmutableAll::run(args).await?;
Expand Down
2 changes: 2 additions & 0 deletions src/update/primary_sale_happened.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct SetPrimarySaleHappenedAllArgs {
pub keypair: Option<String>,
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -116,6 +117,7 @@ pub async fn set_primary_sale_happened_all(args: SetPrimarySaleHappenedAllArgs)
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: "".to_string(),
batch_size: args.batch_size,
retries: args.retries,
};
SetPrimarySaleHappenedAll::run(args).await?;
Expand Down
2 changes: 2 additions & 0 deletions src/update/seller_fee_basis_points.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct UpdateSellerFeeBasisPointsAllArgs {
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub new_sfbp: u16,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -109,6 +110,7 @@ pub async fn update_seller_fee_basis_points_all(
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: args.new_sfbp.to_string(),
batch_size: args.batch_size,
retries: args.retries,
};
UpdateSellerFeeBasisPointsAll::run(args).await?;
Expand Down
2 changes: 2 additions & 0 deletions src/update/symbol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct UpdateSymbolAllArgs {
pub mint_list: Option<String>,
pub cache_file: Option<String>,
pub new_symbol: String,
pub batch_size: usize,
pub retries: u8,
}

Expand Down Expand Up @@ -97,6 +98,7 @@ pub async fn update_symbol_all(args: UpdateSymbolAllArgs) -> AnyResult<()> {
mint_list: args.mint_list,
cache_file: args.cache_file,
new_value: args.new_symbol,
batch_size: args.batch_size,
retries: args.retries,
};
UpdateSymbolAll::run(args).await?;
Expand Down
Loading

0 comments on commit 4391b8b

Please sign in to comment.