Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: a new PartitionedTableService API in Python #6175

Merged
merged 45 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
11c1a48
Define Py PartitionedTableServiceBackend intf
jmao-denver Oct 7, 2024
d7d81d0
Skeleton work
jmao-denver Oct 8, 2024
4097aca
Full pass through PythonTableDataService; no real barrage message dat…
nbauernfeind Oct 8, 2024
43960d0
Schema/TableDefinition creation
jmao-denver Oct 8, 2024
66fedf9
Excerice existing_partitions
jmao-denver Oct 9, 2024
6a2b50f
Implement remaining functions in Py wrapper
jmao-denver Oct 14, 2024
9868c79
Use the Java BiConsumer/Consumer func intf
jmao-denver Oct 15, 2024
1af6997
Add range support in column_values()
jmao-denver Oct 15, 2024
ddad0c9
Improve error handling for public API
jmao-denver Oct 16, 2024
89ec201
Improve test backend
jmao-denver Oct 16, 2024
7d26147
Rename test module to be consistent
jmao-denver Oct 16, 2024
32d5d04
Implement arrow parsing for partition columns and column values.
nbauernfeind Oct 16, 2024
91df14a
Fix column ordering and tests
jmao-denver Oct 17, 2024
6a6a2bf
Add one more test
jmao-denver Oct 17, 2024
2cc3367
Update the test to fully exercice the backend
jmao-denver Oct 17, 2024
53db0c4
Nate's Java Refactor
nbauernfeind Oct 17, 2024
7b8c8d3
Paired Review
nbauernfeind Oct 17, 2024
3ba7092
Improve docstring and add new tests
jmao-denver Oct 18, 2024
c29282c
Minor Comment Clarification
nbauernfeind Oct 21, 2024
c48d65d
Fix broken test by unsubscribing via ReferenceCountedLivenessReferent…
nbauernfeind Oct 22, 2024
bcd681e
Fixup jmock TableLocation expectations
nbauernfeind Oct 22, 2024
fbb377f
Fixup other jmock related test failures
nbauernfeind Oct 22, 2024
be7b0a5
More concise naming and new test scenarios
jmao-denver Oct 23, 2024
842c0a9
Correct naming mistake
jmao-denver Oct 23, 2024
bea6155
Ryan's feedback
nbauernfeind Oct 23, 2024
ab74e91
Merge remote-tracking branch 'upstream/main' into 6171-py-TableService
nbauernfeind Oct 24, 2024
6ca321c
merge related api change fix
nbauernfeind Oct 24, 2024
e31e7a3
Apply suggestions from code review
jmao-denver Oct 25, 2024
ee5c861
Respond to Ryan's review comments
jmao-denver Oct 25, 2024
a587d80
Ryan's Rnd2 Feedback
nbauernfeind Oct 26, 2024
5288a7d
Merge remote-tracking branch 'upstream/main' into 6171-py-TableService
nbauernfeind Oct 28, 2024
6fcfb97
Ryan's feedback from live demo
nbauernfeind Oct 28, 2024
1e1626b
Ryan's javadoc feedback
nbauernfeind Oct 28, 2024
dbd26fd
More callbacks support in Python and ehn docs
jmao-denver Oct 28, 2024
58c9f50
Minor docstring correction
jmao-denver Oct 28, 2024
25386a8
Improve doc
jmao-denver Oct 29, 2024
7f05df4
Fix sphinx reference errors
jmao-denver Oct 29, 2024
18130de
Minor change to pydoc word choice
nbauernfeind Oct 29, 2024
e5dd633
Ryan's 3rd Rnd Feedback
nbauernfeind Oct 29, 2024
aef9cb4
Update extensions/barrage/src/main/java/io/deephaven/extensions/barra…
nbauernfeind Oct 30, 2024
efb05b4
Update extensions/barrage/src/main/java/io/deephaven/extensions/barra…
nbauernfeind Oct 30, 2024
0ef147e
Ryan/Jianfeng Feedback
nbauernfeind Oct 31, 2024
ba0e6a8
Remove one extra space in docstring
jmao-denver Oct 31, 2024
e536e9e
Apply suggestions from code review
nbauernfeind Oct 31, 2024
e0d8504
Ryan's suggested approach to ensure exactly one cancellation call
nbauernfeind Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,62 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.Context;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

/**
* Convert an arbitrary chunk to a chunk of boxed objects.
*/
public class ChunkBoxer {

/**
* Return a chunk that contains boxed Objects representing the primitive values in primitives.
* Return a chunk that contains boxed {@link Object Objects} representing the primitive values in {@code values}.
*/
public interface BoxerKernel extends Context {
/**
* Convert all primitives to an object.
* Box all values into {@link Object Objects} if they are not already {@code Objects}.
*
* @param primitives the primitives to convert
* @param values the values to box
*
* @return a chunk containing primitives as an object
* @return a chunk containing values as {@code Objects} (not owned by the caller)
*/
ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives);
ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values);
}

