Skip to content

Commit

Permalink
IGNITE-20579 Finally restrict atomic operations inside the transactio…
Browse files Browse the repository at this point in the history
…n (remove the system property) (#10979)
  • Loading branch information
anton-vinogradov authored Oct 16, 2023
1 parent 2380ff3 commit 63476a7
Show file tree
Hide file tree
Showing 41 changed files with 228 additions and 557 deletions.
11 changes: 0 additions & 11 deletions modules/core/src/main/java/org/apache/ignite/IgniteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,6 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
*/
public <K1, V1> IgniteCache<K1, V1> withKeepBinary();

/**
* If you want to use atomic operations inside transactions you should allow it before transaction start.
* To enable this behavior by default you can set system property
* {@link IgniteSystemProperties#IGNITE_ALLOW_ATOMIC_OPS_IN_TX IGNITE_ALLOW_ATOMIC_OPS_IN_TX} to {@code true}.
*
* @param <V1> Type of the cache value.
* @param <K1> Type of the cache key.
* @return Cache with atomic operations allowed in transactions.
*/
public <K1, V1> IgniteCache<K1, V1> withAllowAtomicOpsInTx();

/**
* Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,14 +596,6 @@ public final class IgniteSystemProperties {
@SystemProperty("Disables performance suggestions output on start")
public static final String IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED = "IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED";

/**
* Flag indicating whether atomic operations allowed to be used inside transactions.
* Since 2.15.0 atomic operations inside transactions are not allowed by default.
*/
@SystemProperty(value = "Allows atomic operations inside transactions",
defaults = "false")
public static final String IGNITE_ALLOW_ATOMIC_OPS_IN_TX = "IGNITE_ALLOW_ATOMIC_OPS_IN_TX";

/**
* Flag indicating whether atomic and transactional caches are allowed inside the same cache group.
* Since 2.16.0 mixed cache groups are not allowed by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,15 @@

import java.io.Serializable;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_ATOMIC_OPS_IN_TX;

/**
* Cache operation context.
*/
public class CacheOperationContext implements Serializable {
/** */
public static final boolean DFLT_ALLOW_ATOMIC_OPS_IN_TX = false;

/** */
public static final boolean allowAtomicOpsInTx() {
return IgniteSystemProperties.getBoolean(IGNITE_ALLOW_ATOMIC_OPS_IN_TX, DFLT_ALLOW_ATOMIC_OPS_IN_TX);
}

/** */
private static final long serialVersionUID = 0L;

Expand All @@ -59,9 +48,6 @@ public static final boolean allowAtomicOpsInTx() {
/** Keep binary flag. */
private final boolean keepBinary;

/** Allow atomic cache in transaction. */
private final boolean allowAtomicOpsInTx;

/** Expiry policy. */
private final ExpiryPolicy expiryPlc;

Expand All @@ -79,7 +65,6 @@ public CacheOperationContext() {
recovery = false;
readRepairStrategy = null;
dataCenterId = null;
allowAtomicOpsInTx = allowAtomicOpsInTx();
}

/**
Expand All @@ -96,8 +81,7 @@ public CacheOperationContext(
boolean noRetries,
@Nullable Byte dataCenterId,
boolean recovery,
@Nullable ReadRepairStrategy readRepairStrategy,
boolean allowAtomicOpsInTx
@Nullable ReadRepairStrategy readRepairStrategy
) {
this.skipStore = skipStore;
this.keepBinary = keepBinary;
Expand All @@ -106,7 +90,6 @@ public CacheOperationContext(
this.dataCenterId = dataCenterId;
this.recovery = recovery;
this.readRepairStrategy = readRepairStrategy;
this.allowAtomicOpsInTx = allowAtomicOpsInTx;
}

/**
Expand Down Expand Up @@ -136,8 +119,7 @@ public CacheOperationContext keepBinary() {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand Down Expand Up @@ -170,8 +152,7 @@ public CacheOperationContext setSkipStore(boolean skipStore) {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -195,8 +176,7 @@ public CacheOperationContext withExpiryPolicy(ExpiryPolicy plc) {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -211,8 +191,7 @@ public CacheOperationContext setNoRetries(boolean noRetries) {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -227,8 +206,7 @@ public CacheOperationContext setDataCenterId(byte dataCenterId) {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -243,8 +221,7 @@ public CacheOperationContext setRecovery(boolean recovery) {
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -259,8 +236,7 @@ public CacheOperationContext setReadRepairStrategy(ReadRepairStrategy readRepair
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
allowAtomicOpsInTx);
readRepairStrategy);
}

/**
Expand All @@ -284,28 +260,6 @@ public boolean noRetries() {
return noRetries;
}

/**
* @return Operation context.
*/
public CacheOperationContext setAllowAtomicOpsInTx() {
return new CacheOperationContext(
skipStore,
keepBinary,
expiryPlc,
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
true);
}

/**
* @return Allow in transactions flag.
*/
public boolean allowedAtomicOpsInTx() {
return allowAtomicOpsInTx;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheOperationContext.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,28 +173,6 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
}
}

/** {@inheritDoc} */
@Override public IgniteCache<K, V> withAllowAtomicOpsInTx() {
if (context().atomic() && !opCtx.allowedAtomicOpsInTx() && context().tm().tx() != null) {
throw new IllegalStateException("Enabling atomic operations during active transaction is not allowed. " +
"Enable atomic operations before transaction start.");
}

CacheOperationGate opGate = onEnter();

try {
boolean allowed = opCtx.allowedAtomicOpsInTx();

if (allowed)
return this;

return new GatewayProtectedCacheProxy<>(delegate, opCtx.setAllowAtomicOpsInTx(), lock);
}
finally {
onLeave(opGate);
}
}

/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> withNoRetries() {
CacheOperationGate opGate = onEnter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@

import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.cache.CacheOperationContext.allowAtomicOpsInTx;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
Expand Down Expand Up @@ -485,8 +484,7 @@ public void active(boolean active) {
false,
null,
false,
null,
allowAtomicOpsInTx());
null);

return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
Expand All @@ -500,8 +498,7 @@ public void active(boolean active) {
false,
null,
false,
null,
allowAtomicOpsInTx());
null);

return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
Expand All @@ -522,8 +519,7 @@ public void active(boolean active) {
false,
null,
false,
null,
allowAtomicOpsInTx());
null);

