Skip to content

Commit

Permalink
Merge pull request #3391 from bipinprasad/storm3764
Browse files Browse the repository at this point in the history
[STORM-3764] Fix NPE in SchedulingSearcherState.backtrack()
  • Loading branch information
jnioche authored Oct 25, 2023
2 parents 27b3878 + 5a79077 commit 26defaa
Showing 1 changed file with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,61 @@ public void testScheduleLeftOverAckers() throws Exception {
assertEquals(expectedScheduling, foundScheduling);
}
}

@Test
public void testScheduleLeftOverAckers() throws Exception {
int spoutParallelism = 1;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(), spoutParallelism);

String topoName = "testTopology";
StormTopology stormToplogy = builder.createTopology();

INimbus iNimbus = new INimbusTest();
Config conf = createGrasClusterConfig(50, 400, 0, null, Collections.emptyMap());

Map<String, SupervisorDetails> supMap = genSupervisors(1, 1, 100, 1100);
Map<String, SupervisorDetails> tmpSupMap = genSupervisors(2, 1, 100, 400);
supMap.put("r000s001", tmpSupMap.get("r000s001"));
LOG.info("{}", tmpSupMap.get("r000s001"));

conf.put(Config.TOPOLOGY_PRIORITY, 0);
conf.put(Config.TOPOLOGY_NAME, topoName);
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 1000);
conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 2);
conf.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1);

conf.put(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB, 500);
conf.put(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT, 0);

TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
genExecsAndComps(StormCommon.systemTopology(conf, stormToplogy)), currentTime, "user");

Topologies topologies = new Topologies(topo);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);

scheduler = new ResourceAwareScheduler();

scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);

// First it tries too schedule spout [0, 0] with a bound acker [1, 1] to sup1 r000s000.
// However, sup2 r000s001 only has 400 on-heap mem which can not fit the left over acker [2, 2]
// So it backtrack to [0, 0] and put it to sup2 r000s001.
// Then put two ackers both as left-over ackers to sup1 r000s000.
HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
expectedScheduling.add(new HashSet<>(Arrays.asList(
new ExecutorDetails(0, 0)))); // spout
expectedScheduling.add(new HashSet<>(Arrays.asList(
new ExecutorDetails(1, 1), // acker
new ExecutorDetails(2, 2)))); // acker

HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");
for (Collection<ExecutorDetails> execs : assignment.getSlotToExecutors().values()) {
foundScheduling.add(new HashSet<>(execs));
}
assertEquals(expectedScheduling, foundScheduling);
}
}

0 comments on commit 26defaa

Please sign in to comment.