-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.rs
65 lines (53 loc) · 1.76 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::fs::File;
use std::path::Path;
use std::sync::mpsc::channel;
use std::{thread, thread::available_parallelism};
use std::time::Instant;
use ahash::AHashMap;
use memmap::Mmap;
use one_brc::{FILE_PATH, LINE_MAX_LEN, process_block, result::TemperStatResult};
/// Entry point: memory-maps the file at `FILE_PATH`, splits it into
/// newline-aligned chunks, runs `process_block` on each chunk in its own scoped
/// thread, merges the per-chunk maps into one `TemperStatResult`, and prints it
/// to stdout. The elapsed wall-clock time is written to stderr.
///
/// # Panics
///
/// Panics if the available parallelism cannot be queried, if the input file
/// cannot be opened or memory-mapped, or if any worker thread panics (the
/// worker's panic is re-raised here instead of being swallowed).
fn main() {
    let start = Instant::now();
    let cpu = available_parallelism().unwrap();
    let file = File::open(Path::new(FILE_PATH)).unwrap();
    // SAFETY: the mapping is only read, and `mmap` outlives every borrow handed
    // to the scoped threads below. NOTE(review): this is only sound as long as
    // no other process truncates or rewrites the file while it is mapped —
    // confirm that holds for the intended input.
    let mmap = unsafe { Mmap::map(&file).unwrap() };
    let len = mmap.len();
    // Target chunk size: an even share per CPU plus `LINE_MAX_LEN` of slack
    // (presumably so a chunk can always be extended to a full line).
    let size = len / cpu + LINE_MAX_LEN;

    thread::scope(|s| {
        let mut rest = mmap.as_ref();
        let mut threads = AHashMap::new();
        let (tx, rx) = channel::<i32>();
        let mut result = TemperStatResult::new();
        let mut id = 0;

        while !rest.is_empty() {
            // Take at most `size` bytes, then shrink the chunk so that it ends
            // right after the last newline inside that window. If the window
            // holds no newline at all, the whole window is used as-is.
            let limit = rest.len().min(size);
            let split = rest[..limit]
                .iter()
                .rposition(|&b| b == b'\n')
                .map_or(limit, |pos| pos + 1);
            let (cur, tail) = rest.split_at(split);

            let h = s.spawn({
                let tx = tx.clone();
                move || {
                    // NOTE(review): the meaning of `7000` is defined by
                    // `process_block`, which is not visible here.
                    let (map, lines, errors) = process_block(cur, 7000);
                    // Announce completion so the coordinator can join workers
                    // in the order they finish.
                    tx.send(id).unwrap();
                    (map, lines, errors)
                }
            });
            threads.insert(id, h);
            rest = tail;
            id += 1;
        }

        // Drop the coordinator's own sender: once every worker has finished
        // (or panicked and dropped its clone), `recv` returns `Err` instead of
        // blocking forever on an id that will never arrive.
        drop(tx);

        while let Ok(id) = rx.recv() {
            if let Some(h) = threads.remove(&id) {
                match h.join() {
                    Ok((map, _, _)) => {
                        result.aggregate(&map);
                    }
                    // Do not print partial statistics: surface the failure.
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            }
        }

        // Handles still registered belong to workers that terminated without
        // reporting their id, i.e. they panicked before sending.
        for (_, h) in threads.drain() {
            match h.join() {
                Ok((map, _, _)) => {
                    result.aggregate(&map);
                }
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        println!("{result}");
    });

    eprintln!("elapsed: {:?}", start.elapsed());
}