We implement a pipe as a `File` with a ring buffer inside.
// os/src/fs/pipe.rs
/// One end of a pipe.
///
/// `readable` and `writable` are the values this end reports through its
/// `File` implementation. `buffer` is the handle to the ring buffer; `make_pipe`
/// gives both ends a clone of the same `Arc`.
// NOTE(review): `buffer` is declared as `Arc<Mutex<..>>` here, but the rest of
// this file calls `self.buffer.exclusive_access()` and `make_pipe` builds an
// `Arc<UPSafeCell<..>>` — confirm which wrapper type is intended.
pub struct Pipe {
readable: bool,
writable: bool,
buffer: Arc<Mutex<PipeRingBuffer>>,
}
// os/src/fs/pipe.rs
/// Capacity of the pipe's ring buffer in bytes (the length of `PipeRingBuffer::arr`).
const RING_BUFFER_SIZE: usize = 32;
/// Fill state of a `PipeRingBuffer`.
///
/// Tracked explicitly because `head == tail` occurs both when the buffer is
/// empty and when it is full.
#[derive(Copy, Clone, PartialEq)]
enum RingBufferStatus {
/// Set by `write_byte` when `tail` catches up with `head`.
FULL,
/// Initial state; also set by `read_byte` when `head` catches up with `tail`.
EMPTY,
/// Neither of the above: set at the start of every read/write.
NORMAL,
}
/// Fixed-capacity byte ring buffer backing a pipe.
pub struct PipeRingBuffer {
/// Backing storage.
arr: [u8; RING_BUFFER_SIZE],
/// Index of the next byte `read_byte` will return.
head: usize,
/// Index of the slot `write_byte` will fill next.
tail: usize,
/// Disambiguates the `head == tail` case (empty vs. full).
status: RingBufferStatus,
/// Weak handle to the write end; `None` until `set_write_end` is called.
write_end: Option<Weak<Pipe>>,
}
The `write_end` field of `PipeRingBuffer` stores a weak reference to the write end of the `Pipe`, so that we can tell when all write ends have been closed and the buffer can be recycled.
To make `PipeRingBuffer` easy to use, we provide a few utility methods for it:
impl PipeRingBuffer {
    /// Creates an empty buffer with `head == tail == 0` and no write end registered.
    pub fn new() -> Self {
        Self {
            arr: [0u8; RING_BUFFER_SIZE],
            head: 0,
            tail: 0,
            status: RingBufferStatus::EMPTY,
            write_end: None,
        }
    }

    /// Records a weak handle to the pipe's write end.
    ///
    /// Only a `Weak` is kept, so the buffer itself does not keep the write end alive.
    pub fn set_write_end(&mut self, write_end: &Arc<Pipe>) {
        let weak_writer = Arc::downgrade(write_end);
        self.write_end = Some(weak_writer);
    }

    /// Removes and returns the byte at `head`.
    ///
    /// Does not check for an empty buffer; callers are expected to consult
    /// `available_read` first.
    pub fn read_byte(&mut self) -> u8 {
        let byte = self.arr[self.head];
        self.head = (self.head + 1) % RING_BUFFER_SIZE;
        // After a read the buffer is either drained or partially filled.
        self.status = if self.head == self.tail {
            RingBufferStatus::EMPTY
        } else {
            RingBufferStatus::NORMAL
        };
        byte
    }

    /// Stores `byte` at `tail`.
    ///
    /// Does not check for a full buffer; callers are expected to consult
    /// `available_write` first.
    pub fn write_byte(&mut self, byte: u8) {
        self.arr[self.tail] = byte;
        self.tail = (self.tail + 1) % RING_BUFFER_SIZE;
        // After a write the buffer is either completely full or partially filled.
        self.status = if self.tail == self.head {
            RingBufferStatus::FULL
        } else {
            RingBufferStatus::NORMAL
        };
    }

    /// Number of bytes currently stored and ready to be read.
    pub fn available_read(&self) -> usize {
        match self.status {
            RingBufferStatus::EMPTY => 0,
            RingBufferStatus::FULL | RingBufferStatus::NORMAL => {
                if self.tail > self.head {
                    self.tail - self.head
                } else {
                    // Wrapped around (or full, where `tail == head`).
                    self.tail + RING_BUFFER_SIZE - self.head
                }
            }
        }
    }

    /// Number of free slots that can still be written.
    pub fn available_write(&self) -> usize {
        match self.status {
            RingBufferStatus::FULL => 0,
            RingBufferStatus::EMPTY | RingBufferStatus::NORMAL => {
                RING_BUFFER_SIZE - self.available_read()
            }
        }
    }

    /// Returns `true` when the registered write end can no longer be upgraded,
    /// i.e. every strong reference to it has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if `set_write_end` has not been called.
    pub fn all_write_ends_closed(&self) -> bool {
        let weak_writer = self.write_end.as_ref().unwrap();
        weak_writer.upgrade().is_none()
    }
}
Then we can implement the `File` trait for `Pipe`:
impl File for Pipe {
    /// Whether this end may be read from.
    fn readable(&self) -> bool {
        self.readable
    }

