Skip to content

Commit

Permalink
GH-99 Add support of subqueries
Browse files Browse the repository at this point in the history
  • Loading branch information
steleal committed Mar 15, 2024
1 parent 2fba7f7 commit 25906f5
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 1 deletion.
77 changes: 76 additions & 1 deletion src/main/java/ru/rt/restream/reindexer/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public class Query<T> {
private static final int QUERY_UPDATE_FIELD_V2 = 25;
private static final int QUERY_BETWEEN_FIELDS_CONDITION = 26;
private static final int QUERY_ALWAYS_FALSE_CONDITION = 27;
private static final int QUERY_ALWAYS_TRUE_CONDITION = 28;
private static final int QUERY_SUB_QUERY_CONDITION = 29;
private static final int QUERY_FIELD_SUB_QUERY_CONDITION = 30;

/**
* Condition types.
Expand Down Expand Up @@ -187,7 +190,7 @@ public enum Condition {

private final List<ReindexerNamespace<?>> namespaces = new ArrayList<>();

private Deque<Integer> openedBrackets = new ArrayDeque<>();
private final Deque<Integer> openedBrackets = new ArrayDeque<>();

private final QueryLogBuilder logBuilder = new QueryLogBuilder();

Expand Down Expand Up @@ -388,6 +391,69 @@ public Query<T> where(String indexName, Condition condition, Object... values) {
return this;
}

/**
* Queries are possible only on the indexed fields, marked with reindex annotation.
*
* @param subquery query returning aggregated values
* @param condition condition value {@link Condition}
* @param values values to match
* @return the {@link Query} for further customizations
*/
public Query<T> where(Query<?> subquery, Condition condition, Collection<?> values) {
if (values == null) {
values = Collections.emptyList();
}
return where(subquery, condition, values.toArray());
}

/**
* Queries are possible only on the indexed fields, marked with reindex annotation.
*
* @param subquery query returning aggregated values
* @param condition condition value {@link Condition}
* @param values values to match
* @return the {@link Query} for further customizations
*/
public Query<T> where(Query<?> subquery, Condition condition, Object... values) {
logBuilder.where(nextOperation, subquery, condition.code, values);
buffer.putVarUInt32(QUERY_SUB_QUERY_CONDITION)
.putVarUInt32(nextOperation)
.putVBytes(subquery.buffer.bytes())
.putVarUInt32(condition.code);

this.nextOperation = OP_AND;
this.queryCount++;

buffer.putVarUInt32(values.length);
for (Object key : values) {
putValue(key);
}

return this;
}

/**
* Queries are possible only on the indexed fields, marked with reindex annotation.
*
* @param indexName index name
* @param condition condition value {@link Condition}
* @param subquery query returning aggregated values
* @return the {@link Query} for further customizations
*/
public Query<T> where(String indexName, Condition condition, Query<?> subquery) {
logBuilder.where(nextOperation, indexName, condition.code, subquery);
buffer.putVarUInt32(QUERY_FIELD_SUB_QUERY_CONDITION)
.putVarUInt32(nextOperation)
.putVString(indexName)
.putVarUInt32(condition.code)
.putVBytes(subquery.buffer.bytes());

this.nextOperation = OP_AND;
this.queryCount++;

return this;
}

/**
* Where - Add comparing two fields where condition to DB query.
*
Expand Down Expand Up @@ -1140,4 +1206,13 @@ public List<String> getJoinFields() {
return joinFields;
}

/**
* Get constructed sql log string.
*
* @return SQL-like representation of reindexer query
*/
String getSql() {
return logBuilder.getSql();
}

}
8 changes: 8 additions & 0 deletions src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ void where(int operationCode, String field, int conditionCode, Collection<?> val
whereEntries.add(queryEntry);
}
}

void where(int operationCode, String field, int conditionCode, Object... values) {
QueryEntry queryEntry = new QueryEntry();
queryEntry.operation = getOperation(operationCode);
Expand All @@ -294,6 +295,10 @@ void where(int operationCode, String field, int conditionCode, Object... values)
}
}

void where(int operationCode, Query<?> subquery, int conditionCode, Object... values) {
where(operationCode, mapToString(subquery), conditionCode, values);
}

void whereBetweenFields(int operationCode, String firstField, int conditionCode, String secondField) {
QueryEntry queryEntry = new QueryEntry();
queryEntry.operation = getOperation(operationCode);
Expand Down Expand Up @@ -661,6 +666,9 @@ private String mapToString(Object whereEntryValue) {
return Arrays.stream((Object[]) whereEntryValue)
.map(v -> v instanceof String ? addQuotes(v) : String.valueOf(v))
.collect(Collectors.joining(", ", "{", "}"));
} else if (whereEntryValue instanceof Query<?>) {
Query<?> subquery = (Query<?>) whereEntryValue;
return "(" + subquery.getSql() + ")";
}
return whereEntryValue instanceof String ? addQuotes(whereEntryValue) : String.valueOf(whereEntryValue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020 Restream
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ru.rt.restream.reindexer.connector;

import ru.rt.restream.category.BuiltinTest;

/**
* Tests for Builtin implementation.
*/
@BuiltinTest
public class BuiltinSubQueryTest extends SubQueryTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020 Restream
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ru.rt.restream.reindexer.connector;

import ru.rt.restream.category.CprotoTest;

/**
* Tests for Cproto implementation.
*/
@CprotoTest
public class CprotoSubQueryTest extends SubQueryTest {

}
Loading

0 comments on commit 25906f5

Please sign in to comment.