Skip to content

Commit

Permalink
Add heap impl for buffered and unbuffered channel
Browse files Browse the repository at this point in the history
Our design choice is to make all blocking and non blocking at the oogavm-machine level
  • Loading branch information
JothamWong committed Apr 10, 2024
1 parent e16c242 commit 445add1
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions src/vm/oogavm-heap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ export enum Tag {
STRING,
MUTEX,
ARRAY,
BUFFERED,
UNBUFFERED,
BUFFERED_NODE,
}

export function getTagStringFromAddress(address: number): string {
Expand Down Expand Up @@ -73,6 +76,10 @@ function getTagString(tag: Tag): string {
return 'MUTEX';
case Tag.ARRAY:
return 'ARRAY';
case Tag.BUFFERED:
return 'BUFFERED';
case Tag.UNBUFFERED:
return 'UNBUFFERED';
default:
return 'UNKNOWN';
}
Expand Down Expand Up @@ -599,6 +606,113 @@ export function getArrayValueAtIndex(arrayAddress: number, idx: number): any {
return getWordOffset(arrayAddress, idx + headerSize);
}

// ********************************
// Channels
// ********************************
// If the channel is unbuffered, the sender blocks until
// the receiver has received the value. If the channel has
// a buffer, the sender blocks only until the value has been
// copied to the buffer; if the buffer is full,
// this means waiting until some receiver has retrieved a value.

// Tag.BUFFERED_CHANNEL
// Tag.UNBUFFERED_CHANNEL

// For both
// 1st word is size and tag
// 2nd word is the forwarding address
// Unbuffered channel
// 3rd word to store whether it has an element
// 4th word to store the value
// Buffered channel
// 3rd word stores current number of elements
// 3rd word onwards to store the value up till limit

// I am going to do a naive array move on pop because it's simpler to do that
// then have to worry about node pointer manipulation at the moment

export function allocateBufferedChannel(capacity: number): number {
const address = allocate(Tag.BUFFERED, capacity + headerSize + 1);
// starts with 0 size
heap.setUint32((address + headerSize) * wordSize, 0);
return address;
}

function getBufferChannelLength(address: number): number {
return getWord(address + headerSize);
}

// The checking of whether buffered channel is full is done at oogavm-machine
// so here I would still want to throw an error if it exceeds capacity
// cos then that means that no blocking had happened
function pushToBufferedChannel(address: number, value: number) {
let currentSize = getBufferChannelLength(address);
const capacity = getSize(address) - headerSize;
if (currentSize >= capacity) {
// This indicates a bug with our code and not a user error
throw new OogaError("Attempting to push onto a full buffered channel");
}
// +1 required because the 3rd word is the currentSize
// the fourth word onwards is the payload
setWord(address + headerSize + currentSize + 1, value);
heap.setUint32((address + headerSize) * wordSize, currentSize + 1);
}

// Pop the first value then copy all values -1 index
function popBufferedChannel(address: number): number {
let currentSize = getBufferChannelLength(address);
if (currentSize <= 0) {
throw new OogaError("Attempting to pop from an empty buffered channel");
}
const returnValue = getWord(address + headerSize + 1);
// copy the 2nd to nth value 1 element back
for (let i = 1; i < currentSize; i++) {
const originalValue = getWord(address + headerSize + 1 + i);
setWord(address + headerSize + i, originalValue);
}
// finally update the size
heap.setUint32((address + headerSize) * wordSize, currentSize - 1);
return returnValue;
}

export function isBufferedChannel(address: number): boolean {
return getTag(address) === Tag.BUFFERED;
}

const unbufferedCapacity = 1;

export function allocateUnbufferedChannel(): number {
return allocate(Tag.UNBUFFERED, unbufferedCapacity + headerSize + 1);
}

function getUnBufferChannelLength(address: number): number {
return getWord(address + headerSize);
}

// As usual, we expect the blocking to be done at oogavm-machine and not here
export function pushUnbufferedChannel(address: number, value: number) {
// check that the unbuffered channel is empty!
const size = getUnBufferChannelLength(address);
if (size !== 0) {
throw new OogaError("Attempting to push onto full unbuffered channel in the heap. Bug!");
}
setWord(address + headerSize + 1, value);
heap.setUint32((address + headerSize) * wordSize, 1);
}

export function popUnbufferedChannel(address: number): number {
// check that unbuffered channel is not empty!
const size = getUnBufferChannelLength(address);
if (size !== 1) {
throw new OogaError("Attempting to pop empty unbuffered channel in the heap. Bug!");
}
heap.setUint32((address + headerSize) * wordSize, 0);
return getWord(address + headerSize + 1);
}

export function isUnbufferedChannel(address: number): boolean {
return getTag(address) === Tag.UNBUFFERED;
}

// ********************************
// Debug
Expand Down Expand Up @@ -884,6 +998,7 @@ function updateReferences() {
setWord(curr + prevStackElementOffset, forwardedPrev);
}
break;
case Tag.ARRAY:
case Tag.FRAME:
case Tag.ENVIRONMENT:
case Tag.STRUCT:
Expand Down

0 comments on commit 445add1

Please sign in to comment.