Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add multi-column support to UpdateBy RollingFormula() operator #6143

Merged
merged 17 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,16 @@ public void clear() {
final long prevHead = internalBuffer.head;
final int prevSize = size();

internalBuffer.clear();
// Reset the pointers in the ring buffer without clearing the storage array. This leaves existing `identityVal`
// entries in place for the next `evaluate()` call.
internalBuffer.head = internalBuffer.tail = 0;

calcHead = calcTail = 0;
// Reset the cleared storage entries to the identity value

// Reset the previously populated storage entries to the identity value. After this call, all entries in the
// storage buffer are `identityVal`
fillWithIdentityVal(prevHead, prevSize);

// Reset the tree buffer with the identity value
Arrays.fill(treeStorage, identityVal);
}
Expand Down
50 changes: 35 additions & 15 deletions Base/src/main/java/io/deephaven/base/ringbuffer/ByteRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;
import org.jetbrains.annotations.TestOnly;

import java.io.Serializable;
import java.util.NoSuchElementException;
Expand All @@ -19,7 +20,7 @@
* {@code long} values. Head and tail will not wrap around; instead we use storage arrays sized to 2^N to allow fast
* determination of storage indices through a mask operation.
*/
public class ByteRingBuffer implements Serializable {
public class ByteRingBuffer implements RingBuffer, Serializable {
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
byte[] storage;
Expand All @@ -32,7 +33,7 @@ public class ByteRingBuffer implements Serializable {
*
* @param capacity minimum capacity of the ring buffer
*/
public ByteRingBuffer(int capacity) {
public ByteRingBuffer(final int capacity) {
this(capacity, true);
}

Expand All @@ -42,7 +43,7 @@ public ByteRingBuffer(int capacity) {
* @param capacity minimum capacity of ring buffer
* @param growable whether to allow growth when the buffer is full.
*/
public ByteRingBuffer(int capacity, boolean growable) {
public ByteRingBuffer(final int capacity, final boolean growable) {
Assert.leq(capacity, "ByteRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;
Expand All @@ -59,7 +60,7 @@ public ByteRingBuffer(int capacity, boolean growable) {
*
* @param increase Increase amount. The ring buffer's capacity will be increased by at least this amount.
*/
protected void grow(int increase) {
protected void grow(final int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Expand All @@ -83,7 +84,7 @@ protected void grow(int increase) {
*
* @param dest The destination buffer.
*/
protected void copyRingBufferToArray(byte[] dest) {
protected void copyRingBufferToArray(final byte[] dest) {
final int size = size();
final int storageHead = (int) (head & mask);

Expand All @@ -99,27 +100,35 @@ protected void copyRingBufferToArray(byte[] dest) {
System.arraycopy(storage, 0, dest, firstCopyLen, secondCopyLen);
}

@Override
public boolean isFull() {
return size() == storage.length;
}

@Override
public boolean isEmpty() {
return tail == head;
}

@Override
public int size() {
return Math.toIntExact(tail - head);
}

@Override
public int capacity() {
return storage.length;
}

@Override
public int remaining() {
return storage.length - size();
}

@Override
public void clear() {
// region object-bulk-clear
// endregion object-bulk-clear
tail = head = 0;
}

Expand All @@ -131,7 +140,7 @@ public void clear() {
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
* @return {@code true} if the byte was added successfully
*/
public boolean add(byte e) {
public boolean add(final byte e) {
if (isFull()) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -151,7 +160,8 @@ public boolean add(byte e) {
* @param count the minimum number of empty entries in the buffer after this call
* @throws UnsupportedOperationException when {@code growable} is {@code false} and buffer is full
*/
public void ensureRemaining(int count) {
@Override
public void ensureRemaining(final int count) {
if (remaining() < count) {
if (!growable) {
throw new UnsupportedOperationException("Ring buffer is full and growth is disabled");
Expand All @@ -168,7 +178,7 @@ public void ensureRemaining(int count) {
*
* @param e the value to add to the buffer
*/
public void addUnsafe(byte e) {
public void addUnsafe(final byte e) {
// This is an extremely paranoid wrap check that in all likelihood will never run. With FIXUP_THRESHOLD at
// 1 << 62, and the user pushing 2^32 values per second(!), it will take 68 years to wrap this counter .
if (tail >= FIXUP_THRESHOLD) {
Expand All @@ -188,7 +198,7 @@ public void addUnsafe(byte e) {
* @param notFullResult value to return is the buffer is not full
* @return the overwritten entry if the buffer is full, the provided value otherwise
*/
public byte addOverwrite(byte e, byte notFullResult) {
public byte addOverwrite(final byte e, final byte notFullResult) {
byte val = notFullResult;
if (isFull()) {
val = remove();
Expand All @@ -204,7 +214,7 @@ public byte addOverwrite(byte e, byte notFullResult) {
* @param e the byte to be added to the buffer
* @return true if the value was added successfully, false otherwise
*/
public boolean offer(byte e) {
public boolean offer(final byte e) {
if (isFull()) {
return false;
}
Expand All @@ -218,7 +228,7 @@ public boolean offer(byte e) {
* @param count The number of elements to remove.
* @throws NoSuchElementException if the buffer is empty
*/
public byte[] remove(int count) {
public byte[] remove(final int count) {
final int size = size();
if (size < count) {
throw new NoSuchElementException();
Expand Down Expand Up @@ -264,7 +274,7 @@ public byte removeUnsafe() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The removed element if the ring buffer was non-empty, otherwise the value of 'onEmpty'
*/
public byte poll(byte onEmpty) {
public byte poll(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -291,7 +301,7 @@ public byte element() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The head element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public byte peek(byte onEmpty) {
public byte peek(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand All @@ -314,7 +324,7 @@ public byte front() {
* @throws NoSuchElementException if the buffer is empty
* @return The element at the specified offset
*/
public byte front(int offset) {
public byte front(final int offset) {
if (offset < 0 || offset >= size()) {
throw new NoSuchElementException();
}
Expand All @@ -341,7 +351,7 @@ public byte back() {
* @param onEmpty the value to return if the ring buffer is empty
* @return The tail element if the ring buffer is non-empty, otherwise the value of 'onEmpty'
*/
public byte peekBack(byte onEmpty) {
public byte peekBack(final byte onEmpty) {
if (isEmpty()) {
return onEmpty;
}
Expand Down Expand Up @@ -385,4 +395,14 @@ public void remove() {
throw new UnsupportedOperationException();
}
}

/**
* Get the storage array for this ring buffer. This is intended for testing and debugging purposes only.
*
* @return The storage array for this ring buffer.
*/
@TestOnly
public byte[] getStorage() {
return storage;
}
}
Loading