/**
* Box the value at {@code offset} in {@code values}.
* <p>
* Please use a {@link #getBoxer(ChunkType, int) ChunkBoxer} when boxing multiple values in order to amortize the
* cost of implementation lookup and avoid virtual dispatch.
*
* @param values The chunk containing the value to box
* @param offset The offset of the value to box
* @return The boxed value
* @param <BOXED_TYPE> The type of the boxed value
*/
@SuppressWarnings("unchecked")
public static <BOXED_TYPE> BOXED_TYPE boxedGet(@NotNull final Chunk<? extends Values> values, int offset) {
final ChunkType type = values.getChunkType();
switch (type) {
case Boolean:
return (BOXED_TYPE) Boolean.valueOf(values.asBooleanChunk().get(offset));
case Char:
return (BOXED_TYPE) TypeUtils.box(values.asCharChunk().get(offset));
case Byte:
return (BOXED_TYPE) TypeUtils.box(values.asByteChunk().get(offset));
case Short:
return (BOXED_TYPE) TypeUtils.box(values.asShortChunk().get(offset));
case Int:
return (BOXED_TYPE) TypeUtils.box(values.asIntChunk().get(offset));
case Long:
return (BOXED_TYPE) TypeUtils.box(values.asLongChunk().get(offset));
case Float:
return (BOXED_TYPE) TypeUtils.box(values.asFloatChunk().get(offset));
case Double:
return (BOXED_TYPE) TypeUtils.box(values.asDoubleChunk().get(offset));
case Object:
return (BOXED_TYPE) values.asObjectChunk().get(offset);
}
throw new IllegalArgumentException("Unknown type: " + type);
}

