Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-10516 Fix duplicate index error after CREATE INDEX IF NOT EXISTS on different tables #10940

Merged
merged 10 commits into from
Oct 31, 2023
4 changes: 2 additions & 2 deletions docs/_docs/sql-reference/ddl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ index_option := {INLINE_SIZE size | PARALLEL parallelism_level}

Parameters:

* `indexName` - the name of the index to be created.
* `indexName` - the name of the index to be created. The index name must be unique per schema.
* `ASC` - specifies ascending sort order (default).
* `DESC` - specifies descending sort order.
* `SPATIAL` - create the spatial index. Presently, only geometry types are supported.
* `IF NOT EXISTS` - do not throw an error if an index with the same name already exists. The database checks indexes' names only, and does not consider columns types or count.
* `IF NOT EXISTS` - do not throw an error if an index with the same name already exists. The database checks indexes' names only, and does not consider columns types or count. The index creation will be skipped if an index with the same name exist in the schema.
* `index_option` - additional options for index creation:
** `INLINE_SIZE` - specifies index inline size in bytes. Depending on the size, Ignite will place the whole indexed value or a part of it directly into index pages, thus omitting extra calls to data pages and increasing queries' performance. Index inlining is enabled by default and the size is pre-calculated automatically based on the table structure. To disable inlining, set the size to 0 (not recommended). Refer to the link:SQL/sql-tuning#increasing-index-inline-size[Increasing Index Inline Size] section for more details.
** `PARALLEL` - specifies the number of threads to be used in parallel for index creation. The greater number is set, the faster the index is created and built. If the value exceeds the number of CPUs, then it will be decreased to the number of cores. If the parameter is not specified, then the number of threads is calculated as 25% of the CPU cores available.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite;

import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.internal.processors.query.schema.IndexWithSameNameTestBase;

/** */
public class IndexWithSameNameCalciteTest extends IndexWithSameNameTestBase {
/** {@inheritDoc} */
@Override protected QueryEngineConfiguration getEngineConfiguration() {
return new CalciteQueryEngineConfiguration().setDefault(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
import org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest;
import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
import org.apache.ignite.internal.processors.query.calcite.UnstableTopologyTest;
import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
Expand Down Expand Up @@ -129,6 +130,7 @@
TimeoutIntegrationTest.class,
PartitionPruneTest.class,
JoinRehashIntegrationTest.class,
IndexWithSameNameCalciteTest.class,
})
public class IntegrationTestSuite {
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
import org.apache.ignite.internal.processors.security.impl.TestSecurityData;
import org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider;
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable;
import org.apache.ignite.internal.util.lang.gridfunc.RunnableWrapperClosure;
Expand Down Expand Up @@ -776,12 +777,6 @@ private interface SupplierX<T> {
T get() throws Exception;
}

/** */
private interface ConsumerX<T> {
/** */
void accept(T t) throws Exception;
}

/** */
private interface BiConsumerX<T, U> {
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
assert proposeMsg != null;

// Apply changes to public cache schema if operation is successful and original cache is still there.
if (!msg.hasError()) {
if (!msg.hasError() && !msg.nop()) {
DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().cacheName());

if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) {
Expand Down Expand Up @@ -1889,9 +1889,9 @@ else if (op instanceof SchemaAlterTableDropColumnOperation) {
* @param op Operation.
* @param err Error (if any).
*/
public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err) {
public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err, boolean nop) {
synchronized (stateMux) {
SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err);
SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err, nop);

try {
ctx.discovery().sendCustomEvent(msg);
Expand Down Expand Up @@ -3865,16 +3865,17 @@ public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable Grid
* @param destNodeId Destination node ID.
* @param opId Operation ID.
* @param err Error.
* @param nop No-op flag.
*/
public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err) {
public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err, boolean nop) {
if (log.isDebugEnabled())
log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

try {
byte[] errBytes = marshalSchemaError(opId, err);

SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes);
SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes, nop);

// Messages must go to dedicated schema pool. We cannot push them to query pool because in this case
// they could be blocked with other query requests.
Expand All @@ -3883,7 +3884,7 @@ public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationExcepti
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send schema status response [opId=" + opId + ", destNodeId=" + destNodeId +
", err=" + e + ']');
", err=" + e + ", nop=" + nop + ']');
}
}

Expand Down Expand Up @@ -3915,7 +3916,7 @@ private void processStatusMessage(SchemaOperationStatusMessage msg) {
log.debug("Received status message [opId=" + msg.operationId() +
", sndNodeId=" + msg.senderNodeId() + ']');

op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());

return;
}
Expand Down Expand Up @@ -3945,7 +3946,7 @@ private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) {
SchemaOperationStatusMessage msg = it.next();

if (F.eq(msg.operationId(), opId)) {
mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()));
mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());

it.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;

