Skip to content

Commit

Permalink
JSON configuration and Jackson Streaming Object Processor (#5225)
Browse files Browse the repository at this point in the history
This PR adds a declarative JSON configuration object that allows users to specify the schema of a JSON message. It is meant to have good out-of-the-box defaults, while still allowing power users to modify some of the finer parsing details. The JSON configuration layer is not tied to any specific implementation; it is introspectible, and could have alternative implementations with other parsing backends.

Out of the box, there's an ObjectProcessor implementation based on the Jackson streaming APIs; that is, the data flows from byte[]s (or InputStream, relevant for very-large-files) to the output WritableChunks without the need for the intermediating Jackson databind layer (TreeNode). This saves a large layer of allocation that our current kafka json_spec layer relies upon. The ObjectProcessor layer means that this can be used in other places that expose ObjectProcessor layers and want 1-to-1 record-to-row (currently, Kafka).

Part of #5222
  • Loading branch information
devinrsmith committed Jun 18, 2024
1 parent 9c3427c commit 7150848
Show file tree
Hide file tree
Showing 142 changed files with 13,949 additions and 152 deletions.
37 changes: 37 additions & 0 deletions Base/src/main/java/io/deephaven/base/MathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
*/
public class MathUtil {

/**
* The maximum power of 2.
*/
public static final int MAX_POWER_OF_2 = 1 << 30;

/**
* Compute ceil(log2(x)). See {@link Integer#numberOfLeadingZeros(int)}.
*
Expand Down Expand Up @@ -108,4 +113,36 @@ public static int base10digits(int n) {
}
return base10guess;
}

/**
* Rounds up to the next power of 2 for {@code x}; if {@code x} is already a power of 2, {@code x} will be returned.
* Values outside the range {@code 1 <= x <= MAX_POWER_OF_2} will return {@code 1}.
*
* <p>
* Equivalent to {@code Math.max(Integer.highestOneBit(x - 1) << 1, 1)}.
*
* @param x the value
* @return the next power of 2 for {@code x}
* @see #MAX_POWER_OF_2
*/
public static int roundUpPowerOf2(int x) {
return Math.max(Integer.highestOneBit(x - 1) << 1, 1);
}

/**
* Rounds up to the next power of 2 for {@code size <= MAX_POWER_OF_2}, otherwise returns
* {@link ArrayUtil#MAX_ARRAY_SIZE}.
*
* <p>
* Equivalent to {@code size <= MAX_POWER_OF_2 ? roundUpPowerOf2(size) : ArrayUtil.MAX_ARRAY_SIZE}.
*
* @param size the size
* @return the
* @see #MAX_POWER_OF_2
* @see #roundUpPowerOf2(int)
* @see ArrayUtil#MAX_ARRAY_SIZE
*/
public static int roundUpArraySize(int size) {
return size <= MAX_POWER_OF_2 ? roundUpPowerOf2(size) : ArrayUtil.MAX_ARRAY_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// @formatter:off
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -20,8 +20,6 @@
* determination of storage indices through a mask operation.
*/
public class ByteRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
byte[] storage;
Expand All @@ -45,21 +43,13 @@ public ByteRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public ByteRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "ByteRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "ByteRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new byte[newCapacity];
storage = new byte[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -73,9 +63,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "ByteRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "ByteRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final byte[] newStorage = new byte[Integer.highestOneBit((int) newCapacity - 1) << 1];
final byte[] newStorage = new byte[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -16,8 +16,6 @@
* determination of storage indices through a mask operation.
*/
public class CharRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
char[] storage;
Expand All @@ -41,21 +39,13 @@ public CharRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public CharRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "CharRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "CharRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new char[newCapacity];
storage = new char[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -69,9 +59,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "CharRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "CharRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final char[] newStorage = new char[Integer.highestOneBit((int) newCapacity - 1) << 1];
final char[] newStorage = new char[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// @formatter:off
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -20,8 +20,6 @@
* determination of storage indices through a mask operation.
*/
public class DoubleRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
double[] storage;
Expand All @@ -45,21 +43,13 @@ public DoubleRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public DoubleRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "DoubleRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "DoubleRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new double[newCapacity];
storage = new double[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -73,9 +63,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "DoubleRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "DoubleRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final double[] newStorage = new double[Integer.highestOneBit((int) newCapacity - 1) << 1];
final double[] newStorage = new double[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// @formatter:off
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -20,8 +20,6 @@
* determination of storage indices through a mask operation.
*/
public class FloatRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
float[] storage;
Expand All @@ -45,21 +43,13 @@ public FloatRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public FloatRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "FloatRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "FloatRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new float[newCapacity];
storage = new float[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -73,9 +63,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "FloatRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "FloatRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final float[] newStorage = new float[Integer.highestOneBit((int) newCapacity - 1) << 1];
final float[] newStorage = new float[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
20 changes: 5 additions & 15 deletions Base/src/main/java/io/deephaven/base/ringbuffer/IntRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// @formatter:off
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -20,8 +20,6 @@
* determination of storage indices through a mask operation.
*/
public class IntRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
int[] storage;
Expand All @@ -45,21 +43,13 @@ public IntRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public IntRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "IntRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "IntRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new int[newCapacity];
storage = new int[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -73,9 +63,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "IntRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "IntRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final int[] newStorage = new int[Integer.highestOneBit((int) newCapacity - 1) << 1];
final int[] newStorage = new int[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// @formatter:off
package io.deephaven.base.ringbuffer;

import io.deephaven.base.ArrayUtil;
import io.deephaven.base.MathUtil;
import io.deephaven.base.verify.Assert;

import java.io.Serializable;
Expand All @@ -20,8 +20,6 @@
* determination of storage indices through a mask operation.
*/
public class LongRingBuffer implements Serializable {
/** Maximum capacity is the highest power of two that can be allocated (i.e. <= than ArrayUtil.MAX_ARRAY_SIZE). */
static final int RING_BUFFER_MAX_CAPACITY = Integer.highestOneBit(ArrayUtil.MAX_ARRAY_SIZE);
static final long FIXUP_THRESHOLD = 1L << 62;
final boolean growable;
long[] storage;
Expand All @@ -45,21 +43,13 @@ public LongRingBuffer(int capacity) {
* @param growable whether to allow growth when the buffer is full.
*/
public LongRingBuffer(int capacity, boolean growable) {
Assert.leq(capacity, "LongRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(capacity, "LongRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

this.growable = growable;

// use next larger power of 2 for our storage
final int newCapacity;
if (capacity < 2) {
// sensibly handle the size=0 and size=1 cases
newCapacity = 1;
} else {
newCapacity = Integer.highestOneBit(capacity - 1) << 1;
}

// reset the data structure members
storage = new long[newCapacity];
storage = new long[MathUtil.roundUpPowerOf2(capacity)];
mask = storage.length - 1;
tail = head = 0;
}
Expand All @@ -73,9 +63,9 @@ protected void grow(int increase) {
final int size = size();
final long newCapacity = (long) storage.length + increase;
// assert that we are not asking for the impossible
Assert.leq(newCapacity, "LongRingBuffer capacity", RING_BUFFER_MAX_CAPACITY);
Assert.leq(newCapacity, "LongRingBuffer capacity", MathUtil.MAX_POWER_OF_2);

final long[] newStorage = new long[Integer.highestOneBit((int) newCapacity - 1) << 1];
final long[] newStorage = new long[MathUtil.roundUpPowerOf2((int) newCapacity)];

// move the current data to the new buffer
copyRingBufferToArray(newStorage);
Expand Down
Loading

0 comments on commit 7150848

Please sign in to comment.