Skip to content

Commit

Permalink
made max concurrent transfers a CLI option instead of a constant
Browse files Browse the repository at this point in the history
  • Loading branch information
chanderlud committed Dec 8, 2023
1 parent 759cc18 commit 8466009
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
11 changes: 9 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ const PACKET_SIZE: usize = 8 + ID_SIZE + INDEX_SIZE + TRANSFER_BUFFER_SIZE;

// how long to wait for a job to be confirmed before requeuing it
const REQUEUE_INTERVAL: Duration = Duration::from_millis(1_000);
const MAX_CONCURRENT_TRANSFERS: usize = 100;

pub mod items {
include!(concat!(env!("OUT_DIR"), "/cccp.items.rs"));
Expand All @@ -65,7 +64,7 @@ enum ErrorKind {
Parse(std::net::AddrParseError),
Decode(prost::DecodeError),
Join(tokio::task::JoinError),
Send(kanal::SendError),
Send(SendError),
}

impl From<io::Error> for Error {
Expand Down Expand Up @@ -180,6 +179,14 @@ struct Options {
)]
rate: ByteSize,

#[clap(
short,
long = "max",
help = "the maximum number of concurrent transfers",
default_value = "100"
)]
max: usize,

#[clap(help = "where to get the data from")]
source: FileLocation,

Expand Down
11 changes: 7 additions & 4 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::items::{
use crate::sender::reader::reader;
use crate::{
read_message, socket_factory, write_message, Options, Result, TransferStats, ID_SIZE,
INDEX_SIZE, MAX_CONCURRENT_TRANSFERS, MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE,
INDEX_SIZE, MAX_RETRIES, REQUEUE_INTERVAL, TRANSFER_BUFFER_SIZE,
};

mod reader;
Expand Down Expand Up @@ -139,6 +139,7 @@ pub(crate) async fn main(
stats.confirmed_data,
options.source.file_path,
end_receiver,
options.max,
));

let handles: Vec<_> = sockets
Expand Down Expand Up @@ -196,20 +197,22 @@ async fn sender(
}
}

#[allow(clippy::too_many_arguments)]
async fn controller(
mut control_stream: TcpStream,
mut files: Manifest,
job_sender: AsyncSender<Job>,
read: Arc<Semaphore>,
confirmed_data: Arc<AtomicUsize>,
file_path: PathBuf,
base_path: PathBuf,
end_receiver: AsyncReceiver<End>,
max: usize,
) -> Result<()> {
let mut id = 0;
let mut active = 0;

loop {
while active < MAX_CONCURRENT_TRANSFERS {
while active < max {
match files.files.remove(&id) {
None => break,
Some(file_details) => {
Expand All @@ -218,7 +221,7 @@ async fn controller(
};
write_message(&mut control_stream, &message).await?;

let file_path = file_path.join(&file_details.file_path);
let file_path = base_path.join(&file_details.file_path);

let start_index: StartIndex = read_message(&mut control_stream).await?;
confirmed_data.fetch_add(start_index.index as usize, Relaxed);
Expand Down

0 comments on commit 8466009

Please sign in to comment.