    /// Whether this end may be written to.
    fn writable(&self) -> bool {
        self.writable
    }

    /// Copies bytes from the ring buffer into `buf` and returns how many were copied.
    ///
    /// Returns once `buf` is exhausted, or once the ring buffer is empty and all
    /// write ends are closed. While the ring buffer is empty but a write end is
    /// still alive, the buffer handle is released and the current task yields.
    ///
    /// # Panics
    ///
    /// Panics if this end is not readable.
    fn read(&self, buf: UserBuffer) -> usize {
        assert!(self.readable());
        let mut dst_bytes = buf.into_iter();
        let mut copied = 0usize;
        loop {
            let mut ring = self.buffer.exclusive_access();
            let pending = ring.available_read();
            if pending == 0 {
                if ring.all_write_ends_closed() {
                    return copied;
                }
                // Release the buffer before yielding so another task can access it.
                drop(ring);
                suspend_current_and_run_next();
                continue;
            }
            // Drain at most `pending` bytes in this pass.
            for _ in 0..pending {
                match dst_bytes.next() {
                    Some(dst) => {
                        // SAFETY: assumes the pointer yielded by the `UserBuffer`
                        // iterator refers to a valid, writable byte — TODO confirm.
                        unsafe {
                            *dst = ring.read_byte();
                        }
                        copied += 1;
                    }
                    None => return copied,
                }
            }
        }
    }

    /// Copies bytes from `buf` into the ring buffer and returns how many were copied.
    ///
    /// Returns once `buf` is exhausted. While the ring buffer is full, the buffer
    /// handle is released and the current task yields.
    ///
    /// # Panics
    ///
    /// Panics if this end is not writable.
    fn write(&self, buf: UserBuffer) -> usize {
        assert!(self.writable());
        let mut src_bytes = buf.into_iter();
        let mut copied = 0usize;
        loop {
            let mut ring = self.buffer.exclusive_access();
            let free = ring.available_write();
            if free == 0 {
                // Release the buffer before yielding so another task can access it.
                drop(ring);
                suspend_current_and_run_next();
                continue;
            }
            // Fill at most `free` bytes in this pass.
            for _ in 0..free {
                match src_bytes.next() {
                    Some(src) => {
                        // SAFETY: assumes the pointer yielded by the `UserBuffer`
                        // iterator refers to a valid, readable byte — TODO confirm.
                        let byte = unsafe { *src };
                        ring.write_byte(byte);
                        copied += 1;
                    }
                    None => return copied,
                }
            }
        }
    }

    // TODO define a status for pipe
    /// Returns a freshly constructed `Stat`; no pipe-specific status is filled in yet.
    fn fstat(&self) -> Stat {
        Stat::new()
    }
}
Finally, we add the `sys_pipe` syscall:
/// os/src/fs/pipe.rs
/// Creates a pipe and returns its ends as `(read_end, write_end)`.
///
/// Both ends hold a clone of the same ring-buffer handle, and the buffer is
/// given a weak reference to the write end before the pair is returned.
pub fn make_pipe() -> (Arc<Pipe>, Arc<Pipe>) {
    // SAFETY: presumably `UPSafeCell::new` requires single-processor use — TODO confirm.
    let shared_buffer = unsafe { Arc::new(UPSafeCell::new(PipeRingBuffer::new())) };

    let reader = Arc::new(Pipe::read_end_with_buffer(Arc::clone(&shared_buffer)));
    let writer = Arc::new(Pipe::write_end_with_buffer(Arc::clone(&shared_buffer)));

    // Register the writer so the buffer can later check whether it is still alive.
    shared_buffer.exclusive_access().set_write_end(&writer);

    (reader, writer)
}
/// os/src/syscall/fs.rs
/// Creates a pipe for the current task and reports its descriptors to user space.
///
/// `pipe` is a user-space pointer to two consecutive `usize` slots: the read-end
/// fd is stored in the first and the write-end fd in the second. Always returns 0.
///
/// # Panics
///
/// Panics if there is no current task.
pub fn sys_pipe(pipe: *mut usize) -> isize {
    let current = current_task().unwrap();
    // NOTE(review): the token is fetched before the task's inner state is borrowed,
    // as in the original ordering — confirm whether `current_user_token` needs that.
    let user_token = current_user_token();
    let mut task_inner = current.inner_exclusive_access();
    let (reader, writer) = make_pipe();

    // Install each end before allocating the next fd (presumably `alloc_fd` hands
    // out the first unused slot — verify against its definition).
    let read_fd = task_inner.alloc_fd();
    task_inner.fd_table[read_fd] = Some(reader);
    let write_fd = task_inner.alloc_fd();
    task_inner.fd_table[write_fd] = Some(writer);

    // User-space destinations: pipe[0] receives the read fd, pipe[1] the write fd.
    let read_slot = pipe;
    let write_slot = unsafe { pipe.add(1) };
    *translated_refmut(user_token, read_slot) = read_fd;
    *translated_refmut(user_token, write_slot) = write_fd;
    0
}