From 1c2020212fb8c28a4ecac8a78ddc04bce2a66469 Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Wed, 4 Oct 2023 12:25:55 +0300 Subject: [PATCH] IGNITE-20546 Fix transactions expiration set for reinsert (#10972) --- .../dht/GridDhtTxPrepareFuture.java | 2 +- .../cache/CacheQueryFilterExpiredTest.java | 106 +++++++++++++++--- 2 files changed, 93 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 4f8cbc9aabf8f..12d2c3114d004 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -380,7 +380,7 @@ private void onEntriesLocked() { cctx.database().checkpointReadLock(); try { - if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) && + if ((txEntry.op() == CREATE || txEntry.op() == UPDATE || txEntry.op() == TRANSFORM) && txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { if (expiry != null) { cached.unswap(true); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryFilterExpiredTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryFilterExpiredTest.java index 9edf9c35561af..6913fe66477b7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryFilterExpiredTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryFilterExpiredTest.java @@ -17,11 +17,17 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.expiry.TouchedExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.configuration.CacheConfiguration; @@ -29,39 +35,78 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; -/** - * - */ +/** */ +@RunWith(Parameterized.class) public class CacheQueryFilterExpiredTest extends GridCommonAbstractTest { + /** */ + @Parameterized.Parameter + public CacheMode cacheMode; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode cacheAtomicityMode; + + /** */ + @Parameterized.Parameter(2) + public boolean eagerTtl; + + /** */ + @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}, eagerTtl={2}") + public static List params() { + List params = new ArrayList<>(); + + Stream.of(REPLICATED, PARTITIONED).forEach(cacheMode -> + Stream.of(ATOMIC, TRANSACTIONAL).forEach(cacheAtomicityMode -> + Stream.of(false, true).forEach(eagerTtl -> + params.add(new Object[] {cacheMode, cacheAtomicityMode, eagerTtl}) + ) + ) + ); + + return params; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + /** * @throws Exception If failed. */ @Test public void testFilterExpired() throws Exception { - try (Ignite ignite = startGrid(0)) { - checkFilterExpired(ignite, ATOMIC, false); - - checkFilterExpired(ignite, ATOMIC, true); - - checkFilterExpired(ignite, TRANSACTIONAL, false); + try (Ignite ignite = startGrids(2)) { + checkFilterExpired(ignite); + } + } - checkFilterExpired(ignite, TRANSACTIONAL, true); + /** + * @throws Exception If failed. + */ + @Test + public void testInsertExpired() throws Exception { + try (Ignite ignite = startGrids(2)) { + checkInsertExpired(ignite); } } /** * @param ignite Node. - * @param atomicityMode Cache atomicity mode. - * @param eagerTtl Value for {@link CacheConfiguration#setEagerTtl(boolean)}. * @throws Exception If failed. */ - private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode, boolean eagerTtl) throws Exception { + private void checkFilterExpired(Ignite ignite) throws Exception { CacheConfiguration ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); - ccfg.setAtomicityMode(atomicityMode); + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setCacheMode(cacheMode); ccfg.setEagerTtl(eagerTtl); ccfg.setIndexedTypes(Integer.class, Integer.class); @@ -94,4 +139,37 @@ private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode, ignite.destroyCache(ccfg.getName()); } } + + /** */ + private void checkInsertExpired(Ignite ignite) throws Exception { + CacheConfiguration ccfgFrom = new CacheConfiguration<>("CACHE1"); + ccfgFrom.setAtomicityMode(cacheAtomicityMode); + ccfgFrom.setCacheMode(cacheMode); + ccfgFrom.setEagerTtl(eagerTtl); + ccfgFrom.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY)); + ccfgFrom.setIndexedTypes(Integer.class, Integer.class); + + CacheConfiguration ccfgTo = new CacheConfiguration<>("CACHE2"); + ccfgTo.setAtomicityMode(cacheAtomicityMode); + ccfgTo.setCacheMode(cacheMode); + ccfgTo.setEagerTtl(eagerTtl); + ccfgTo.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 2_000))); + ccfgTo.setIndexedTypes(Integer.class, Integer.class); + + final IgniteCache cacheFrom = ignite.createCache(ccfgFrom); + final IgniteCache cacheTo = ignite.createCache(ccfgTo); + + for (int i = 0; i < 10; i++) + cacheFrom.put(i, i); + + cacheFrom.query(new SqlFieldsQuery("INSERT INTO cache2.INTEGER(_key, _val) SELECT _key, _val FROM cache1.INTEGER;")).getAll(); + + assertEquals(10, cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().size()); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().isEmpty(); + } + }, 5_000, 1_000)); + } }