public static BoxerKernel getBoxer(ChunkType type, int capacity) {
Expand Down Expand Up @@ -55,8 +93,8 @@ public static BoxerKernel getBoxer(ChunkType type, int capacity) {

private static class ObjectBoxer implements BoxerKernel {
@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
return primitives.asObjectChunk();
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
return values.asObjectChunk();
}
}

Expand All @@ -79,13 +117,13 @@ private static class BooleanBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final BooleanChunk<? extends Values> booleanChunk = primitives.asBooleanChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final BooleanChunk<? extends Values> booleanChunk = values.asBooleanChunk();
for (int ii = 0; ii < values.size(); ++ii) {
// noinspection UnnecessaryBoxing
objectChunk.set(ii, Boolean.valueOf(booleanChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -96,12 +134,12 @@ private static class CharBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final CharChunk<? extends Values> charChunk = primitives.asCharChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(charChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final CharChunk<? extends Values> charChunk = values.asCharChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(charChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -112,12 +150,12 @@ private static class ByteBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final ByteChunk<? extends Values> byteChunk = primitives.asByteChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(byteChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final ByteChunk<? extends Values> byteChunk = values.asByteChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(byteChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -128,12 +166,12 @@ private static class ShortBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final ShortChunk<? extends Values> shortChunk = primitives.asShortChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(shortChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final ShortChunk<? extends Values> shortChunk = values.asShortChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(shortChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -144,12 +182,12 @@ private static class IntBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final IntChunk<? extends Values> intChunk = primitives.asIntChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(intChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final IntChunk<? extends Values> intChunk = values.asIntChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(intChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -160,12 +198,12 @@ private static class LongBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final LongChunk<? extends Values> longChunk = primitives.asLongChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(longChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final LongChunk<? extends Values> longChunk = values.asLongChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(longChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -176,12 +214,12 @@ private static class FloatBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final FloatChunk<? extends Values> floatChunk = primitives.asFloatChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
objectChunk.set(ii, io.deephaven.util.type.TypeUtils.box(floatChunk.get(ii)));
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final FloatChunk<? extends Values> floatChunk = values.asFloatChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(floatChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand All @@ -192,12 +230,12 @@ private static class DoubleBoxer extends BoxerCommon {
}

@Override
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> primitives) {
final DoubleChunk<? extends Values> doubleChunk = primitives.asDoubleChunk();
for (int ii = 0; ii < primitives.size(); ++ii) {
public ObjectChunk<?, ? extends Values> box(Chunk<? extends Values> values) {
final DoubleChunk<? extends Values> doubleChunk = values.asDoubleChunk();
for (int ii = 0; ii < values.size(); ++ii) {
objectChunk.set(ii, TypeUtils.box(doubleChunk.get(ii)));
}
objectChunk.setSize(primitives.size());
objectChunk.setSize(values.size());
return objectChunk;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
/**
* The update source object for refreshing locations and location sizes.
*/
private Runnable locationChangePoller;
private LocationChangePoller locationChangePoller;

/**
* Construct a new disk-backed table.
Expand Down Expand Up @@ -336,6 +336,9 @@ protected void destroy() {
if (updateSourceRegistrar != null) {
if (locationChangePoller != null) {
updateSourceRegistrar.removeSource(locationChangePoller);
// NB: we do not want to null out any locationChangePoller.locationBuffer here, as they may still be in
// use by a notification delivery running currently with this destroy.
locationChangePoller.locationBuffer.reset();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ interface Listener extends BasicTableDataListener {
* or 1 handleException callbacks during invocation and continuing after completion, on a thread determined by the
* implementation. Don't hold a lock that prevents notification delivery while subscribing!
* <p>
* This method only guarantees eventually consistent state. To force a state update, use run() after subscription
* completes.
* This method only guarantees eventually consistent state. To force a state update, use refresh() after
* subscription completes.
*
* @param listener A listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ default void handleTableLocationKeysUpdate(
* <b>must not</b> hold any lock that prevents notification delivery while subscribing. Callers <b>must</b> guard
* against duplicate notifications.
* <p>
* This method only guarantees eventually consistent state. To force a state update, use run() after subscription
* completes.
* This method only guarantees eventually consistent state. To force a state update, use refresh() after
* subscription completes.
*
* @param listener A listener.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public synchronized LocationUpdate processPending() {
if (tableLocationProvider.supportsSubscriptions()) {
tableLocationProvider.subscribe(this);
} else {
// NB: Providers that don't support subscriptions don't tick - this single call to run is
// NB: Providers that don't support subscriptions don't tick - this single call to refresh is
// sufficient.
tableLocationProvider.refresh();
final Collection<LiveSupplier<ImmutableTableLocationKey>> tableLocationKeys = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public synchronized boolean processPending() {
if (tableLocation.supportsSubscriptions()) {
tableLocation.subscribe(this);
} else {
// NB: Locations that don't support subscriptions don't tick - this single call to run is
// NB: Locations that don't support subscriptions don't tick - this single call to refresh is
// sufficient.
tableLocation.refresh();
handleUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,25 @@ public class RegionedColumnSourceManager implements ColumnSourceManager, Delegat
: TableDefinition.inferFrom(columnSourceMap);

if (isRefreshing) {
livenessNode = new LivenessArtifact() {};
livenessNode = new LivenessArtifact() {
@Override
protected void destroy() {
super.destroy();
// NB: we do not want to null out any subscriptionBuffers here, as they may still be in use by a
// notification delivery running currently with this destroy. We also do not want to clear the table
// location maps as these locations may still be useful for static tables.
for (final EmptyTableLocationEntry entry : emptyTableLocations.values()) {
if (entry.subscriptionBuffer != null) {
entry.subscriptionBuffer.reset();
}
}
for (final IncludedTableLocationEntry entry : includedTableLocations.values()) {
if (entry.subscriptionBuffer != null) {
entry.subscriptionBuffer.reset();
}
}
}
};
} else {
// This RCSM wil be managing table locations to prevent them from being de-scoped but will not otherwise
// participate in the liveness management process.
Expand Down Expand Up @@ -519,7 +537,6 @@ public final synchronized boolean isEmpty() {
return sharedColumnSources;
}

@Override
public LivenessNode asLivenessNode() {
return livenessNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
try {
allowLivenessRelease();
super.tearDown();
} finally {
if (coalesced != null) {
Expand All @@ -217,6 +218,22 @@ public void tearDown() throws Exception {
}
}

private void allowLivenessRelease() {
checking(new Expectations() {
{
allowing(locationProvider).supportsSubscriptions();
allowing(locationProvider).unsubscribe(with(any(TableLocationProvider.Listener.class)));
will(returnValue(true));
for (int li = 0; li < tableLocations.length; ++li) {
final TableLocation tableLocation = tableLocations[li];
allowing(tableLocation).supportsSubscriptions();
will(returnValue(true));
allowing(tableLocation).unsubscribe(with(any(TableLocation.Listener.class)));
}
}
});
}

private Map<String, ? extends ColumnSource<?>> getIncludedColumnsMap(final int... indices) {
return IntStream.of(indices)
.mapToObj(ci -> new Pair<>(TABLE_DEFINITION.getColumns().get(ci).getName(), columnSources[ci]))
Expand Down Expand Up @@ -443,6 +460,7 @@ public Object invoke(Invocation invocation) {
errorNotification.reset();
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.runWithinUnitTestCycle(() -> {
allowLivenessRelease();
SUT.refresh();
updateGraph.markSourcesRefreshedForUnitTests();
}, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,23 @@ public void testRefreshing() {
checkIndexes();
assertEquals(Arrays.asList(tableLocation0A, tableLocation1A, tableLocation0B, tableLocation1B),
SUT.includedLocations());

// expect table locations to be cleaned up via LivenessScope release as the test exits
IntStream.range(0, tableLocations.length).forEachOrdered(li -> {
final TableLocation tl = tableLocations[li];
checking(new Expectations() {
{
oneOf(tl).supportsSubscriptions();
if (li % 2 == 0) {
// Even locations don't support subscriptions
will(returnValue(false));
} else {
will(returnValue(true));
oneOf(tl).unsubscribe(with(subscriptionBuffers[li]));
}
}
});
});
}

private static void maybePrintStackTrace(@NotNull final Exception e) {
Expand Down
1 change: 1 addition & 0 deletions extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {

implementation libs.arrow.vector
implementation libs.arrow.format
implementation project(path: ':extensions-source-support')

compileOnly project(':util-immutables')
annotationProcessor libs.immutables.value
Expand Down
Loading