
feat: a new PartitionedTableService API in Python #6175

Merged: 45 commits merged, Nov 1, 2024

Commits
11c1a48
Define Py PartitionedTableServiceBackend intf
jmao-denver Oct 7, 2024
d7d81d0
Skeleton work
jmao-denver Oct 8, 2024
4097aca
Full pass through PythonTableDataService; no real barrage message dat…
nbauernfeind Oct 8, 2024
43960d0
Schema/TableDefinition creation
jmao-denver Oct 8, 2024
66fedf9
Exercise existing_partitions
jmao-denver Oct 9, 2024
6a2b50f
Implement remaining functions in Py wrapper
jmao-denver Oct 14, 2024
9868c79
Use the Java BiConsumer/Consumer func intf
jmao-denver Oct 15, 2024
1af6997
Add range support in column_values()
jmao-denver Oct 15, 2024
ddad0c9
Improve error handling for public API
jmao-denver Oct 16, 2024
89ec201
Improve test backend
jmao-denver Oct 16, 2024
7d26147
Rename test module to be consistent
jmao-denver Oct 16, 2024
32d5d04
Implement arrow parsing for partition columns and column values.
nbauernfeind Oct 16, 2024
91df14a
Fix column ordering and tests
jmao-denver Oct 17, 2024
6a6a2bf
Add one more test
jmao-denver Oct 17, 2024
2cc3367
Update the test to fully exercise the backend
jmao-denver Oct 17, 2024
53db0c4
Nate's Java Refactor
nbauernfeind Oct 17, 2024
7b8c8d3
Paired Review
nbauernfeind Oct 17, 2024
3ba7092
Improve docstring and add new tests
jmao-denver Oct 18, 2024
c29282c
Minor Comment Clarification
nbauernfeind Oct 21, 2024
c48d65d
Fix broken test by unsubscribing via ReferenceCountedLivenessReferent…
nbauernfeind Oct 22, 2024
bcd681e
Fixup jmock TableLocation expectations
nbauernfeind Oct 22, 2024
fbb377f
Fixup other jmock related test failures
nbauernfeind Oct 22, 2024
be7b0a5
More concise naming and new test scenarios
jmao-denver Oct 23, 2024
842c0a9
Correct naming mistake
jmao-denver Oct 23, 2024
bea6155
Ryan's feedback
nbauernfeind Oct 23, 2024
ab74e91
Merge remote-tracking branch 'upstream/main' into 6171-py-TableService
nbauernfeind Oct 24, 2024
6ca321c
merge related api change fix
nbauernfeind Oct 24, 2024
e31e7a3
Apply suggestions from code review
jmao-denver Oct 25, 2024
ee5c861
Respond to Ryan's review comments
jmao-denver Oct 25, 2024
a587d80
Ryan's Rnd2 Feedback
nbauernfeind Oct 26, 2024
5288a7d
Merge remote-tracking branch 'upstream/main' into 6171-py-TableService
nbauernfeind Oct 28, 2024
6fcfb97
Ryan's feedback from live demo
nbauernfeind Oct 28, 2024
1e1626b
Ryan's javadoc feedback
nbauernfeind Oct 28, 2024
dbd26fd
More callbacks support in Python and enhanced docs
jmao-denver Oct 28, 2024
58c9f50
Minor docstring correction
jmao-denver Oct 28, 2024
25386a8
Improve doc
jmao-denver Oct 29, 2024
7f05df4
Fix sphinx reference errors
jmao-denver Oct 29, 2024
18130de
Minor change to pydoc word choice
nbauernfeind Oct 29, 2024
e5dd633
Ryan's 3rd Rnd Feedback
nbauernfeind Oct 29, 2024
aef9cb4
Update extensions/barrage/src/main/java/io/deephaven/extensions/barra…
nbauernfeind Oct 30, 2024
efb05b4
Update extensions/barrage/src/main/java/io/deephaven/extensions/barra…
nbauernfeind Oct 30, 2024
0ef147e
Ryan/Jianfeng Feedback
nbauernfeind Oct 31, 2024
ba0e6a8
Remove one extra space in docstring
jmao-denver Oct 31, 2024
e536e9e
Apply suggestions from code review
nbauernfeind Oct 31, 2024
e0d8504
Ryan's suggested approach to ensure exactly one cancellation call
nbauernfeind Oct 31, 2024
@@ -41,6 +41,7 @@
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.IntStream;

@@ -132,23 +133,42 @@ private BackendAccessor(
*/
public BarrageUtil.ConvertedArrowSchema[] getTableSchema(
@NotNull final TableKeyImpl tableKey) {
final BarrageUtil.ConvertedArrowSchema[] schemas = new BarrageUtil.ConvertedArrowSchema[2];
final AsyncState<BarrageUtil.ConvertedArrowSchema[]> asyncState = new AsyncState<>();

final Consumer<ByteBuffer[]> onRawSchemas = byteBuffers -> {
final BarrageUtil.ConvertedArrowSchema[] schemas = new BarrageUtil.ConvertedArrowSchema[2];

if (byteBuffers.length != schemas.length) {
throw new IllegalArgumentException(String.format(
"%s: table_schema returned too many IPC messages. Expected %d, received %d.",
tableKey, schemas.length, byteBuffers.length));
asyncState.setError(new IllegalArgumentException(String.format(
"Provided too many IPC messages. Expected %d, received %d.",
schemas.length, byteBuffers.length)));
return;
}

for (int ii = 0; ii < schemas.length; ++ii) {
schemas[ii] = BarrageUtil.convertArrowSchema(ArrowToTableConverter.parseArrowSchema(
ArrowToTableConverter.parseArrowIpcMessage(byteBuffers[ii])));
try {
schemas[ii] = BarrageUtil.convertArrowSchema(
ArrowToTableConverter.parseArrowSchema(
ArrowToTableConverter.parseArrowIpcMessage(
byteBuffers[ii])));
} catch (final Exception e) {
final String schemaType = ii % 2 == 0 ? "data table" : "partitioning column";
asyncState.setError(new IllegalArgumentException(String.format(
"failed to parse %s schema message", schemaType), e));
return;
}
}

asyncState.setResult(schemas);
};

pyTableDataService.call("_table_schema", tableKey.key, onRawSchemas);
final Consumer<String> onFailure = errorString -> asyncState.setError(
new UncheckedDeephavenException(errorString));

return schemas;
pyTableDataService.call("_table_schema", tableKey.key, onRawSchemas, onFailure);

return asyncState.awaitResult(err -> new TableDataException(String.format(
"%s: table_schema failed", tableKey), err));
}

/**
@@ -162,11 +182,24 @@ public void getTableLocations(
@NotNull final TableDefinition definition,
@NotNull final TableKeyImpl tableKey,
@NotNull final Consumer<TableLocationKeyImpl> listener) {
final AsyncState<Boolean> asyncState = new AsyncState<>();

final BiConsumer<TableLocationKeyImpl, ByteBuffer[]> convertingListener =
(tableLocationKey, byteBuffers) -> processTableLocationKey(definition, tableKey, listener,
tableLocationKey, byteBuffers);
(tableLocationKey, byteBuffers) -> {
try {
processTableLocationKey(definition, tableKey, listener, tableLocationKey, byteBuffers);
} catch (final RuntimeException e) {
asyncState.setError(e);
Review comment (Member): In a similar case for table_schema, you wrapped in an IllegalArgumentException.

Reply (Member): You're right; I would rather not wrap in table_schema -- that should catch a RuntimeException and not Exception.

Reply (Member): Well, technically the wrapper in table_schema specifies which schema failed to parse. I'll clarify tomorrow.

}
};

pyTableDataService.call("_table_locations", tableKey.key, convertingListener);
final Runnable onSuccess = () -> asyncState.setResult(true);
final Consumer<String> onFailure = errorString -> asyncState.setError(
new UncheckedDeephavenException(errorString));

pyTableDataService.call("_table_locations", tableKey.key, convertingListener, onSuccess, onFailure);
asyncState.awaitResult(err -> new TableDataException(String.format(
"%s: table_locations failed", tableKey), err));
}

/**
@@ -190,15 +223,23 @@ public SafeCloseable subscribeToTableLocations(
@NotNull final TableKeyImpl tableKey,
@NotNull final Consumer<TableLocationKeyImpl> tableLocationListener,
@NotNull final Runnable successCallback,
@NotNull final Consumer<String> failureCallback) {
@NotNull final Consumer<RuntimeException> failureCallback) {
final BiConsumer<TableLocationKeyImpl, ByteBuffer[]> convertingListener =
(tableLocationKey, byteBuffers) -> processTableLocationKey(definition, tableKey,
tableLocationListener,
tableLocationKey, byteBuffers);
(tableLocationKey, byteBuffers) -> {
try {
processTableLocationKey(
definition, tableKey, tableLocationListener, tableLocationKey, byteBuffers);
} catch (final RuntimeException e) {
failureCallback.accept(e);
}
};

final Consumer<String> onFailure = errorString -> failureCallback.accept(
new UncheckedDeephavenException(errorString));

final PyObject cancellationCallback = pyTableDataService.call(
"_subscribe_to_table_locations", tableKey.key, convertingListener, successCallback,
failureCallback);
"_subscribe_to_table_locations", tableKey.key,
convertingListener, successCallback, onFailure);
return () -> cancellationCallback.call("__call__");
}

@@ -295,7 +336,17 @@ public void getTableLocationSize(
@NotNull final TableKeyImpl tableKey,
@NotNull final TableLocationKeyImpl tableLocationKey,
@NotNull final LongConsumer listener) {
pyTableDataService.call("_table_location_size", tableKey.key, tableLocationKey.locationKey, listener);
final AsyncState<Long> asyncState = new AsyncState<>();

final LongConsumer onSize = asyncState::setResult;
final Consumer<String> onFailure = errorString -> asyncState.setError(
new UncheckedDeephavenException(errorString));

pyTableDataService.call("_table_location_size", tableKey.key, tableLocationKey.locationKey,
onSize, onFailure);

listener.accept(asyncState.awaitResult(err -> new TableDataException(String.format(
"%s:%s: table_location_size failed", tableKey, tableLocationKey), err)));
}

/**
@@ -343,35 +394,37 @@ public List<WritableChunk<Values>> getColumnValues(
final int minimumSize,
final int maximumSize) {

final ArrayList<WritableChunk<Values>> resultChunks = new ArrayList<>();
final AsyncState<List<WritableChunk<Values>>> asyncState = new AsyncState<>();

final String columnName = columnDefinition.getName();
final Consumer<ByteBuffer[]> onMessages = messages -> {
if (messages.length < 2) {
throw new IllegalArgumentException(String.format("%s:%s: column_values(%s, %d, %d, %d) expected at "
+ "least 2 IPC messages describing the wire format of the column followed by column "
+ "values, but received %d messages", tableKey, tableLocationKey, columnName,
firstRowPosition, minimumSize, maximumSize, messages.length));
asyncState.setError(new IllegalArgumentException(String.format(
"expected at least 2 IPC messages describing the wire format of the column followed by "
+ "column values, but received %d messages",
messages.length)));
return;
}
resultChunks.ensureCapacity(messages.length - 1);
final ArrayList<WritableChunk<Values>> resultChunks = new ArrayList<>(messages.length - 1);

final Schema schema = ArrowToTableConverter.parseArrowSchema(
ArrowToTableConverter.parseArrowIpcMessage(messages[0]));
final BarrageUtil.ConvertedArrowSchema schemaPlus = BarrageUtil.convertArrowSchema(schema);

if (schema.fieldsLength() > 1) {
throw new IllegalArgumentException(String.format("%s:%s: column_values(%s, %d, %d, %d) received "
+ "more than one field. Received %d fields for columns %s",
tableKey, tableLocationKey, columnName, firstRowPosition, minimumSize, maximumSize,
asyncState.setError(new IllegalArgumentException(String.format(
"Received more than one field. Received %d fields for columns %s.",
schema.fieldsLength(),
IntStream.range(0, schema.fieldsLength())
.mapToObj(ci -> schema.fields(ci).name())
.reduce((a, b) -> a + ", " + b).orElse("")));
.reduce((a, b) -> a + ", " + b).orElse(""))));
return;
}
if (!columnDefinition.isCompatible(schemaPlus.tableDef.getColumns().get(0))) {
throw new IllegalArgumentException(String.format("%s:%s: column_values(%s, %d, %d, %d) received "
+ "incompatible column definition. Expected %s, but received %s.",
tableKey, tableLocationKey, columnName, firstRowPosition, minimumSize, maximumSize,
columnDefinition, schemaPlus.tableDef.getColumns().get(0)));
asyncState.setError(new IllegalArgumentException(String.format(
"Received incompatible column definition. Expected %s, but received %s.",
columnDefinition, schemaPlus.tableDef.getColumns().get(0))));
return;
}

final ChunkReader reader = schemaPlus.computeChunkReaders(
@@ -381,10 +434,9 @@ public List<WritableChunk<Values>> getColumnValues(
for (; mi < messages.length; ++mi) {
final BarrageProtoUtil.MessageInfo recordBatchMessageInfo = parseArrowIpcMessage(messages[mi]);
if (recordBatchMessageInfo.header.headerType() != MessageHeader.RecordBatch) {
throw new IllegalArgumentException(String.format("%s:%s: column_values(%s, %d, %d, %d) IPC "
+ "message %d is not a valid Arrow RecordBatch IPC message",
tableKey, tableLocationKey, columnName, firstRowPosition, minimumSize, maximumSize,
mi));
asyncState.setError(new IllegalArgumentException(String.format(
"IPC message %d is not a valid Arrow RecordBatch IPC message", mi)));
return;
}
final RecordBatch batch = (RecordBatch) recordBatchMessageInfo.header.header(new RecordBatch());

@@ -399,17 +451,28 @@ public List<WritableChunk<Values>> getColumnValues(
}
} catch (final IOException ioe) {
SafeCloseable.closeAll(resultChunks.iterator());
throw new UncheckedDeephavenException(String.format("%s:%s: column_values(%s, %d, %d, %d) failed "
+ "to read IPC message %d", tableKey, tableLocationKey, columnName,
firstRowPosition, minimumSize, maximumSize, mi), ioe);
asyncState.setError(new UncheckedDeephavenException(String.format(
"failed to read IPC message %d", mi), ioe));
return;
} catch (final RuntimeException e) {
SafeCloseable.closeAll(resultChunks.iterator());
asyncState.setError(e);
return;
}

asyncState.setResult(resultChunks);
};

final Consumer<String> onFailure = errorString -> asyncState.setError(
new UncheckedDeephavenException(errorString));

pyTableDataService.call("_column_values",
tableKey.key, tableLocationKey.locationKey, columnName, firstRowPosition,
minimumSize, maximumSize, onMessages);
minimumSize, maximumSize, onMessages, onFailure);

return resultChunks;
return asyncState.awaitResult(err -> new TableDataException(String.format(
"%s:%s: column_values(%s, %d, %d, %d) failed",
tableKey, tableLocationKey, columnName, firstRowPosition, minimumSize, maximumSize), err));
}
}

@@ -538,7 +601,8 @@ protected void activateUnderlyingDataSource() {
localSubscription.cancellationCallback = backend.subscribeToTableLocations(
tableDefinition, key, this::handleTableLocationKeyAdded,
() -> activationSuccessful(localSubscription),
errorString -> activationFailed(localSubscription, new TableDataException(errorString)));
error -> activationFailed(localSubscription, new TableDataException(
String.format("%s: subscribe_to_table_locations failed", key), error)));
}

@Override
@@ -703,14 +767,16 @@ protected void activateUnderlyingDataSource() {
final TableLocationKeyImpl location = (TableLocationKeyImpl) getKey();

final Subscription localSubscription = subscription = new Subscription();
localSubscription.cancellationCallback = backend.subscribeToTableLocationSize(key, location, newSize -> {
final LongConsumer subscriptionFilter = newSize -> {
if (localSubscription != subscription) {
// we've been cancelled and/or replaced
return;
}

onSizeChanged(newSize);
}, () -> activationSuccessful(localSubscription),
};
localSubscription.cancellationCallback = backend.subscribeToTableLocationSize(
key, location, subscriptionFilter, () -> activationSuccessful(localSubscription),
errorString -> activationFailed(localSubscription, new TableDataException(errorString)));
}

@@ -863,4 +929,35 @@ public long size() {
private static class Subscription {
SafeCloseable cancellationCallback;
}

// this tool is used to simplify backend asynchronous RPC patterns for otherwise synchronous operations
private static class AsyncState<T> {
private T result;
private RuntimeException error;

public synchronized void setResult(@NotNull final T result) {
this.result = result;
notifyAll();
}

public synchronized void setError(@NotNull final RuntimeException error) {
this.error = error;
notifyAll();
}

public synchronized T awaitResult(@NotNull final Function<Exception, RuntimeException> errorMapper) {
Review comment (Member): Seems weird that errorMapper accepts an Exception, but error is a RuntimeException. Maybe we want to be more permissive in specifying error, or just get rid of errorMapper?

Reply (Member): I wanted the mapper to guarantee that we can throw it without declaring that the method call throws, and I wanted to be permissive on the way into the mapper so we could setError to an IOException, for example. However, I do see that I ended up wrapping the IOException in an UncheckedDeephavenException anyway, and then that might be wrapped in a TableDataException. The benefit of the mapper is that it captures that the error may have originated off-thread, so both stack traces may have real value to the user; it also moves specifics about the call into the mapper (for consistency in error messages) and lets the internal error not care about the origin of the request.

Reply (Member): Ah, actually, the Exception entry to the helper is because of the interrupted exception in awaitResult. Will make the holder more permissive.

while (result == null && error == null) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw errorMapper.apply(e);
}
}
if (error != null) {
throw errorMapper.apply(error);
}
return result;
}
}
}
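The AsyncState class in the diff is a wait/notify holder that turns the backend's asynchronous callbacks into a synchronous result. As a standalone illustration of that pattern, here is a minimal sketch; it is not the Deephaven class itself: the name AsyncValue is invented, and the stored error is widened to Exception in line with the review note about making the holder more permissive.

```java
import java.util.function.Function;

// Minimal sketch of the wait/notify "async state" pattern from the diff.
// Assumption: simplified and renamed for illustration; the stored error is
// an Exception (not RuntimeException) per the review discussion.
class AsyncValue<T> {
    private T result;
    private Exception error;

    // Called by the asynchronous producer (e.g. a Python callback thread).
    public synchronized void setResult(T result) {
        this.result = result;
        notifyAll();
    }

    public synchronized void setError(Exception error) {
        this.error = error;
        notifyAll();
    }

    // Blocks the caller until either a result or an error arrives; the mapper
    // wraps any failure (including interruption) in an unchecked exception.
    public synchronized T awaitResult(Function<Exception, RuntimeException> errorMapper) {
        while (result == null && error == null) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw errorMapper.apply(e);
            }
        }
        if (error != null) {
            throw errorMapper.apply(error);
        }
        return result;
    }

    public static void main(String[] args) {
        AsyncValue<String> state = new AsyncValue<>();
        // Simulate the backend delivering the result from another thread.
        new Thread(() -> state.setResult("schema-bytes")).start();
        String value = state.awaitResult(err -> new RuntimeException("call failed", err));
        System.out.println(value); // prints "schema-bytes"
    }
}
```

Because setResult and awaitResult both synchronize on the holder, the waiter cannot miss a notification: either the result is already set when awaitResult checks the fields, or wait() releases the monitor and is later woken by notifyAll().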