Skip to content

Commit

Permalink
Merge branch 'cassandra-4.0' into cassandra-4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dcapwell committed May 1, 2024
2 parents 428fa1f + c14abb4 commit 7c79d91
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Do not go to disk for reading hints file sizes (CASSANDRA-19477)
* Fix system_views.settings to handle array types (CASSANDRA-19475)
Merged from 4.0:
* IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
* Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
* Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
* Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public GroupedSSTableContainer createGroupedSSTableContainer()
return new GroupedSSTableContainer(this);
}

/**
 * Adds a single sstable to this holder.
 * NOTE(review): how the sstable is routed is up to each subclass; confirm against the overrides.
 */
public abstract void addSSTable(SSTableReader sstable);
/**
 * Adds the sstables carried by the given grouped container.
 * NOTE(review): the grouping semantics are defined by GroupedSSTableContainer, not visible here.
 */
public abstract void addSSTables(GroupedSSTableContainer sstables);

/**
 * Removes the sstables carried by the given grouped container.
 * NOTE(review): presumably the inverse of addSSTables; confirm against the overrides.
 */
public abstract void removeSSTables(GroupedSSTableContainer sstables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableCont
return tasks;
}

/**
 * Adds a single sstable by delegating to whichever strategy {@code getStrategyFor}
 * selects for it.
 */
@Override
public void addSSTable(SSTableReader sstable)
{
getStrategyFor(sstable).addSSTable(sstable);
}

@Override
public void addSSTables(GroupedSSTableContainer sstables)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,20 @@ public boolean hasDataForPendingRepair(TimeUUID sessionID)
}
}

/**
 * Tells whether the given sstable is tracked under the given repair session by
 * either the pending-repair holder or the transient-repair holder.
 * The lookup is performed while holding the read lock.
 *
 * @param sessionID repair session to look up
 * @param sstable   sstable to look for within that session
 * @return true if either holder reports the sstable for the session
 */
@VisibleForTesting
public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable)
{
    readLock.lock();
    try
    {
        // check the pending holder first; only consult the transient one when it misses
        if (pendingRepairs.hasPendingRepairSSTable(sessionID, sstable))
            return true;
        return transientRepairs.hasPendingRepairSSTable(sessionID, sstable);
    }
    finally
    {
        readLock.unlock();
    }
}

public void shutdown()
{
writeLock.lock();
Expand Down Expand Up @@ -646,7 +660,7 @@ else if (i < a.length)
/**
 * Registers newly added sstables with compaction.
 *
 * Each sstable is handed to the holder returned by {@code getHolder}, exactly once,
 * so that the holder decides which strategy or manager ends up tracking it.
 *
 * @param added the sstables to register
 */
private void handleFlushNotification(Iterable<SSTableReader> added)
{
    for (SSTableReader sstable : added)
    {
        // route through the holder instead of calling the strategy directly;
        // the loop is braced so the call stays inside the iteration scope
        getHolder(sstable).addSSTable(sstable);
    }
}

private int getHolderIndex(SSTableReader sstable)
Expand Down Expand Up @@ -1185,6 +1199,8 @@ List<PendingRepairManager> getPendingRepairManagers()
*/
public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
{
if (sstables.isEmpty())
return;
Set<SSTableReader> changed = new HashSet<>();

writeLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableCont
return tasks;
}

/**
 * Adds a single sstable to the manager that the router selects for it.
 *
 * @param sstable the sstable to add; must be managed by this holder
 * @throws IllegalArgumentException if {@code managesSSTable(sstable)} is false
 */
@Override
public void addSSTable(SSTableReader sstable)
{
    Preconditions.checkArgument(managesSSTable(sstable), "Attempting to add sstable from wrong holder");
    // resolve the target manager only after the ownership check has passed
    int managerIndex = router.getIndexForSSTable(sstable);
    managers.get(managerIndex).addSSTable(sstable);
}

AbstractCompactionTask getNextRepairFinishedTask()
{
List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
Expand Down Expand Up @@ -281,4 +288,9 @@ public boolean containsSSTable(SSTableReader sstable)
{
return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
}

/**
 * Tells whether any of this holder's managers reports the given sstable
 * under the given repair session.
 *
 * @param sessionID repair session to look up
 * @param sstable   sstable to look for within that session
 * @return true as soon as one manager reports a match, false if none does
 */
public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable)
{
    for (PendingRepairManager manager : managers)
    {
        if (manager.hasPendingRepairSSTable(sessionID, sstable))
            return true;
    }
    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public boolean hasStrategy(AbstractCompactionStrategy strategy)

/**
 * Tells whether a strategy is registered for the given repair session.
 *
 * @param sessionID repair session to look up
 * @return true if {@code strategies} has an entry keyed by the session id
 */
public synchronized boolean hasDataForSession(TimeUUID sessionID)
{
    // single direct key lookup; no need to materialise the key set view first
    return strategies.containsKey(sessionID);
}

boolean containsSSTable(SSTableReader sstable)
Expand All @@ -482,6 +482,15 @@ public Collection<AbstractCompactionTask> createUserDefinedTasks(Collection<SSTa
return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList());
}

/**
 * Tells whether the strategy registered for the given repair session
 * currently contains the given sstable.
 *
 * @param sessionID repair session to look up
 * @param sstable   sstable to look for in that session's strategy
 * @return false when no strategy exists for the session, otherwise whether
 *         the strategy's sstables contain the given one
 */
@VisibleForTesting
public synchronized boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable)
{
    AbstractCompactionStrategy sessionStrategy = strategies.get(sessionID);
    return sessionStrategy != null && sessionStrategy.getSSTables().contains(sstable);
}

