Skip to content

Commit

Permalink
IGNITE-20546 Fix transactions expiration set for reinsert (#10972)
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Oct 4, 2023
1 parent d2af35b commit 1c20202
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private void onEntriesLocked() {
cctx.database().checkpointReadLock();

try {
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE || txEntry.op() == TRANSFORM) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
if (expiry != null) {
cached.unswap(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,96 @@

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;

/**
*
*/
/** */
@RunWith(Parameterized.class)
public class CacheQueryFilterExpiredTest extends GridCommonAbstractTest {
/** */
@Parameterized.Parameter
public CacheMode cacheMode;

/** */
@Parameterized.Parameter(1)
public CacheAtomicityMode cacheAtomicityMode;

/** */
@Parameterized.Parameter(2)
public boolean eagerTtl;

/** */
@Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}, eagerTtl={2}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();

Stream.of(REPLICATED, PARTITIONED).forEach(cacheMode ->
Stream.of(ATOMIC, TRANSACTIONAL).forEach(cacheAtomicityMode ->
Stream.of(false, true).forEach(eagerTtl ->
params.add(new Object[] {cacheMode, cacheAtomicityMode, eagerTtl})
)
)
);

return params;
}

/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids();
}

/**
* @throws Exception If failed.
*/
@Test
public void testFilterExpired() throws Exception {
try (Ignite ignite = startGrid(0)) {
checkFilterExpired(ignite, ATOMIC, false);

checkFilterExpired(ignite, ATOMIC, true);

checkFilterExpired(ignite, TRANSACTIONAL, false);
try (Ignite ignite = startGrids(2)) {
checkFilterExpired(ignite);
}
}

checkFilterExpired(ignite, TRANSACTIONAL, true);
/**
* @throws Exception If failed.
*/
@Test
public void testInsertExpired() throws Exception {
try (Ignite ignite = startGrids(2)) {
checkInsertExpired(ignite);
}
}

/**
* @param ignite Node.
* @param atomicityMode Cache atomicity mode.
* @param eagerTtl Value for {@link CacheConfiguration#setEagerTtl(boolean)}.
* @throws Exception If failed.
*/
private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode, boolean eagerTtl) throws Exception {
private void checkFilterExpired(Ignite ignite) throws Exception {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setAtomicityMode(cacheAtomicityMode);
ccfg.setCacheMode(cacheMode);
ccfg.setEagerTtl(eagerTtl);
ccfg.setIndexedTypes(Integer.class, Integer.class);

Expand Down Expand Up @@ -94,4 +139,37 @@ private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode,
ignite.destroyCache(ccfg.getName());
}
}

/** */
private void checkInsertExpired(Ignite ignite) throws Exception {
CacheConfiguration<Integer, Integer> ccfgFrom = new CacheConfiguration<>("CACHE1");
ccfgFrom.setAtomicityMode(cacheAtomicityMode);
ccfgFrom.setCacheMode(cacheMode);
ccfgFrom.setEagerTtl(eagerTtl);
ccfgFrom.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY));
ccfgFrom.setIndexedTypes(Integer.class, Integer.class);

CacheConfiguration<Integer, Integer> ccfgTo = new CacheConfiguration<>("CACHE2");
ccfgTo.setAtomicityMode(cacheAtomicityMode);
ccfgTo.setCacheMode(cacheMode);
ccfgTo.setEagerTtl(eagerTtl);
ccfgTo.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 2_000)));
ccfgTo.setIndexedTypes(Integer.class, Integer.class);

final IgniteCache<Integer, Integer> cacheFrom = ignite.createCache(ccfgFrom);
final IgniteCache<Integer, Integer> cacheTo = ignite.createCache(ccfgTo);

for (int i = 0; i < 10; i++)
cacheFrom.put(i, i);

cacheFrom.query(new SqlFieldsQuery("INSERT INTO cache2.INTEGER(_key, _val) SELECT _key, _val FROM cache1.INTEGER;")).getAll();

assertEquals(10, cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().size());

assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().isEmpty();
}
}, 5_000, 1_000));
}
}

0 comments on commit 1c20202

Please sign in to comment.