Skip to content

Commit

Permalink
Merge pull request #3274 from bipinprasad/storm3639
Browse files Browse the repository at this point in the history
[STORM-3639] Replace asserts in daemon code.
  • Loading branch information
jnioche authored Dec 4, 2023
2 parents c158dee + 5e9a2da commit 2785507
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -828,9 +828,15 @@ private static Map<String, Map<WorkerSlot, WorkerResources>> computeTopoToNodePo
return ret;
}

/**
* Check new assignments with existing assignments and determine difference is any.
*
* @param existingAssignments non-null map of topology-id to existing assignments.
* @param newAssignments non-null map of topology-id to new assignments.
* @return true if there is a change in assignments, false otherwise.
*/
private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
Map<String, Assignment> newAssignments) {
assert existingAssignments != null && newAssignments != null;
boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
long numRemovedExec = 0;
long numRemovedSlot = 0;
Expand Down Expand Up @@ -1966,8 +1972,12 @@ private void waitForDesiredCodeReplication(Map<String, Object> topoConf, String

private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException,
AuthorizationException, IOException, InvalidTopologyException {
assert (base != null);
assert (topoId != null);
if (base == null) {
throw new InvalidTopologyException("Cannot readTopologyDetails: StormBase parameter value is null");
}
if (topoId == null) {
throw new InvalidTopologyException("Cannot readTopologyDetails: topoId parameter value is null");
}

Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
Expand Down Expand Up @@ -2067,7 +2077,9 @@ private List<List<Integer>> computeExecutors(StormBase base, Map<String, Object>
StormTopology topology)
throws InvalidTopologyException {

assert (base != null);
if (base == null) {
throw new InvalidTopologyException("Cannot computeExecutors: StormBase parameter value is null");
}

Map<String, Integer> compToExecutors = base.get_component_executors();
List<List<Integer>> ret = new ArrayList<>();
Expand Down Expand Up @@ -2677,7 +2689,9 @@ private int getTopologyLaunchHeartbeatTimeoutSec(String topoId) {
private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner,
String principal, Map<String, Object> topoConf, StormTopology stormTopology)
throws InvalidTopologyException {
assert (TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus);
if (TopologyStatus.ACTIVE != initStatus && TopologyStatus.INACTIVE != initStatus) {
throw new InvalidTopologyException("Cannot startTopology: initStatus should be ACTIVE or INACTIVE, not " + initStatus.name());
}
Map<String, Integer> numExecutors = new HashMap<>();
StormTopology topology = StormCommon.systemTopology(topoConf, stormTopology);
for (Entry<String, Object> entry : StormCommon.allComponents(topology).entrySet()) {
Expand Down Expand Up @@ -3195,7 +3209,9 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
try {
submitTopologyWithOptsCalls.mark();
assertIsLeader();
assert (options != null);
if (options == null) {
throw new InvalidTopologyException("Cannot submitTopologyWithOpts: SubmitOptions parameter value is null");
}
validateTopologyName(topoName);
checkAuthorization(topoName, null, "submitTopology");
assertTopoActive(topoName, false);
Expand Down Expand Up @@ -5321,10 +5337,12 @@ private class ClusterSummaryMetricSet implements Runnable {
ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
//Break the code if out of sync to thrift protocol
assert ClusterSummary._Fields.values().length == 3
&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
if (ClusterSummary._Fields.values().length != 3
|| ClusterSummary._Fields.findByName("supervisors") != ClusterSummary._Fields.SUPERVISORS
|| ClusterSummary._Fields.findByName("topologies") != ClusterSummary._Fields.TOPOLOGIES
|| ClusterSummary._Fields.findByName("nimbuses") != ClusterSummary._Fields.NIMBUSES) {
throw new AssertionError("Out of sync with thrift protocol");
}

final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public BasicContainer(ContainerType type, Map<String, Object> conf, String super
AdvancedFSOps ops, String profileCmd) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment,
resourceIsolationManager, workerId, topoConf, ops, metricsRegistry, containerMemoryTracker);
assert (localState != null);
if (localState == null) {
throw new IOException("LocalState parameter value is null");
}
this.localState = localState;

if (type.isRecovery() && !type.isOnlyKillable()) {
Expand Down Expand Up @@ -209,7 +211,11 @@ public static String getStormVersionFor(final Map<String, Object> conf, final St
*/
protected void createNewWorkerId() {
type.assertFull();
assert (workerId == null);
if (workerId != null) {
String err = "Incorrect usage of createNewWorkerId(), current workerId is " + workerId + ", expecting null";
LOG.error(err);
throw new AssertionError(err);
}
synchronized (localState) {
workerId = Utils.uuid();
Map<String, Integer> workerToPort = localState.getApprovedWorkers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ protected Container(ContainerType type, Map<String, Object> conf, String supervi
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
StormMetricsRegistry metricsRegistry, ContainerMemoryTracker containerMemoryTracker) throws IOException {
assert (type != null);
assert (conf != null);
assert (supervisorId != null);
if (type == null) {
throw new IOException("ContainerType parameter is null");
}
if (conf == null) {
throw new IOException("conf parameter value is null");
}
if (supervisorId == null) {
throw new IOException("SupervisorId parameter value is null");
}

symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);

Expand All @@ -137,14 +143,24 @@ protected Container(ContainerType type, Map<String, Object> conf, String supervi
}

if (this.type.isOnlyKillable()) {
assert (this.assignment == null);
assert (this.port <= 0);
assert (this.workerId != null);
if (this.assignment != null) {
throw new IOException("With ContainerType==OnlyKillable, expecting LocalAssignment member variable to be null");
}
if (this.port > 0) {
throw new IOException("With ContainerType==OnlyKillable, expecting port member variable <=0 but found " + this.port);
}
if (this.workerId == null) {
throw new IOException("With ContainerType==OnlyKillable, expecting WorkerId member variable to be assigned");
}
topologyId = null;
this.topoConf = null;
} else {
assert (assignment != null);
assert (port > 0);
if (this.assignment == null) {
throw new IOException("With ContainerType!=OnlyKillable, expecting LocalAssignment member variable to be assigned");
}
if (this.port <= 0) {
throw new IOException("With ContainerType!=OnlyKillable, expecting port member variable >0 but found " + this.port);
}
topologyId = assignment.get_topology_id();
if (!this.ops.doRequiredTopoFilesExist(this.conf, topologyId)) {
LOG.info(
Expand Down Expand Up @@ -175,7 +191,9 @@ public String toString() {
}

protected Map<String, Object> readTopoConf() throws IOException {
assert (topologyId != null);
if (topologyId == null) {
throw new IOException("Cannot readTopoConf: member variable topologyId is null");
}
return ConfigUtils.readSupervisorStormConf(conf, topologyId);
}

Expand Down
Loading

0 comments on commit 2785507

Please sign in to comment.