/**
* promotes/demotes sstables involved in a consistent repair that has been finalized, or failed
*/
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ void notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables)

public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
{
if (repairStatusesChanged.isEmpty())
return;
INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ public void doVerb(final Message<RepairMessage> message)
FailSession failure = (FailSession) message.payload;
ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure);
ParticipateState p = ActiveRepairService.instance.participate(failure.sessionID);
if (p != null)
p.phase.fail("Failure message from " + message.from());
break;

case STATUS_REQ:
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/repair/RepairRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.Sets;

import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.repair.state.ParticipateState;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -202,6 +203,9 @@ private void fail(String reason)
reason = error != null ? error.getMessage() : "Some repair failed";
}
state.phase.fail(reason);
ParticipateState p = ActiveRepairService.instance.participate(state.id);
if (p != null)
p.phase.fail(reason);
String completionMessage = String.format("Repair command #%d finished with error", state.cmd);

// Note we rely on the first message being the reason for the failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@
import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assert;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;

Expand Down Expand Up @@ -167,6 +173,34 @@ private static void validateExistingParentRepair(QueryResult rs, Consumer<Row> f
Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
}

/**
 * Asserts that no node in the cluster holds a live sstable of the given table that is
 * still marked with a pending repair id nobody accounts for.
 *
 * For every instance a compaction of the table is forced first, then each live sstable
 * is inspected. An sstable with a non-null {@code pendingRepair} is tolerated when the
 * local session for that id exists and is not completed, when the compaction strategy
 * manager still reports the sstable under that session, or when a re-read of the
 * metadata shows the pending repair has been cleared in the meantime.
 *
 * @param cluster the cluster whose instances are checked
 * @param ks      keyspace name
 * @param table   table name
 * @throws AssertionError naming the node and sstable descriptor when none of the above applies
 */
public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table)
{
cluster.forEach(i -> {
// node label used only in the failure message
String name = "node" + i.config().num();
i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking
i.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
{
TimeUUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
// sstables without a pending repair id cannot leak
if (pendingRepair == null)
continue;
LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
// repair may be async, so some participants may still consider the repair active, in which case the sstable SHOULD still link to it
if (session != null && !session.isCompleted())
continue;
// The session is complete (or unknown), yet the sstable is not updated... is it still pending in compaction?
if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable))
continue;
// compaction does not know about the pending repair... did the metadata change since this check started?
if (sstable.getSSTableMetadata().pendingRepair == null)
continue; // yep, race condition... ignore
throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor));
}
});
});
}

public enum RepairType {
FULL {
public String[] append(String... args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import static java.lang.String.format;
import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void simple() {
}

Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}

Expand Down Expand Up @@ -398,6 +400,7 @@ public void snapshotFailure()
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import static java.lang.String.format;
import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
Expand Down Expand Up @@ -125,6 +126,7 @@ public void neighbourDown()
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}

Expand Down Expand Up @@ -184,6 +186,7 @@ public void validationParticipentCrashesAndComesBack()
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}
}

0 comments on commit 7c79d91

Please sign in to comment.