Skip to content

Commit

Permalink
IGNITE-22732 Transaction aware SQL (#11584)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Oct 15, 2024
1 parent e948408 commit 0e76399
Show file tree
Hide file tree
Showing 96 changed files with 5,915 additions and 1,203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
Expand Down Expand Up @@ -120,6 +122,7 @@
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;

/** */
Expand Down Expand Up @@ -486,6 +489,8 @@ private <T> List<T> parseAndProcessQuery(
String sql,
Object... params
) throws IgniteSQLException {
ensureTransactionModeSupported(qryCtx);

SchemaPlus schema = schemaHolder.schema(schemaName);

assert schema != null : "Schema not found: " + schemaName;
Expand Down Expand Up @@ -588,7 +593,32 @@ private Object contextKey(QueryContext qryCtx) {

SqlFieldsQuery sqlFieldsQry = qryCtx.unwrap(SqlFieldsQuery.class);

return sqlFieldsQry != null ? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder()) : null;
return sqlFieldsQry != null
? F.asList(sqlFieldsQry.isLocal(), sqlFieldsQry.isEnforceJoinOrder(), queryTransactionVersion(qryCtx) == null)
: null;
}

/** */
private static GridCacheVersion queryTransactionVersion(@Nullable QueryContext qryCtx) {
return qryCtx == null ? null : qryCtx.unwrap(GridCacheVersion.class);
}

/** */
private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) {
if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
return;

GridCacheVersion ver = queryTransactionVersion(qryCtx);

if (ver == null)
return;

final GridNearTxLocal userTx = ctx.cache().context().tm().tx(ver);

if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(userTx.isolation()))
return;

throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + userTx.isolation());
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ protected AbstractIndexScan(
/** {@inheritDoc} */
@Override public synchronized Iterator<Row> iterator() {
if (ranges == null)
return new IteratorImpl(idx.find(null, null, true, true, indexQueryContext()));
return new IteratorImpl(indexCursor(null, null, true, true));

IgniteClosure<RangeCondition<Row>, IteratorImpl> clo = range -> {
IdxRow lower = range.lower() == null ? null : row2indexRow(range.lower());
IdxRow upper = range.upper() == null ? null : row2indexRow(range.upper());

return new IteratorImpl(
idx.find(lower, upper, range.lowerInclude(), range.upperInclude(), indexQueryContext()));
indexCursor(lower, upper, range.lowerInclude(), range.upperInclude()));
};

if (!ranges.multiBounds()) {
Expand All @@ -88,6 +88,11 @@ protected AbstractIndexScan(
return F.flat(F.iterator(ranges, clo, true));
}

/** */
protected GridCursor<IdxRow> indexCursor(IdxRow lower, IdxRow upper, boolean lowerInclude, boolean upperInclude) {
return idx.find(lower, upper, lowerInclude, upperInclude, indexQueryContext());
}

/** */
protected abstract IdxRow row2indexRow(Row bound);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
NoOpMemoryTracker.INSTANCE,
NoOpIoTracker.INSTANCE,
0,
ImmutableMap.of());
ImmutableMap.of(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,37 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
import org.apache.ignite.internal.processors.query.calcite.message.QueryTxEntry;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractQueryContext;
Expand All @@ -45,8 +58,12 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.query.calcite.util.Commons.checkRange;

Expand Down Expand Up @@ -102,6 +119,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final long timeout;

/** */
private final Collection<QueryTxEntry> qryTxEntries;

/** */
private final long startTs;

Expand All @@ -127,7 +147,8 @@ public ExecutionContext(
MemoryTracker qryMemoryTracker,
IoTracker ioTracker,
long timeout,
Map<String, Object> params
Map<String, Object> params,
@Nullable Collection<QueryTxEntry> qryTxEntries
) {
super(qctx);

Expand All @@ -142,6 +163,7 @@ public ExecutionContext(
this.ioTracker = ioTracker;
this.params = params;
this.timeout = timeout;
this.qryTxEntries = qryTxEntries;

startTs = U.currentTimeMillis();

Expand Down Expand Up @@ -289,6 +311,84 @@ public void setCorrelated(@NotNull Object value, int id) {
correlations[id] = value;
}

/**
* @return Transaction write map.
*/
public Collection<QueryTxEntry> getQryTxEntries() {
return qryTxEntries;
}

/** */
public static Collection<QueryTxEntry> transactionChanges(
Collection<IgniteTxEntry> writeEntries
) {
if (F.isEmpty(writeEntries))
return null;

Collection<QueryTxEntry> res = new ArrayList<>();

for (IgniteTxEntry e : writeEntries) {
CacheObject val = e.value();

if (!F.isEmpty(e.entryProcessors()))
val = e.applyEntryProcessors(val);

res.add(new QueryTxEntry(e.cacheId(), e.key(), val, e.explicitVersion()));
}

return res;
}

/**
* @param cacheId Cache id.
* @param parts Partitions set.
* @param mapper Mapper to specific data type.
* @return First, set of object changed in transaction, second, list of transaction data in required format.
* @param <R> Required type.
*/
public <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionChanges(
int cacheId,
int[] parts,
Function<CacheDataRow, R> mapper
) {
if (F.isEmpty(qryTxEntries))
return F.t(Collections.emptySet(), Collections.emptyList());

// Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small.
if (parts != null && !F.isSorted(parts))
Arrays.sort(parts);

Set<KeyCacheObject> changedKeys = new HashSet<>(qryTxEntries.size());
List<R> newAndUpdatedRows = new ArrayList<>(qryTxEntries.size());

for (QueryTxEntry e : qryTxEntries) {
int part = e.key().partition();

assert part != -1;

if (e.cacheId() != cacheId)
continue;

if (parts != null && Arrays.binarySearch(parts, part) < 0)
continue;

changedKeys.add(e.key());

CacheObject val = e.value();

if (val != null) { // Mix only updated or inserted entries. In case val == null entry removed.
newAndUpdatedRows.add(mapper.apply(new CacheDataRowAdapter(
e.key(),
val,
e.version(),
CU.EXPIRE_TIME_ETERNAL // Expire time calculated on commit, can use eternal here.
)));
}
}

return F.t(changedKeys, newAndUpdatedRows);
}

/**
* Executes a query task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
Expand Down Expand Up @@ -608,6 +609,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(

MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());

final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());

ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
Expand All @@ -620,7 +623,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qryMemoryTracker,
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
qryParams);
qryParams,
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));

Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
Expand Down Expand Up @@ -659,7 +663,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled,
timeout
timeout,
ectx.getQryTxEntries()
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -867,7 +872,8 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
createIoTracker(nodeId, msg.originatingQryId()),
msg.timeout(),
Commons.parametersMap(msg.parameters())
Commons.parametersMap(msg.parameters()),
msg.queryTransactionEntries()
);

executeFragment(qry, (FragmentPlan)qryPlan, ectx);
Expand Down
Loading

0 comments on commit 0e76399

Please sign in to comment.