Skip to content

Commit

Permalink
IGNITE-22767 Working in Index and Table scan
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Jul 29, 2024
1 parent f4b59db commit 75e7d87
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
Expand All @@ -41,6 +43,8 @@
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistry;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
Expand All @@ -52,11 +56,14 @@
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.query.calcite.exec.IndexFirstLastScan.createNotNullRowFilter;
import static org.apache.ignite.internal.processors.query.calcite.exec.IndexScan.transactionRows;

/**
* Ignite scannable cache index.
Expand Down Expand Up @@ -201,6 +208,37 @@ public Index queryIndex() {
else if (checkExpired)
rowFilter = IndexScan.createNotExpiredRowFilter();

if (!F.isEmpty(ectx.getTxWriteEntries())) {
BPlusTree.TreeRowClosure<IndexRow, IndexRow> _rowFilter = rowFilter;

IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionRows(
ectx.getTxWriteEntries(),
e -> true,
Function.identity()
);

if (!txChanges.get1().isEmpty()) {
rowFilter = new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() {
@Override public boolean apply(
BPlusTree<IndexRow, IndexRow> tree,
BPlusIO<IndexRow> io,
long pageAddr,
int idx
) throws IgniteCheckedException {
if (!_rowFilter.apply(tree, io, pageAddr, idx))
return false;

IndexRow row = tree.getRow(io, pageAddr, idx);

return txChanges.get1().remove(row.cacheDataRow().key());
}
};

// TODO: no need to create list here.
cnt += txChanges.get2().size();
}
}

try {
for (int i = 0; i < iidx.segmentsCount(); ++i)
cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public static List<List<?>> executeSql(IgniteEx node, String sqlText, Object...
}
}

return node.cache(USERS).query(new SqlFieldsQuery(sqlText)
return node.cache(F.first(node.cacheNames())).query(new SqlFieldsQuery(sqlText)
.setArgs(args)
.setTimeout(5, TimeUnit.SECONDS)
).getAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

import java.util.Collections;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand All @@ -36,6 +33,7 @@
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;

import static org.apache.ignite.internal.processors.tx.TransactionIsolationTest.executeSql;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

Expand All @@ -61,30 +59,20 @@ public void testVisibility() throws Exception {
.setKeyType(Integer.class.getName())
.setValueType(Integer.class.getName()))));

try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 1_000, 10)) {
try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 5_000, 10)) {
cache.put(1, 2);

assertEquals("Must see transaction related data", (Integer)2, cache.get(1));

List<List<?>> sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");
List<Cache.Entry<Integer, Integer>> scanData = cache.query(new ScanQuery<Integer, Integer>()).getAll();

// Fails here.
assertEquals("Must see transaction related data", 1, scanData.size());
assertEquals("Must see transaction related data", 1L, sqlData.get(0).get(0));

tx.commit();
}

List<List<?>> sqlData = executeSql(srv, "SELECT COUNT(*) FROM TBL.TBL");
List<Cache.Entry<Integer, Integer>> scanData = cache.query(new ScanQuery<Integer, Integer>()).getAll();

assertEquals("Must see committed data", 1L, sqlData.get(0).get(0));
assertEquals("Must see committed data", 1, scanData.size());
}

/** */
public static List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args) {
return node.cache("TBL").query(new SqlFieldsQuery(sqlText).setArgs(args)).getAll();
}
}

0 comments on commit 75e7d87

Please sign in to comment.