Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix duplicate index error after CREATE INDEX IF NOT EXISTS on different tables
  • Loading branch information
shishkovilja committed Sep 19, 2023
1 parent 0080ae9 commit 2c78b2e
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.schema.management.IndexDescriptor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.Level;
import org.junit.Test;

import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;

/** */
public class IndexWithSameNameTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

cleanPersistenceDir();
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();

super.afterTest();
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setSqlConfiguration(new SqlConfiguration()
.setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration().setDefault(true)))
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)));
}

/**
*
*/
@Test
public void testRestart() throws Exception {
int serversCnt = 3;

startGrids(serversCnt).cluster().state(ClusterState.ACTIVE);

resetLog4j(Level.DEBUG, false, "org.apache.ignite.internal.processors.query.GridQueryProcessor");

GridQueryProcessor qryProc = grid(0).context().query();

String testIdxName = "TEST_IDX";

qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T1 (k1 INT PRIMARY KEY, v1 INT)"), true).getAll();
qryProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS " + testIdxName + " ON T1 (k1, v1)"),
true).getAll();

qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE T2 (k2 INT PRIMARY KEY, v2 INT)"), true).getAll();
qryProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS " + testIdxName + " ON T2 (k2, v2)"),
true).getAll();

for (int i = 0; i < serversCnt; i++)
checkIndex(i, testIdxName, F.asList("K1", "V1"));

grid(0).cluster().state(ClusterState.INACTIVE);
awaitPartitionMapExchange();

stopAllGrids();

startGrids(serversCnt);

for (int i = 0; i < serversCnt; i++)
checkIndex(i, testIdxName, F.asList("K1", "V1"));
}

/**
* @param nodeIdx Node index.
* @param idxName Index name.
* @param expIdxFields Expected index fields.
*/
private void checkIndex(int nodeIdx, String idxName, Collection<String> expIdxFields) {
Collection<IndexDescriptor> indexes = grid(nodeIdx).context().query().schemaManager().allIndexes();

List<IndexDescriptor> filteredIdxs = indexes.stream()
.filter(idx -> idxName.equalsIgnoreCase(idx.name()))
.collect(Collectors.toList());

assertEquals("There should be only one index", 1, filteredIdxs.size());

Set<String> actualFields = filteredIdxs.get(0)
.keyDefinitions()
.keySet()
.stream()
.filter(f -> !KEY_FIELD_NAME.equalsIgnoreCase(f))
.collect(Collectors.toSet());

assertEqualsCollectionsIgnoringOrder(expIdxFields, actualFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
assert proposeMsg != null;

// Apply changes to public cache schema if operation is successful and original cache is still there.
if (!msg.hasError()) {
if (!msg.hasError() && !msg.nop()) {
DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().cacheName());

if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) {
Expand Down Expand Up @@ -1889,9 +1889,9 @@ else if (op instanceof SchemaAlterTableDropColumnOperation) {
* @param op Operation.
* @param err Error (if any).
*/
public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err) {
public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err, boolean nop) {
synchronized (stateMux) {
SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err);
SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err, nop);

try {
ctx.discovery().sendCustomEvent(msg);
Expand Down Expand Up @@ -3865,16 +3865,17 @@ public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable Grid
* @param destNodeId Destination node ID.
* @param opId Operation ID.
* @param err Error.
* @param nop No-op flag.
*/
public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err) {
public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err, boolean nop) {
if (log.isDebugEnabled())
log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

try {
byte[] errBytes = marshalSchemaError(opId, err);

SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes);
SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes, nop);

// Messages must go to dedicated schema pool. We cannot push them to query pool because in this case
// they could be blocked with other query requests.
Expand All @@ -3883,7 +3884,7 @@ public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationExcepti
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send schema status response [opId=" + opId + ", destNodeId=" + destNodeId +
", err=" + e + ']');
", err=" + e + ", nop=" + nop + ']');
}
}

Expand Down Expand Up @@ -3915,7 +3916,7 @@ private void processStatusMessage(SchemaOperationStatusMessage msg) {
log.debug("Received status message [opId=" + msg.operationId() +
", sndNodeId=" + msg.senderNodeId() + ']');

op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());

return;
}
Expand Down Expand Up @@ -3945,7 +3946,7 @@ private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) {
SchemaOperationStatusMessage msg = it.next();

if (F.eq(msg.operationId(), opId)) {
mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());

it.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;

Expand All @@ -55,7 +56,7 @@ public class SchemaOperationManager {
private Collection<UUID> nodeIds;

/** Node results. */
private Map<UUID, SchemaOperationException> nodeRess;
private Map<UUID, IgniteBiTuple<SchemaOperationException, Boolean>> nodeRess;

/** Current coordinator node. */
private ClusterNode crd;
Expand Down Expand Up @@ -132,9 +133,9 @@ private void onLocalNodeFinished(IgniteInternalFuture fut) {

synchronized (mux) {
if (isLocalCoordinator())
onNodeFinished(ctx.localNodeId(), err);
onNodeFinished(ctx.localNodeId(), err, worker.nop());
else
qryProc.sendStatusMessage(crd.id(), operationId(), err);
qryProc.sendStatusMessage(crd.id(), operationId(), err, worker.nop());
}
}

Expand All @@ -144,28 +145,29 @@ private void onLocalNodeFinished(IgniteInternalFuture fut) {
* @param nodeId Node ID.
* @param err Error.
*/
public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err) {
public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err, boolean nop) {
synchronized (mux) {
assert isLocalCoordinator();

if (nodeRess.containsKey(nodeId)) {
if (log.isDebugEnabled())
log.debug("Received duplicate result [opId=" + operationId() + ", nodeId=" + nodeId +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

return;
}

if (nodeIds.contains(nodeId)) {
if (log.isDebugEnabled())
log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err +
", nop=" + nop + ']');

nodeRess.put(nodeId, err);
nodeRess.put(nodeId, F.t(err, nop));
}
else {
if (log.isDebugEnabled())
log.debug("Received result from non-tracked node (joined after operation started, will ignore) " +
"[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
"[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ", nop=" + nop + ']');
}

checkFinished();
Expand Down Expand Up @@ -217,22 +219,24 @@ private void checkFinished() {
if (nodeIds.size() == nodeRess.size()) {
// Initiate finish request.
SchemaOperationException err = null;
boolean nop = false;

for (Map.Entry<UUID, SchemaOperationException> nodeRes : nodeRess.entrySet()) {
if (nodeRes.getValue() != null) {
err = nodeRes.getValue();
for (Map.Entry<UUID, IgniteBiTuple<SchemaOperationException, Boolean>> nodeRes : nodeRess.entrySet()) {
err = nodeRes.getValue().get1();

if (err != null)
break;
}

nop |= nodeRes.getValue().get2();
}

if (log.isDebugEnabled())
log.debug("Collected all results, about to send finish message [opId=" + operationId() +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

crdFinished = true;

qryProc.onCoordinatorFinished(worker.operation(), err);
qryProc.onCoordinatorFinished(worker.operation(), err, nop);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
/** Original propose message. */
private transient SchemaProposeDiscoveryMessage proposeMsg;

/** No-op flag. */
private final boolean nop;

/**
* Constructor.
*
* @param op Original operation.
* @param err Error.
* @param nop No-op flag.
*/
public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err) {
public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err, boolean nop) {
super(op);

this.err = err;
this.nop = nop;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -91,6 +96,14 @@ public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
this.proposeMsg = proposeMsg;
}


/**
* @return <code>True</code> if message in no-op.
*/
public boolean nop() {
return nop;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
Expand Down
Loading

0 comments on commit 2c78b2e

Please sign in to comment.