Skip to content

Commit

Permalink
Simplified MultiJoin GRPC structures.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Feb 15, 2024
1 parent aee9eff commit b4a667f
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ public static Stream<TableReference> getSourceIds(Operation op) {
case COLUMN_STATISTICS:
return Stream.of(op.getColumnStatistics().getSourceId());
case MULTI_JOIN:
return op.getMultiJoin().getMultiJoinInputsList().isEmpty()
? op.getMultiJoin().getSourceIdsList().stream()
: op.getMultiJoin().getMultiJoinInputsList().stream().map(MultiJoinInput::getSourceId);
return op.getMultiJoin().getMultiJoinInputsList().stream().map(MultiJoinInput::getSourceId);
case OP_NOT_SET:
throw new IllegalStateException("Operation id not set");
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,7 @@ message MultiJoinInput {
// omitted.
message MultiJoinTablesRequest {
Ticket result_id = 1;
repeated TableReference source_ids = 2;
repeated string columns_to_match = 3;
repeated MultiJoinInput multi_join_inputs = 4;
repeated MultiJoinInput multi_join_inputs = 2;
}

message RangeJoinTablesRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,26 @@ public MultiJoinGrpcImpl(
super(authWiring::checkPermissionMultiJoinTables,
BatchTableRequest.Operation::getMultiJoin,
MultiJoinTablesRequest::getResultId,
(MultiDependencyFunction<MultiJoinTablesRequest>) request -> request.getMultiJoinInputsList().isEmpty()
? request.getSourceIdsList()
: request.getMultiJoinInputsList().stream().map(MultiJoinInput::getSourceId)
.collect(Collectors.toList()));
(MultiDependencyFunction<MultiJoinTablesRequest>) request -> request.getMultiJoinInputsList().stream()
.map(MultiJoinInput::getSourceId).collect(Collectors.toList()));
}

@Override
public void validateRequest(final MultiJoinTablesRequest request) throws StatusRuntimeException {
GrpcErrorHelper.checkHasNoUnknownFields(request);

if (request.getSourceIdsList().isEmpty() && request.getMultiJoinInputsList().isEmpty()) {
if (request.getMultiJoinInputsList().isEmpty()) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Cannot join zero source tables.");
}
if (!request.getSourceIdsList().isEmpty() && !request.getMultiJoinInputsList().isEmpty()) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"If `multi_join_inputs` are provided, `source_ids` must remain empty.");
}
if (!request.getColumnsToMatchList().isEmpty() && !request.getMultiJoinInputsList().isEmpty()) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"If `multi_join_inputs` are provided, `columns_to_match` must remain empty.");
}

// Validate well-formed source table ids.
request.getSourceIdsList().forEach(Common::validate);
request.getMultiJoinInputsList().forEach(input -> Common.validate(input.getSourceId()));
// Verify well-formed MultiJoinInput.
request.getMultiJoinInputsList().forEach(input -> {
GrpcErrorHelper.checkHasNoUnknownFields(input);
// Columns to match must be non-empty.
GrpcErrorHelper.checkRepeatedFieldNonEmpty(input, MultiJoinInput.COLUMNS_TO_MATCH_FIELD_NUMBER);
// Validate the source table id.
Common.validate(input.getSourceId());
});
}

@Override
Expand All @@ -64,30 +59,20 @@ public Table create(final MultiJoinTablesRequest request,
final Table firstTable = sourceTables.get(0).get();
final Table[] allTables = sourceTables.stream().map(SessionState.ExportObject::get).toArray(Table[]::new);

// Were multiJoinInputs provided?
if (!request.getMultiJoinInputsList().isEmpty()) {
// Build the multiJoinInput array.
final io.deephaven.engine.table.MultiJoinInput[] multiJoinInputs =
new io.deephaven.engine.table.MultiJoinInput[request.getMultiJoinInputsCount()];
// Build the multiJoinInput array.
final io.deephaven.engine.table.MultiJoinInput[] multiJoinInputs =
new io.deephaven.engine.table.MultiJoinInput[request.getMultiJoinInputsCount()];

for (int i = 0; i < request.getMultiJoinInputsCount(); i++) {
final Table table = sourceTables.get(i).get();
for (int i = 0; i < request.getMultiJoinInputsCount(); i++) {
final Table table = sourceTables.get(i).get();

final MultiJoinInput mjInput = request.getMultiJoinInputs(i);
final String[] columnsToMatch = mjInput.getColumnsToMatchList().toArray(new String[0]);
final String[] columnsToAdd = mjInput.getColumnsToAddList().toArray(new String[0]);
final MultiJoinInput mjInput = request.getMultiJoinInputs(i);
final String[] columnsToMatch = mjInput.getColumnsToMatchList().toArray(new String[0]);
final String[] columnsToAdd = mjInput.getColumnsToAddList().toArray(new String[0]);

multiJoinInputs[i] =
io.deephaven.engine.table.MultiJoinInput.of(table, columnsToMatch, columnsToAdd);
}
return firstTable.getUpdateGraph(allTables).sharedLock().computeLocked(
() -> MultiJoinFactory.of(multiJoinInputs).table());
multiJoinInputs[i] = io.deephaven.engine.table.MultiJoinInput.of(table, columnsToMatch, columnsToAdd);
}

// Build from the provided tables and key columns.
final String[] columnsToMatch = request.getColumnsToMatchList().toArray(new String[0]);

return firstTable.getUpdateGraph(allTables).sharedLock().computeLocked(
() -> MultiJoinFactory.of(columnsToMatch, allTables).table());
() -> MultiJoinFactory.of(multiJoinInputs).table());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,6 @@ public void multiJoinStatic() {
}
}

