Skip to content

Commit

Permalink
Fix bug that blocks creating/launching instances across multiple ECS …
Browse files Browse the repository at this point in the history
…profiles and subnets

If you have two ECS cluster profiles which have different subnet configurations, it is possible for the plugin to get stuck and be unable to create instances. This is because when instances/containers are searched for on EC2, they are not filtered by the `cluster-name`, so you may get returned instances for a different cluster profile than the one you are trying to create. That may be intentional, as the goal here is to launch an instance in the best subnet, and cluster profiles may be sharing subnets.

If you end up with state
cluster-profile-A --> instance-A --> subnet-A

and you then need to launch an instance in

cluster-profile-B in one of (subnet-B, subnet-C)

... the previous code fails to handle it, because is finds an instance, but then filters by relevant subnets, and then doesn't handle the case where there are no subnets after counting by index.
  • Loading branch information
chadlwilson committed Feb 12, 2022
1 parent ce6db5c commit 60ef53c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static com.thoughtworks.gocd.elasticagent.ecs.domain.SpotRequestState.ACTIVE;
import static com.thoughtworks.gocd.elasticagent.ecs.domain.SpotRequestState.OPEN;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.*;
import static org.apache.commons.collections4.CollectionUtils.union;
import static org.apache.commons.lang3.StringUtils.containsAny;
Expand All @@ -46,7 +45,7 @@ public class SpotInstanceService {
private final ContainerInstanceHelper containerInstanceHelper;
private final TerminateOperation terminateOperation;
private final SpotRequestMatcher spotRequestMatcher;
private Set<SpotInstanceRequest> untaggedSpotRequests = Collections.synchronizedSet(new HashSet<>());
private final Set<SpotInstanceRequest> untaggedSpotRequests = Collections.synchronizedSet(new HashSet<>());
private static final SpotInstanceService spotInstanceService = new SpotInstanceService();

private SpotInstanceService() {
Expand Down Expand Up @@ -150,15 +149,13 @@ public void tagSpotInstances(PluginSettings pluginSettings) {

public void refreshUnTaggedSpotRequests(PluginSettings pluginSettings) {
synchronized (untaggedSpotRequests) {
List<SpotInstanceRequest> allSpotRequestsforCluster = spotInstanceHelper.getAllSpotRequestsForCluster(pluginSettings);
List<String> spotRequestIds = allSpotRequestsforCluster.stream().map(SpotInstanceRequest::getSpotInstanceRequestId).collect(toList());
List<SpotInstanceRequest> allSpotRequestsForCluster = spotInstanceHelper.getAllSpotRequestsForCluster(pluginSettings);
List<String> spotRequestIds = allSpotRequestsForCluster.stream().map(SpotInstanceRequest::getSpotInstanceRequestId).collect(toList());

LOG.debug("[refresh-spot-requests] All SpotRequests for Cluster: '{}'", spotRequestIds.stream().collect(joining()));
LOG.debug("[refresh-spot-requests] All SpotRequests for Cluster: '{}'", String.join("", spotRequestIds));
LOG.debug("[refresh-spot-requests] All Untagged spot requests: '{}'", untaggedSpotRequestIds(untaggedSpotRequests));

untaggedSpotRequests.removeIf(request -> {
return spotRequestIds.contains(request.getSpotInstanceRequestId());
});
untaggedSpotRequests.removeIf(request -> spotRequestIds.contains(request.getSpotInstanceRequestId()));
}
}

Expand All @@ -170,7 +167,7 @@ private String untaggedSpotRequestIds(Set<SpotInstanceRequest> untaggedSpotReque
private List<SpotInstanceRequest> spotRequestsWithoutRegisteredInstances(PluginSettings pluginSettings, Platform platform,
List<Instance> allRegisteredSpotInstancesForPlatform) {
List<String> registeredInstanceIds = allRegisteredSpotInstancesForPlatform.stream()
.map(instance -> instance.getInstanceId()).collect(toList());
.map(Instance::getInstanceId).collect(toList());

List<SpotInstanceRequest> spotRequests = spotInstanceHelper
.getAllOpenOrSpotRequestsWithRunningInstances(pluginSettings, pluginSettings.getClusterName(), platform);
Expand Down Expand Up @@ -226,7 +223,7 @@ private String instanceIds(List<Instance> idleInstancesWithoutTag) {
}

private Predicate<Instance> getIdleInstancePredicate() {
return instance -> !instance.getTags().stream().anyMatch(tag -> tag.getKey().equals(LAST_SEEN_IDLE));
return instance -> instance.getTags().stream().noneMatch(tag -> tag.getKey().equals(LAST_SEEN_IDLE));
}

private Map<String, List<SpotInstanceRequest>> groupSpotRequestsByPlatform(List<SpotInstanceRequest> spotInstanceRequests) {
Expand All @@ -246,7 +243,7 @@ private void tagSpotRequest(PluginSettings pluginSettings, ElasticAgentProfilePr
LOG.debug("[create-agent] Tagging open spot request.");

spotInstanceRequest.withTags(new Tag().withKey("platform").withValue(elasticAgentProfileProperties.platform().name()));
spotInstanceHelper.tagSpotResources(pluginSettings, asList(spotInstanceRequest.getSpotInstanceRequestId()), elasticAgentProfileProperties.platform());
spotInstanceHelper.tagSpotResources(pluginSettings, List.of(spotInstanceRequest.getSpotInstanceRequestId()), elasticAgentProfileProperties.platform());
} catch (Exception e) {
LOG.error("[create-agent] There were errors while tagging spot instance request with id: '{}' cancelling the request.",
spotInstanceRequest.getSpotInstanceRequestId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,23 @@ public Subnet selectSubnetWithMinimumEC2Instances(PluginSettings pluginSettings,
private Subnet findSubnetWithMinimumInstances(List<Subnet> subnets, List<Instance> instances) {
final Map<Subnet, Long> instancePerSubnet = instancePerSubnet(subnets, instances);

// Can happen if all found instances happen to be from other elastic agent profiles (which may or may not
// be sharing a profile with our plugin configuration
if (instancePerSubnet.isEmpty()) {
return subnets.get(0);
}

return subnets.stream()
.filter(subnet -> !instancePerSubnet.containsKey(subnet)).findFirst()
.filter(subnet -> !instancePerSubnet.containsKey(subnet))
.findFirst()
.orElse(Collections.min(instancePerSubnet.entrySet(), Comparator.comparingDouble(Map.Entry::getValue)).getKey());
}

private Map<Subnet, Long> instancePerSubnet(List<Subnet> subnets, List<Instance> instances) {
final Map<String, Subnet> subnetMap = subnets.stream().collect(Collectors.toMap(Subnet::getSubnetId, subnet -> subnet));

return instances.stream()
.filter(instance -> subnetMap.keySet().contains(instance.getSubnetId()))
.filter(instance -> subnetMap.containsKey(instance.getSubnetId()))
.collect(Collectors.groupingBy(subnet -> subnetMap.get(subnet.getSubnetId()), Collectors.counting()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ void shouldReturnFirstAvailableSubnetIdIfNoContainerInstanceRunning() {
assertThat(argumentCaptor.getValue().getSubnetIds()).isEqualTo(Arrays.asList("subnet-1"));
}

@Test
void shouldReturnFirstAvailableSubnetIdIfContainerInstanceRunningInUnrelatedSubnet() {
final ArgumentCaptor<DescribeSubnetsRequest> argumentCaptor = ArgumentCaptor.forClass(DescribeSubnetsRequest.class);

when(pluginSettings.getSubnetIds()).thenReturn(Arrays.asList("subnet-1"));
when(ec2Client.describeSubnets(argumentCaptor.capture())).thenReturn(new DescribeSubnetsResult().withSubnets(
new Subnet().withSubnetId("subnet-1").withState("available")
));

Instance instanceInOtherSubnet = new Instance().withSubnetId("some-other-subnet");

final Subnet subnet = subnetSelector.selectSubnetWithMinimumEC2Instances(pluginSettings, pluginSettings.getSubnetIds(), List.of(instanceInOtherSubnet));

assertThat(subnet.getSubnetId()).isEqualTo("subnet-1");
assertThat(argumentCaptor.getValue().getSubnetIds()).isEqualTo(Arrays.asList("subnet-1"));
}

@Test
void shouldReturnSubnetIdWhichIsHavingMinimumEC2InstanceRunning() {
final ArgumentCaptor<DescribeSubnetsRequest> argumentCaptor = ArgumentCaptor.forClass(DescribeSubnetsRequest.class);
Expand Down

0 comments on commit 60ef53c

Please sign in to comment.