return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
Expand All @@ -537,23 +533,7 @@ public void active(boolean active) {
true,
null,
false,
null,
allowAtomicOpsInTx());

return new GridCacheProxyImpl<>(ctx, this, opCtx);
}

/** {@inheritDoc} */
@Override public final IgniteInternalCache<K, V> withAllowAtomicOpsInTx() {
CacheOperationContext opCtx = new CacheOperationContext(
false,
false,
null,
false,
null,
false,
null,
allowAtomicOpsInTx());
null);

return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private boolean checkState(boolean lock, boolean stopErr) {
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
public boolean enterIfNotStopped() {
onEnter(null);
onEnter(false);

// Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
rwLock.readLock().lock();
Expand All @@ -121,7 +121,7 @@ public boolean enterIfNotStopped() {
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
public boolean enterIfNotStoppedNoLock() {
onEnter(null);
onEnter(false);

return checkState(false, false);
}
Expand Down Expand Up @@ -174,7 +174,7 @@ public void leave() {

ctx.tm().enterNearTxSystemSection();

onEnter(opCtx);
onEnter(true);

Lock lock = rwLock.readLock();

Expand All @@ -199,7 +199,7 @@ public void leave() {
* @return Previous operation context set on this thread.
*/
@Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
onEnter(opCtx);
onEnter(true);

checkState(false, false);

Expand Down Expand Up @@ -250,16 +250,16 @@ public void leaveNoLock(CacheOperationContext prev) {
}

/**
* @param opCtx Cache operation context.
* @param checkAtomicOpsInTx Atomicity check is required.
*/
private void onEnter(CacheOperationContext opCtx) {
private void onEnter(boolean checkAtomicOpsInTx) {
ctx.itHolder().checkWeakQueue();

if (ctx.deploymentEnabled())
ctx.deploy().onEnter();

if (opCtx != null)
checkAtomicOpsInTx(opCtx);
if (checkAtomicOpsInTx)
checkAtomicOpsInTx();
}

/**
Expand Down Expand Up @@ -358,15 +358,12 @@ private enum State {
*
* @throws IgniteException - in case of atomic operation inside transaction without permission.
*/
private void checkAtomicOpsInTx(CacheOperationContext opCtx) throws IgniteException {
if (ctx.atomic() && !opCtx.allowedAtomicOpsInTx()) {
if (ctx.grid().transactions().tx() != null) {
throw new IgniteException("Transaction spans operations on atomic cache " +
"(don't use atomic cache inside transaction or set up flag by cache.allowedAtomicOpsInTx()). " +
"Since 2.15.0 atomic operations inside transactions are not allowed by default. " +
"To return the previous behaviour and to allow operations with atomic caches in transactions " +
"you can set system property IGNITE_ALLOW_ATOMIC_OPS_IN_TX to true.");
}
private void checkAtomicOpsInTx() throws IgniteException {
if (ctx.atomic() && ctx.grid().transactions().tx() != null) {
throw new IgniteException("Transaction spans operations on atomic cache " +
"(don't use atomic cache inside a transaction). " +
"Since 2.15.0 atomic operations inside transactions are not allowed by default. " +
"Since 2.16.0 atomic operations inside transactions are finally forbidden.");
}
}
}
Loading

0 comments on commit 63476a7

Please sign in to comment.