@Test
public void multiJoinStaticFromInputs() {
final MultiJoinTablesRequest request = prototypeMultiJoinInputs();
final ExportedTableCreationResponse response = channel().tableBlocking().multiJoinTables(request);
try {
assertThat(response.getSuccess()).isTrue();
assertThat(response.getIsStatic()).isTrue();
assertThat(response.getSize()).isEqualTo(1);
} finally {
release(response);
}
}

@Test
public void missingResultId() {
final MultiJoinTablesRequest request = MultiJoinTablesRequest.newBuilder(prototype())
Expand All @@ -58,45 +45,36 @@ public void missingResultId() {
@Test
public void zeroTables() {
final MultiJoinTablesRequest request = MultiJoinTablesRequest.newBuilder(prototype())
.clearSourceIds()
.clearMultiJoinInputs()
.build();
assertError(request, Code.INVALID_ARGUMENT,
"Cannot join zero source tables.");
}

@Test
public void sourceTablesAndInputProvided() {
MultiJoinTablesRequest prototype = prototype();
final MultiJoinTablesRequest request = MultiJoinTablesRequest.newBuilder(prototypeMultiJoinInputs())
.addAllSourceIds(prototype.getSourceIdsList())
.build();
assertError(request, Code.INVALID_ARGUMENT,
"If `multi_join_inputs` are provided, `source_ids` must remain empty.");
}
public void columnsToMatchNotProvided() {
final TableReference t1 = ref(TableTools.emptyTable(1).view("Key=ii", "First=ii"));
final TableReference t2 = ref(TableTools.emptyTable(1).view("Key=ii", "Second=ii*2"));

@Test
public void columnsToMatchAndInputProvided() {
final MultiJoinTablesRequest request = MultiJoinTablesRequest.newBuilder(prototypeMultiJoinInputs())
final MultiJoinInput input1 = MultiJoinInput.newBuilder()
.setSourceId(t1)
.addColumnsToMatch("OutputKey=Key")
.addColumnsToAdd("First")
.build();
final MultiJoinInput input2 = MultiJoinInput.newBuilder()
.setSourceId(t2)
.build();
assertError(request, Code.INVALID_ARGUMENT,
"If `multi_join_inputs` are provided, `columns_to_match` must remain empty.");
}

private MultiJoinTablesRequest prototype() {
final TableReference t1 = ref(TableTools.emptyTable(1).view("Key=ii", "First=ii"));
final TableReference t2 = ref(TableTools.emptyTable(1).view("Key=ii", "Second=ii*2"));
final TableReference t3 = ref(TableTools.emptyTable(1).view("Key=ii", "Third=ii*3", "Extra=ii*4"));
return MultiJoinTablesRequest.newBuilder()
final MultiJoinTablesRequest request = MultiJoinTablesRequest.newBuilder()
.setResultId(ExportTicketHelper.wrapExportIdInTicket(1))
.addSourceIds(t1)
.addSourceIds(t2)
.addSourceIds(t3)
.addColumnsToMatch("OutputKey=Key")
.addMultiJoinInputs(input1)
.addMultiJoinInputs(input2)
.build();
assertError(request, Code.INVALID_ARGUMENT,
"must have at least one columns_to_match");
}

private MultiJoinTablesRequest prototypeMultiJoinInputs() {
private MultiJoinTablesRequest prototype() {
final TableReference t1 = ref(TableTools.emptyTable(1).view("Key=ii", "First=ii"));
final TableReference t2 = ref(TableTools.emptyTable(1).view("Key=ii", "Second=ii*2"));
final TableReference t3 = ref(TableTools.emptyTable(1).view("Key=ii", "Third=ii*3"));
Expand Down

0 comments on commit b4a667f

Please sign in to comment.