Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run test by simulating #41

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions framework/tst/dslabs/framework/testing/ClientWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import dslabs.framework.Result;
import dslabs.framework.Timer;
import dslabs.framework.VizIgnore;
import dslabs.framework.testing.runner.SimulatedImpl;
import dslabs.framework.testing.utils.Cloning;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -39,15 +41,16 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;

import static org.apache.commons.lang3.math.NumberUtils.max;

@EqualsAndHashCode(of = {"client", "results"}, callSuper = false)
@ToString(of = {"client", "results"})
@EqualsAndHashCode(of = { "client", "results" }, callSuper = false)
@ToString(of = { "client", "results" })
public final class ClientWorker extends Node {

@Data
Expand All @@ -64,6 +67,17 @@ private static class InterRequestTimer implements Timer {
// Properties
// TODO: move this to Workload
@VizIgnore @Getter private final boolean recordCommandsAndResults;
// lambda cannot be fast-cloned
// @Setter private LongSupplier currentTimeMillis = (LongSupplier & Serializable) () -> System.currentTimeMillis();
@Setter private SimulatedImpl simulatedImpl;

long currentTimeMillis() {
if (simulatedImpl != null) {
return simulatedImpl.currentTimeMillis();
} else {
return System.currentTimeMillis();
}
}

// Mutable state
@VizIgnore private boolean initialized = false;
Expand All @@ -74,17 +88,15 @@ private static class InterRequestTimer implements Timer {
@VizIgnore private long lastSendTimeMillis;

// Resulting state
@Getter @VizIgnore private final List<Command> sentCommands =
new ArrayList<>();
@Getter @VizIgnore private final List<Command> sentCommands = new ArrayList<>();
@Getter private final List<Result> results = new ArrayList<>();
@Getter @VizIgnore private boolean resultsOk = true;
@Getter @VizIgnore private Pair<Result, Result> expectedAndReceived = null;
@VizIgnore private long maxWaitTimeMillis = 0;


public <C extends Node & Client> ClientWorker(@NonNull C client,
@NonNull Workload workload,
boolean recordCommandsAndResults) {
@NonNull Workload workload,
boolean recordCommandsAndResults) {
super(client.address());
this.client = client;
this.recordCommandsAndResults = recordCommandsAndResults;
Expand Down Expand Up @@ -121,7 +133,7 @@ public synchronized void addCommand(String command, String result) {
public synchronized long maxWaitTimeMilis() {
if (waitingOnResult) {
return max(maxWaitTimeMillis,
System.currentTimeMillis() - lastSendTimeMillis);
currentTimeMillis() - lastSendTimeMillis);
}
return maxWaitTimeMillis;
}
Expand Down Expand Up @@ -149,14 +161,13 @@ private void sendNextCommandWhilePossible() {
}

maxWaitTimeMillis = max(maxWaitTimeMillis,
System.currentTimeMillis() - lastSendTimeMillis);
currentTimeMillis() - lastSendTimeMillis);

if (workload.hasResults() &&
!Objects.equals(expectedResult, result)) {
resultsOk = false;
if (expectedAndReceived == null) {
expectedAndReceived =
new ImmutablePair<>(expectedResult, result);
expectedAndReceived = new ImmutablePair<>(expectedResult, result);
}
}

Expand Down Expand Up @@ -193,8 +204,7 @@ private void sendNextCommandWhilePossible() {

private void sendNextCommand() {
if (workload.hasResults()) {
Pair<Command, Result> commandAndResult =
workload.nextCommandAndResult(clientNode().address());
Pair<Command, Result> commandAndResult = workload.nextCommandAndResult(clientNode().address());
lastCommand = commandAndResult.getLeft();
expectedResult = commandAndResult.getRight();
client.sendCommand(lastCommand);
Expand All @@ -205,7 +215,7 @@ private void sendNextCommand() {

waitingToSend = false;
waitingOnResult = true;
lastSendTimeMillis = System.currentTimeMillis();
lastSendTimeMillis = currentTimeMillis();
}

public synchronized boolean done() {
Expand All @@ -220,11 +230,10 @@ public synchronized void waitUntilDone() throws InterruptedException {

public synchronized void waitUntilDone(long timeoutMillis)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long startTime = currentTimeMillis();

while (!done()) {
long timeLeft =
timeoutMillis - (System.currentTimeMillis() - startTime);
long timeLeft = timeoutMillis - (currentTimeMillis() - startTime);
if (timeLeft > 0) {
wait(timeLeft);
} else {
Expand All @@ -246,8 +255,8 @@ public final synchronized void init() {

@Override
public final synchronized void handleMessage(Message message,
Address sender,
Address destination) {
Address sender,
Address destination) {
clientNode().handleMessage(message, sender, destination);
sendNextCommandWhilePossible();
}
Expand Down
9 changes: 5 additions & 4 deletions framework/tst/dslabs/framework/testing/TimerEnvelope.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import dslabs.framework.Address;
import dslabs.framework.Timer;
import dslabs.framework.VizIgnore;
import dslabs.framework.testing.utils.GlobalSettings;

import java.io.Serializable;
import java.util.Random;
import lombok.Data;
Expand All @@ -37,11 +39,10 @@
* Note: this class has a natural ordering that is inconsistent with equals.
*/
@Data
@EqualsAndHashCode(
of = {"to", "timer", "minTimerLengthMillis", "maxTimerLengthMillis"})
@EqualsAndHashCode(of = { "to", "timer", "minTimerLengthMillis", "maxTimerLengthMillis" })
public final class TimerEnvelope
implements Serializable, Comparable<TimerEnvelope> {
private static final Random rand = new Random();
private static final Random rand = new Random(GlobalSettings.rand().nextLong());

private final Address to;
private final Timer timer;
Expand All @@ -52,7 +53,7 @@ public final class TimerEnvelope
@VizIgnore private final long startTimeNanos;

public TimerEnvelope(Address to, Timer timer, int minTimerLengthMillis,
int maxTimerLengthMillis) {
int maxTimerLengthMillis) {
this.to = to;
this.timer = timer;
this.minTimerLengthMillis = minTimerLengthMillis;
Expand Down
3 changes: 2 additions & 1 deletion framework/tst/dslabs/framework/testing/Workload.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import dslabs.framework.Address;
import dslabs.framework.Command;
import dslabs.framework.Result;
import dslabs.framework.testing.utils.GlobalSettings;
import dslabs.framework.testing.utils.SerializableFunction;
import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -146,7 +147,7 @@ private static Pair<String, Map<String, List<String>>> doReplacements(
randomness = new HashMap<>();
}

Random rand = new Random();
Random rand = new Random(GlobalSettings.rand().nextLong());

Pattern token = Pattern.compile("%(?:r(\\d*)|n(\\d*)|i(?:-1|\\+1)?|a)");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public void evaluate() throws Throwable {
};

protected void shutdownStartedThreads() throws InterruptedException {
if (GlobalSettings.simulated() && isRunTest()) {
runState.shutdownStartedThreads();
return;
}

for (Thread thread : startedThreads) {
thread.interrupt();
}
Expand All @@ -202,6 +207,10 @@ protected final void assertRunInvariantsHold() {
}

protected final void startThread(Runnable runnable) {
if (GlobalSettings.simulated() && isRunTest()) {
runState.startThread(runnable);
return;
}
Thread thread = new Thread(runnable);
startedThreads.add(thread);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import dslabs.framework.Address;
import dslabs.framework.testing.MessageEnvelope;
import dslabs.framework.testing.TestSettings;
import dslabs.framework.testing.utils.GlobalSettings;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -40,7 +42,7 @@
* Safe for concurrent access.
*/
public class RunSettings extends TestSettings<RunSettings> {
private final static Random rand = new Random();
private final static Random rand = new Random(GlobalSettings.rand().nextLong());

/* Defaults */
private static final double DEFAULT_UNRELIABLE_FRACTION_DELIVERED = 0.5;
Expand Down
81 changes: 78 additions & 3 deletions framework/tst/dslabs/framework/testing/runner/RunState.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import dslabs.framework.testing.TimerEnvelope;
import dslabs.framework.testing.runner.Network.Inbox;
import dslabs.framework.testing.utils.Cloning;
import dslabs.framework.testing.utils.GlobalSettings;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -48,7 +50,20 @@
@Log
@ToString(callSuper = true)
public class RunState extends AbstractState {
@Getter private final Network network = new Network();
// although `SimulatedImpl` can also work with `Network`, by inserting
// messages and already-due timers into `Network` then immediately `poll`
// them out, but that is unnecessarily stupid and poorly performant
// luckily `network` is only minimal made use by lab 2 and lab 3, which
// makes it fairly easy to proxy the use caes
// @Getter
private final Network network = new Network();

public Network network() {
if (simulated) {
return new SimulatedNetwork(simulatedImpl);
}
return network;
}

private volatile RunSettings settings;

Expand All @@ -58,6 +73,11 @@ public class RunState extends AbstractState {
*/
@Getter private volatile boolean exceptionThrown = false;

// used by `SimulatedImpl`
void exceptionThrown(boolean value) {
exceptionThrown = value;
}

// All accesses to these variables must be protected by synchronized(this)
private Thread mainThread;
private final Map<Address, Thread> nodeThreads = new HashMap<>();
Expand All @@ -69,13 +89,19 @@ public class RunState extends AbstractState {

// TODO: break up synchronization a bit

SimulatedImpl simulatedImpl;
boolean simulated = GlobalSettings.simulated();

public RunState(Set<Address> servers, Set<Address> clientWorkers,
Set<Address> clients, StateGenerator stateGenerator) {
Set<Address> clients, StateGenerator stateGenerator) {
super(servers, clientWorkers, clients, stateGenerator);
if (simulated) {
simulatedImpl = new SimulatedImpl(this);
}
}

public RunState(Set<Address> servers, Set<Address> clientWorkers,
StateGenerator stateGenerator) {
StateGenerator stateGenerator) {
this(servers, clientWorkers, Collections.emptySet(), stateGenerator);
}

Expand All @@ -87,6 +113,11 @@ public RunState(StateGenerator stateGenerator) {
@Override
protected synchronized void setupNode(Address address) {
final Node node = node(address);
if (simulated) {
simulatedImpl.setupNode(node);
return;
}

final Inbox inbox = network.inbox(address);

node.config(me -> {
Expand Down Expand Up @@ -183,6 +214,11 @@ private synchronized void takeSingleThreadedStep() {
* if interrupted while waiting
*/
public void waitFor() throws InterruptedException {
if (simulated) {
simulatedImpl.waitFor();
return;
}

if (settings.timeLimited() && settings.waitForClients() &&
Iterables.size(clientWorkers()) > 0) {
for (ClientWorker c : clientWorkers()) {
Expand Down Expand Up @@ -218,6 +254,11 @@ public void run(RunSettings settings) throws InterruptedException {
settings = new RunSettings();
}

if (simulated) {
simulatedImpl.run(settings);
return;
}

if (settings.multiThreaded()) {
if (startInternal(settings)) {
waitFor();
Expand Down Expand Up @@ -264,6 +305,10 @@ public void run(RunSettings settings) throws InterruptedException {
}

public void start(RunSettings settings) {
if (simulated) {
simulatedImpl.start(settings);
return;
}
startInternal(settings);
}

Expand Down Expand Up @@ -324,6 +369,11 @@ private void startNodeThread(Address address) {
* is interrupted, does not ensure a full shutdown, only initiates one.
*/
public synchronized void stop() throws InterruptedException {
if (simulated) {
simulatedImpl.stop();
return;
}

// Don't allow simultaneous stops
while (shuttingDown) {
wait();
Expand Down Expand Up @@ -352,6 +402,31 @@ public synchronized void stop() throws InterruptedException {
running = false;
}

public void sleep(long millis) throws InterruptedException {
if (simulated) {
simulatedImpl.sleep(millis);
} else {
Thread.sleep(millis);
}
}

public long currentTimeMillis() {
if (simulated) {
return simulatedImpl.currentTimeMillis();
} else {
return System.currentTimeMillis();
}
}

public void startThread(Runnable runnable) {
assert simulated;
simulatedImpl.startThread(runnable);
}

public void shutdownStartedThreads() throws InterruptedException {
assert simulated;
simulatedImpl.shutdownStartedThreads();
}

@Override
public Iterable<TimerEnvelope> timers(Address address) {
Expand Down
Loading