Skip to content

Commit

Permalink
Reduce uneeded buffer copies in producer threads
Browse files Browse the repository at this point in the history
  • Loading branch information
althonos committed Oct 5, 2023
1 parent 25881e6 commit 0731da2
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/parser/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<B: BufRead + Send + 'static> Producer<B> {
threads,
handle: None,
alive: Arc::new(AtomicBool::new(false)),
buffer_size: 65536, //8192,
buffer_size: 8192,
}
}

Expand All @@ -51,7 +51,6 @@ impl<B: BufRead + Send + 'static> Producer<B> {

self.handle = Some(std::thread::spawn(move || {
let mut buffer = vec![0; buffer_size];
let mut buffer_start = 0;
let mut buffer_end = 0;

loop {
Expand All @@ -69,23 +68,22 @@ impl<B: BufRead + Send + 'static> Producer<B> {
.map(|y| y + i)
.find(|&y| buffer[..=y].ends_with(b"</entry>"))
{
s_text
.send(Some(Ok(Buffer {
data: buffer[i..j].to_vec(),
})))
.ok();
buffer_start = j + 1;
} else if n == 0 {
// create a new buffer and copy only remainer of the current one
let mut new_buffer = vec![0; buffer.len()];
new_buffer[0..buffer_end - j - 1]
.copy_from_slice(&buffer[j + 1..buffer_end]);
// truncate the current buffer and send it to a consumer
buffer.truncate(j + 1);
s_text.send(Some(Ok(Buffer { data: buffer }))).ok();
// update buffer and buffer boundary
buffer = new_buffer;
buffer_end -= j + 1;
} else if n == 0 && buffer_end != 0 {
let name = String::from("entry");
let err = Error::from(XmlError::UnexpectedEof(name));
s_text.send(Some(Err(err))).ok();
}
}
if buffer_start > 0 {
buffer.copy_within(buffer_start..buffer_end, 0);
buffer_end -= buffer_start;
buffer_start = 0;
}
if buffer_end == buffer.len() {
buffer.resize(buffer.len() * 2, 0);
}
Expand Down

0 comments on commit 0731da2

Please sign in to comment.