Skip to content

Commit

Permalink
[CallBatch] Allow to execute a batch in multiple sub-batches (#464)
Browse files Browse the repository at this point in the history
While using CallBatch in dex-services, I noticed that we basically always need to limit the maximum batch size and therefore implement custom logic in the consumer of this API. 

This PR moves the sub-batching logic into ethcontracts by giving `execute_all` a batch size arguments.

Unfortunately this is another breaking change (so we might have to release 0.12.0).

### Test Plan

Added a unit test.
  • Loading branch information
fleupold authored Feb 4, 2021
1 parent 6756cb4 commit 9d9215a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 40 deletions.
97 changes: 58 additions & 39 deletions ethcontract/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,40 +49,44 @@ impl<T: Web3BatchTransport> CallBatch<T> {
}
}

/// Execute and resolve all enqueued CallRequests in a single RPC call
pub async fn execute_all(self) -> Result<(), Web3Error> {
if self.requests.is_empty() {
return Ok(());
/// Execute and resolve all enqueued CallRequests in a batched RPC call, `chunk_size` requests per roundtrip.
/// Top level request failures will be forwarded to the individual requests.
pub async fn execute_all(self, batch_size: usize) {
let Self { inner, requests } = self;
let mut iterator = requests.into_iter().peekable();
while iterator.peek().is_some() {
let (requests, senders): (Vec<_>, Vec<_>) = iterator.by_ref().take(batch_size).unzip();

// Send requests in a single call
let batch_result = inner
.send_batch(requests.iter().map(|(request, block)| {
let req = helpers::serialize(request);
let block =
helpers::serialize(&block.unwrap_or_else(|| BlockNumber::Latest.into()));
let (id, request) = inner.prepare("eth_call", vec![req, block]);
(id, request)
}))
.await;

// Process results
for (i, sender) in senders.into_iter().enumerate() {
let _ = match &batch_result {
Ok(results) => sender.send(
results
.get(i)
.unwrap_or(&Err(Web3Error::Decoder(
"Batch result did not contain enough responses".to_owned(),
)))
.clone()
.and_then(helpers::decode),
),
Err(err) => sender.send(Err(Web3Error::Transport(format!(
"Batch failed with: {}",
err
)))),
};
}
}

let batch_result = self
.inner
.send_batch(self.requests.iter().map(|((request, block), _)| {
let req = helpers::serialize(request);
let block =
helpers::serialize(&block.unwrap_or_else(|| BlockNumber::Latest.into()));
let (id, request) = self.inner.prepare("eth_call", vec![req, block]);
(id, request)
}))
.await;
for (i, (_, sender)) in self.requests.into_iter().enumerate() {
let _ = match &batch_result {
Ok(results) => sender.send(
results
.get(i)
.unwrap_or(&Err(Web3Error::Decoder(
"Batch result did not contain enough responses".to_owned(),
)))
.clone()
.and_then(helpers::decode),
),
Err(err) => sender.send(Err(Web3Error::Transport(format!(
"Batch failed with: {}",
err
)))),
};
}
batch_result.map(|_| ())
}
}

Expand All @@ -107,7 +111,7 @@ mod tests {
batch.push(CallRequest::default(), None),
];

batch.execute_all().immediate().unwrap();
batch.execute_all(usize::MAX).immediate();

let results = join_all(results).immediate();
assert_eq!(results[0].clone().unwrap().0, vec![1u8]);
Expand All @@ -134,17 +138,32 @@ mod tests {
let mut batch = CallBatch::new(transport);
let call = batch.push(CallRequest::default(), None);

assert!(batch.execute_all().immediate().is_err());
batch.execute_all(usize::MAX).immediate();
match call.immediate().unwrap_err() {
Web3Error::Transport(reason) => assert!(reason.starts_with("Batch failed with:")),
_ => panic!("Wrong Error type"),
};
}

#[test]
fn doesnt_issue_request_on_empty_batch() {
let transport = TestTransport::new();
let batch = CallBatch::new(transport);
assert!(batch.execute_all().immediate().is_ok());
fn splits_batch_into_multiple_calls() {
let mut transport = TestTransport::new();
transport.add_response(json!([json!("0x01"), json!("0x02")]));
transport.add_response(json!([json!("0x03")]));

let mut batch = CallBatch::new(transport);

let results = vec![
batch.push(CallRequest::default(), None),
batch.push(CallRequest::default(), None),
batch.push(CallRequest::default(), None),
];

batch.execute_all(2).immediate();

let results = join_all(results).immediate();
assert_eq!(results[0].clone().unwrap().0, vec![1u8]);
assert_eq!(results[1].clone().unwrap().0, vec![2u8]);
assert_eq!(results[2].clone().unwrap().0, vec![3u8]);
}
}
2 changes: 1 addition & 1 deletion examples/examples/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() {
.view()
.batch_call(&mut batch),
];
batch.execute_all().await.unwrap();
batch.execute_all(usize::MAX).await;
for (id, call) in calls.into_iter().enumerate() {
println!("Call {} returned {}", id, call.await.unwrap());
}
Expand Down

0 comments on commit 9d9215a

Please sign in to comment.