Skip to content

Commit

Permalink
Merge pull request #26 from Bouncheck/replace-uninterruptible-wait
Browse files Browse the repository at this point in the history
Replace uninterruptible wait
  • Loading branch information
CodeLieutenant authored Oct 21, 2024
2 parents dec6aac + cf3140d commit 6d3313a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
23 changes: 21 additions & 2 deletions tools/stress/src/org/apache/cassandra/stress/StressAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ private StressMetrics run(OpDistributionFactory operations,

final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings);

final CountDownLatch anyFailed = new CountDownLatch(1);
final CountDownLatch releaseConsumers = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(threadCount);
final CountDownLatch start = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
{
consumers[i] = new Consumer(operations, isWarmup,
done, start, releaseConsumers, workManager, metrics, rateLimiter);
done, start, releaseConsumers, anyFailed, workManager, metrics, rateLimiter);
}

// starting worker threadCount
Expand Down Expand Up @@ -266,7 +267,16 @@ private StressMetrics run(OpDistributionFactory operations,

if (durationUnits != null)
{
Uninterruptibles.sleepUninterruptibly(duration, durationUnits);
try {
if(settings.errors.failFast) {
// I'm assuming Consumers don't finish successfully ahead of set duration
anyFailed.await(duration, durationUnits);
} else {
done.await(duration, durationUnits);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
workManager.stop();
}
else if (opCount <= 0)
Expand Down Expand Up @@ -402,13 +412,15 @@ public class Consumer extends Thread implements MeasurementSink
private final CountDownLatch done;
private final CountDownLatch start;
private final CountDownLatch releaseConsumers;
private final CountDownLatch anyFailed;
public final Queue<OpMeasurement> measurementsRecycling;
public final Queue<OpMeasurement> measurementsReporting;
public Consumer(OpDistributionFactory operations,
boolean isWarmup,
CountDownLatch done,
CountDownLatch start,
CountDownLatch releaseConsumers,
CountDownLatch anyFailed,
WorkManager workManager,
StressMetrics metrics,
UniformRateLimiter rateLimiter)
Expand All @@ -417,6 +429,7 @@ public Consumer(OpDistributionFactory operations,
this.done = done;
this.start = start;
this.releaseConsumers = releaseConsumers;
this.anyFailed = anyFailed;
this.metrics = metrics;
this.opStream = new StreamOfOperations(opDistribution, rateLimiter, workManager);
this.measurementsRecycling = new SpscArrayQueue<OpMeasurement>(8*1024);
Expand Down Expand Up @@ -462,6 +475,10 @@ public void run()

while (true)
{
if (settings.errors.failFast && anyFailed.getCount() == 0) {
success = false;
break;
}
// Assumption: All ops are thread local, operations are never shared across threads.
Operation op = opStream.nextOp();
if (op == null)
Expand Down Expand Up @@ -493,6 +510,7 @@ public void run()
output.printException(e);

success = false;
anyFailed.countDown();
opStream.abort();
metrics.cancel();
return;
Expand All @@ -503,6 +521,7 @@ public void run()
{
System.err.println(e.getMessage());
success = false;
anyFailed.countDown();
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SettingsErrors implements Serializable
{

public final boolean ignore;
public final boolean failFast;
public final int tries;
public final boolean skipReadValidation;
public final boolean skipUnsupportedColumns;
Expand All @@ -51,6 +52,7 @@ private enum DelayPolicy {
public SettingsErrors(Options options)
{
ignore = options.ignore.setByUser();
failFast = options.failFast.setByUser();
this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1);
skipReadValidation = options.skipReadValidation.setByUser();
skipUnsupportedColumns = options.skipUnsupportedColumns.setByUser();
Expand Down Expand Up @@ -93,6 +95,7 @@ public static final class Options extends GroupedOptions
{
final OptionSimple retries = new OptionSimple("retries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false);
final OptionSimple ignore = new OptionSimple("ignore", "", null, "Do not fail on errors", false);
final OptionSimple failFast = new OptionSimple("fail-fast", "", null, "Fail on first thread failure when running for set <duration>", false);
final OptionSimple skipReadValidation = new OptionSimple("skip-read-validation", "", null, "Skip read validation and message output", false);
final OptionSimple skipUnsupportedColumns = new OptionSimple("skip-unsupported-columns", "", null, "Skip unsupported columns, such as maps and embedded collections, when generating data for a user profile.", false);

Expand All @@ -103,7 +106,7 @@ public static final class Options extends GroupedOptions
@Override
public List<? extends Option> options()
{
return Arrays.asList(retries, ignore, skipReadValidation, skipUnsupportedColumns,
return Arrays.asList(retries, ignore, failFast, skipReadValidation, skipUnsupportedColumns,
delayPolicy, minDelayMs, maxDelayMs);
}

Expand All @@ -128,6 +131,7 @@ public boolean happy() {
public void printSettings(ResultLogger out)
{
out.printf(" Ignore: %b%n", ignore);
out.printf(" Fail fast setting: %b%n", failFast);
out.printf(" Tries: %d%n", tries);
if (delayPolicy == DelayPolicy.CONSTANT && minDelayMs == 0) {
return;
Expand Down

0 comments on commit 6d3313a

Please sign in to comment.