Expand All @@ -55,7 +56,7 @@ public class SchemaOperationManager {
private Collection<UUID> nodeIds;

/** Node results. */
private Map<UUID, SchemaOperationException> nodeRess;
private Map<UUID, T2<SchemaOperationException, Boolean>> nodeRess;

/** Current coordinator node. */
private ClusterNode crd;
Expand Down Expand Up @@ -132,9 +133,9 @@ private void onLocalNodeFinished(IgniteInternalFuture fut) {

synchronized (mux) {
if (isLocalCoordinator())
onNodeFinished(ctx.localNodeId(), err);
onNodeFinished(ctx.localNodeId(), err, worker.nop());
else
qryProc.sendStatusMessage(crd.id(), operationId(), err);
qryProc.sendStatusMessage(crd.id(), operationId(), err, worker.nop());
}
}

Expand All @@ -144,28 +145,29 @@ private void onLocalNodeFinished(IgniteInternalFuture fut) {
* @param nodeId Node ID.
* @param err Error.
*/
public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err) {
public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err, boolean nop) {
synchronized (mux) {
assert isLocalCoordinator();

if (nodeRess.containsKey(nodeId)) {
if (log.isDebugEnabled())
log.debug("Received duplicate result [opId=" + operationId() + ", nodeId=" + nodeId +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

return;
}

if (nodeIds.contains(nodeId)) {
if (log.isDebugEnabled())
log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err +
", nop=" + nop + ']');

nodeRess.put(nodeId, err);
nodeRess.put(nodeId, new T2<>(err, nop));
}
else {
if (log.isDebugEnabled())
log.debug("Received result from non-tracked node (joined after operation started, will ignore) " +
"[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
"[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ", nop=" + nop + ']');
}

checkFinished();
Expand Down Expand Up @@ -217,22 +219,27 @@ private void checkFinished() {
if (nodeIds.size() == nodeRess.size()) {
// Initiate finish request.
SchemaOperationException err = null;
boolean nop = false;

for (Map.Entry<UUID, SchemaOperationException> nodeRes : nodeRess.entrySet()) {
if (nodeRes.getValue() != null) {
err = nodeRes.getValue();
for (Map.Entry<UUID, T2<SchemaOperationException, Boolean>> nodeRes : nodeRess.entrySet()) {
err = nodeRes.getValue().get1();

if (err != null)
break;
}

nop |= nodeRes.getValue().get2();
}

if (log.isDebugEnabled())
log.debug("Collected all results, about to send finish message [opId=" + operationId() +
", err=" + err + ']');
", err=" + err + ", nop=" + nop + ']');

// In case of no-op operation results from all nodes must be the same.
assert err != null || !nop || nodeRess.entrySet().stream().allMatch(e -> e.getValue().get2()) : nodeRess;

crdFinished = true;

qryProc.onCoordinatorFinished(worker.operation(), err);
qryProc.onCoordinatorFinished(worker.operation(), err, nop);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
/** Original propose message. */
private transient SchemaProposeDiscoveryMessage proposeMsg;

/** No-op flag. */
private final boolean nop;

/**
* Constructor.
*
* @param op Original operation.
* @param err Error.
* @param nop No-op flag.
*/
public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err) {
public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err, boolean nop) {
super(op);

this.err = err;
this.nop = nop;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -91,6 +96,13 @@ public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
this.proposeMsg = proposeMsg;
}

/**
* @return <code>True</code> if message in no-op.
*/
public boolean nop() {
return nop;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class SchemaOperationStatusMessage implements Message {
@GridDirectTransient
private UUID sndNodeId;

/** No-op flag. */
private boolean nop;

/**
* Default constructor.
*/
Expand All @@ -57,10 +60,12 @@ public SchemaOperationStatusMessage() {
*
* @param opId Operation ID.
* @param errBytes Error bytes.
* @param nop No-op flag.
*/
public SchemaOperationStatusMessage(UUID opId, byte[] errBytes) {
public SchemaOperationStatusMessage(UUID opId, byte[] errBytes, boolean nop) {
this.opId = opId;
this.errBytes = errBytes;
this.nop = nop;
}

/**
Expand Down Expand Up @@ -114,6 +119,12 @@ public void senderNodeId(UUID sndNodeId) {
return false;

writer.incrementState();

case 2:
if (!writer.writeBoolean("nop", nop))
return false;

writer.incrementState();
}

return true;
Expand All @@ -138,6 +149,14 @@ public void senderNodeId(UUID sndNodeId) {
case 1:
errBytes = reader.readByteArray("errBytes");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 2:
nop = reader.readBoolean("nop");

if (!reader.isLastRead())
return false;

Expand All @@ -147,14 +166,21 @@ public void senderNodeId(UUID sndNodeId) {
return reader.afterMessageRead(SchemaOperationStatusMessage.class);
}

/**
* @return <code>True</code> if message is no-op.
*/
public boolean nop() {
return nop;
}

/** {@inheritDoc} */
@Override public short directType() {
return -53;
}

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 2;
return 3;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -374,12 +375,6 @@ static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
}
}

/** */
private interface ConsumerX<T> {
/** */
void accept(T t) throws Exception;
}

/** */
private static class SimpleTask extends ComputeTaskAdapter<Void, Void> {
/** */
Expand Down
Loading
Loading