Skip to content

Commit

Permalink
New lock-free append-only vector
Browse files Browse the repository at this point in the history
  • Loading branch information
Avarel committed Dec 26, 2023
1 parent 3480190 commit d0e3389
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 7 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ a plethora of other features that I couldn't find in any other pager.
## Features

### In-Progress or Planned
| Feature | Description | Progress |
| ------------------ | ----------------------------------------------------------- | -------------- |
| Mouse Support | Use mouse to interact with the TUI. | In Development |
| Filter Jumping | Jump to the next or previous line that matches a filter. | Planned |
| Custom Keybindings | Customize the keybindings of the program. | Planned |
| Word-Wrapping | Wrap long lines. | Planned |
| Export Output | Export selected data to a file or pipe it to a new command. | Planned |
| Feature | Description | Progress |
| ------------------ | ----------------------------------------------------------- | -------- |
| Filter Jumping | Jump to the next or previous line that matches a filter. | Planned |
| Custom Keybindings | Customize the keybindings of the program. | Planned |
| Word-Wrapping | Wrap long lines. | Planned |
| Export Output | Export selected data to a file or pipe it to a new command. | Planned |

### Basic Support
| Feature | Description | Progress |
Expand All @@ -45,6 +44,7 @@ a plethora of other features that I couldn't find in any other pager.
| Status Bar | View current state of the pager. | Basic |
| Horizontal Scrolling | Pan the view horizontally. | Basic |
| Commands | Use modal commands to interact with the pager. | Basic |
| Mouse Support | Use mouse to interact with the TUI. | Done |
| Regex Filters | Select and disable additive search filters. | Done |
| Multiplexing | View multiple files through tabs or windows. | Done |
| Follow Output | Constantly scroll down as new data is loaded. | Done |
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ memchr = "2.6"
memmap2 = "0.9"
lru = "0.12.0"
regex = "1.10"
arc-swap = "1.6"

[dev-dependencies]
tempfile = "3.8"
Expand Down
303 changes: 303 additions & 0 deletions crates/core/src/cowvec2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
use std::{
alloc::{self, Layout},
marker::PhantomData,
ops::Deref,
ptr::NonNull,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use arc_swap::ArcSwap;

struct RawBuf<T> {
ptr: NonNull<T>,
len: AtomicUsize,
cap: usize,
}

impl<T> RawBuf<T> {
#[inline]
const fn empty() -> Self {
Self::new(std::ptr::NonNull::dangling(), 0, 0)
}

#[inline]
const fn new(ptr: NonNull<T>, len: usize, cap: usize) -> Self {
Self {
ptr,
len: AtomicUsize::new(len),
cap,
}
}
}

impl<T> Deref for RawBuf<T> {
type Target = NonNull<T>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.ptr
}
}

unsafe impl<T: Send> Send for RawBuf<T> {}
unsafe impl<T: Sync> Sync for RawBuf<T> {}

impl<T> Drop for RawBuf<T> {
fn drop(&mut self) {
let cap = self.cap;
if cap != 0 {
// Safety: we are the last owner, we can do a relaxed read of len
unsafe {
std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(
self.ptr.as_ptr(),
self.len.load(Ordering::Relaxed),
));
}
unsafe {
alloc::dealloc(
self.ptr.as_ptr().cast::<u8>(),
Layout::array::<T>(cap).unwrap(),
);
}
}
}
}

pub struct CowVecWriter<T> {
buf: Arc<ArcSwap<RawBuf<T>>>,
}

impl<T> CowVecWriter<T>
where
T: Copy,
{
/// Appends an element to the back of this collection.
///
/// If the collection is in a borrowed state, it will copy the data
/// underlying the borrowed state and transition to an owned state.
pub fn push(&mut self, elem: T) {
let buf = self.buf.load();
let len = buf.len.load(Ordering::SeqCst);
let cap = buf.cap;
if len == cap {
// Safety: If this runs, then buf will no longer be borrowed from
push_inner(&self.grow(), len, elem)
} else {
push_inner(&buf, len, elem)
}

#[inline(always)]
fn push_inner<T: Copy>(buf: &RawBuf<T>, len: usize, elem: T) {
unsafe { std::ptr::write_volatile(buf.ptr.as_ptr().add(len), elem) }

// Can't fail, we'll OOM first.
// There should be no other writers, but lets be safe.
buf.len.store(len + 1, Ordering::Release);
}
}

/// Grow will return a buffer that the caller can write to.
fn grow(&mut self) -> Arc<RawBuf<T>> {
let buf = self.buf.load();
let len = buf.len.load(Ordering::SeqCst);
let cap = buf.cap;

// since we set the capacity to usize::MAX when T has size 0,
// getting to here necessarily means the Vec is overfull.
assert!(std::mem::size_of::<T>() != 0, "capacity overflow");

let (new_cap, new_layout) = if cap == 0 {
(1, Layout::array::<T>(1).unwrap())
} else {
// This can't overflow because we ensure self.cap <= isize::MAX.
let new_cap = 2 * cap;

// `Layout::array` checks that the number of bytes is <= usize::MAX,
// but this is redundant since old_layout.size() <= isize::MAX,
// so the `unwrap` should never fail.
let new_layout = Layout::array::<T>(new_cap).unwrap();
(new_cap, new_layout)
};

// Ensure that the new allocation doesn't exceed `isize::MAX` bytes.
assert!(
new_layout.size() <= isize::MAX as usize,
"Allocation too large"
);

let new_ptr = if cap == 0 {
unsafe { alloc::alloc(new_layout) }
} else {
let old_ptr = buf.ptr.as_ptr().cast::<u8>();
// Cannot use realloc here since it may drop the old pointer
let new_ptr = unsafe { alloc::alloc(new_layout) };
if NonNull::new(new_ptr.cast::<T>()).is_none() {
alloc::handle_alloc_error(new_layout)
}
// This is fine since our elements are Copy
let old_layout_len = Layout::array::<T>(len).unwrap();
unsafe { std::ptr::copy_nonoverlapping(old_ptr, new_ptr, old_layout_len.size()) };
new_ptr
};

// If allocation fails, `new_ptr` will be null, in which case we abort.
match NonNull::new(new_ptr.cast::<T>()) {
Some(new_ptr) => {
debug_assert_ne!(new_ptr, buf.ptr);
let ret = Arc::new(RawBuf::new(new_ptr, len, new_cap));
self.buf.store(ret.clone());
ret
}
None => alloc::handle_alloc_error(new_layout),
}
}
}

