Skip to content

Commit

Permalink
IGNITE-10516 Add messages check to test
Browse files Browse the repository at this point in the history
  • Loading branch information
shishkovilja committed Oct 10, 2023
1 parent 70b82f2 commit 0c5a48c
Showing 1 changed file with 138 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,39 @@

package org.apache.ignite.internal.processors.query.schema;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.schema.management.IndexDescriptor;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.Level;
import org.junit.Test;

import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
import static org.apache.ignite.internal.processors.query.schema.IndexWithSameNameTestBase.ListeningTcpDiscoverySpi.discoSpi;

/** */
public abstract class IndexWithSameNameTestBase extends GridCommonAbstractTest {
Expand All @@ -57,7 +71,16 @@ public abstract class IndexWithSameNameTestBase extends GridCommonAbstractTest {

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

ListeningTcpDiscoverySpi discoSpi = new ListeningTcpDiscoverySpi(SchemaFinishDiscoveryMessage.class);
discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder());

TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
commSpi.record(SchemaOperationStatusMessage.class);

return cfg.setDiscoverySpi(discoSpi)
.setCommunicationSpi(commSpi)
.setSqlConfiguration(new SqlConfiguration()
.setQueryEnginesConfiguration(getEngineConfiguration()))
.setDataStorageConfiguration(new DataStorageConfiguration()
Expand All @@ -75,59 +98,143 @@ public abstract class IndexWithSameNameTestBase extends GridCommonAbstractTest {
*/
@Test
public void testRestart() throws Exception {
int serversCnt = 3;
int srvCnt = 3;

startGrids(serversCnt).cluster().state(ClusterState.ACTIVE);

resetLog4j(Level.DEBUG, false, "org.apache.ignite.internal.processors.query.GridQueryProcessor");
startGrids(srvCnt).cluster().state(ClusterState.ACTIVE);

GridQueryProcessor qryProc = grid(0).context().query();

qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T1 (k1 INT PRIMARY KEY, v1 INT)"), true).getAll();
qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T2 (k2 INT PRIMARY KEY, v2 INT)"), true).getAll();

String testIdxName = "TEST_IDX";

qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T1 (k1 INT PRIMARY KEY, v1 INT)"), true).getAll();
// This index will be created.
qryProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS " + testIdxName + " ON T1 (k1, v1)"),
true).getAll();

qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T2 (k2 INT PRIMARY KEY, v2 INT)"), true).getAll();
checkNoOpMessages(srvCnt, false);

// This index must not be created. Query must be no-op.
qryProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS " + testIdxName + " ON T2 (k2, v2)"),
true).getAll();

for (int i = 0; i < serversCnt; i++)
checkIndex(i, testIdxName, F.asList("K1", "V1"));
checkNoOpMessages(srvCnt, true);

checkIndex(srvCnt, testIdxName, F.asList("K1", "V1"));

grid(0).cluster().state(ClusterState.INACTIVE);
awaitPartitionMapExchange();

stopAllGrids();

startGrids(serversCnt);
startGrids(srvCnt);

for (int i = 0; i < serversCnt; i++)
checkIndex(i, testIdxName, F.asList("K1", "V1"));
checkIndex(srvCnt, testIdxName, F.asList("K1", "V1"));
}

/**
* @param nodeIdx Node index.
* @param srvCnt Servers count.
* @param nop Expected no-op flag.
*/
private void checkNoOpMessages(int srvCnt, boolean nop) {
for (int i = 0; i < srvCnt; i++) {
assertTrue(spi(grid(i))
.recordedMessages(false)
.stream()
.allMatch(msg -> ((SchemaOperationStatusMessage)msg).nop() == nop));

assertTrue(discoSpi(grid(i))
.recordedMessages()
.stream()
.allMatch(msg -> ((SchemaFinishDiscoveryMessage)msg).nop() == nop));
}
}

/**
* @param srvCnt Servers count.
* @param idxName Index name.
* @param expIdxFields Expected index fields.
*/
private void checkIndex(int nodeIdx, String idxName, Collection<String> expIdxFields) {
Collection<IndexDescriptor> indexes = grid(nodeIdx).context().query().schemaManager().allIndexes();

List<IndexDescriptor> filteredIdxs = indexes.stream()
.filter(idx -> idxName.equalsIgnoreCase(idx.name()))
.collect(Collectors.toList());

assertEquals("There should be only one index", 1, filteredIdxs.size());

Set<String> actualFields = filteredIdxs.get(0)
.keyDefinitions()
.keySet()
.stream()
.filter(f -> !KEY_FIELD_NAME.equalsIgnoreCase(f))
.collect(Collectors.toSet());
private void checkIndex(int srvCnt, String idxName, Collection<String> expIdxFields) {
for (int i = 0; i < srvCnt; i++) {
Collection<IndexDescriptor> indexes = grid(i).context()
.query()
.schemaManager()
.allIndexes();

List<IndexDescriptor> filteredIdxs = indexes.stream()
.filter(idx -> idxName.equalsIgnoreCase(idx.name()))
.collect(Collectors.toList());

assertEquals("There should be only one index", 1, filteredIdxs.size());

Set<String> actualFields = filteredIdxs.get(0)
.keyDefinitions()
.keySet()
.stream()
.filter(f -> !KEY_FIELD_NAME.equalsIgnoreCase(f))
.collect(Collectors.toSet());

assertEqualsCollectionsIgnoringOrder(expIdxFields, actualFields);
}
}

assertEqualsCollectionsIgnoringOrder(expIdxFields, actualFields);
/**
*
*/
public static class ListeningTcpDiscoverySpi extends TcpDiscoverySpi {
/** Filtered messages. */
private final List<DiscoveryCustomMessage> recordedMsgs = new CopyOnWriteArrayList<>();

/** Filtered message types. */
private final Set<Class<?>> recordedMsgTypes;

/**
* @param msgTypes Message types.
*/
private ListeningTcpDiscoverySpi(Class<? extends DiscoveryCustomMessage>... msgTypes) {
recordedMsgTypes = F.asSet(msgTypes);
}

/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoverySpiCustomMessage spiCustomMsg = ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.resolveClassLoader(ignite().configuration()));

DiscoveryCustomMessage discoCustomMsg = ((CustomMessageWrapper)spiCustomMsg).delegate();

if (recordedMsgTypes.contains(discoCustomMsg.getClass())) {
synchronized (this) {
recordedMsgs.add(discoCustomMsg);
}
}
}
catch (Throwable e) {
log.error("Unexpected error", e);

fail(e.getMessage());
}
}
}

/**
* @param ignite Ignite.
*/
public static ListeningTcpDiscoverySpi discoSpi(Ignite ignite) {
return (ListeningTcpDiscoverySpi)ignite.configuration().getDiscoverySpi();
}

/**
*
*/
public synchronized List<DiscoveryCustomMessage> recordedMessages() {
List<DiscoveryCustomMessage> recordedMsgs0 = new ArrayList<>(recordedMsgs);

recordedMsgs.clear();

return recordedMsgs0;
}
}
}

0 comments on commit 0c5a48c

Please sign in to comment.