#[derive(Clone)]
pub struct CowVec<T> {
buf: Arc<ArcSwap<RawBuf<T>>>,
}

impl<T> CowVec<T> {
/// Constructs a new, empty `CowVec<T>`.
///
/// The vector will not allocate until elements are pushed onto it.
#[inline]
pub fn new() -> (Self, CowVecWriter<T>) {
assert!(std::mem::size_of::<T>() != 0);
let buf = Arc::new(ArcSwap::new(Arc::new(RawBuf::empty())));
(Self { buf: buf.clone() }, CowVecWriter { buf })
}

pub fn len(&self) -> usize {
self.read(|slice| slice.len())
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn read<F, R>(&self, cb: F) -> R
where
F: FnOnce(&[T]) -> R,
{
let buf = self.buf.load();
let len = buf.len.load(Ordering::SeqCst);
cb(unsafe { std::slice::from_raw_parts(buf.as_ptr(), len) })
}

pub fn snapshot(&self) -> CowVecSnapshot<'_, T> {
CowVecSnapshot {
buf: self.buf.load_full(),
_phantom: PhantomData,
}
}
}

impl<T> CowVec<T>
where
T: Copy,
{
pub fn get(&self, index: usize) -> Option<T> {
self.read(|slice| slice.get(index).copied())
}
}

struct CowVecSnapshot<'a, T> {
buf: Arc<RawBuf<T>>,
_phantom: PhantomData<&'a ()>,
}

impl<T> CowVecSnapshot<'_, T>
where
T: Copy,
{
pub fn get(&self, index: usize) -> Option<T> {
self.deref().get(index).copied()
}
}

impl<T> Deref for CowVecSnapshot<'_, T> {
type Target = [T];

#[inline]
fn deref(&self) -> &Self::Target {
let buf = &self.buf;
let len = buf.len.load(Ordering::SeqCst);
unsafe { std::slice::from_raw_parts(buf.as_ptr(), len) }
}
}

#[cfg(test)]
mod test {
use super::CowVec;

#[test]
fn test_miri_push_and_access() {
let (arr, mut writer) = CowVec::new();
for i in 0..10000 {
writer.push(i);
}
for i in 0..10000 {
assert_eq!(Some(i), arr.get(i));
}
}

#[test]
fn test_miri_push_and_concurrent_clone() {
let (arr, mut writer) = CowVec::new();
let handle = std::thread::spawn({
move || {
for _ in 0..10 {
for i in 0..1000 {
writer.push(i);
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
});

while !handle.is_finished() {
let slice = arr.snapshot();
for i in slice.iter().copied() {
assert_eq!(i, slice[i]);
}
}

handle.join().unwrap();
}

#[test]
fn test_miri_clone() {
let (arr, mut writer) = CowVec::new();
for i in 0..10 {
writer.push(i);
}
let cloned_arr = arr.clone();
assert_eq!(arr.len(), cloned_arr.len());
for i in 0..10 {
assert_eq!(arr.get(i), cloned_arr.get(i));
}
writer.push(10);
assert_eq!(arr.get(10), cloned_arr.get(10));
assert_eq!(arr.len(), cloned_arr.len());
}

#[test]
fn test_miri_deref() {
let (arr, mut writer) = CowVec::new();
for i in 0..10 {
writer.push(i);
}
let snap = arr.snapshot();
let slice: &[i32] = &snap;
assert_eq!(slice.len(), arr.len());
for i in 0..10 {
assert_eq!(slice.get(i).copied(), arr.get(i));
assert_eq!(snap.get(i), arr.get(i));
}
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod buf;

pub mod components;
pub mod cowvec;
mod cowvec2;
pub mod err;

pub use buf::{segment::SegStr, SegBuffer};
Expand Down

0 comments on commit d0e3389

Please sign in to comment.