Skip to content

Commit

Permalink
feat(deployment): add required capabilities feature (#924)
Browse files Browse the repository at this point in the history
  • Loading branch information
fahadmohammed01 authored Apr 22, 2021
1 parent 3b1cab6 commit 306c09e
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -53,6 +54,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static com.aws.greengrass.deployment.DeploymentService.DEPLOYMENT_FAILURE_CAUSE_KEY;
import static com.aws.greengrass.deployment.DeploymentService.DEPLOYMENT_SERVICE_TOPICS;
import static com.aws.greengrass.deployment.DeploymentStatusKeeper.DEPLOYMENT_ID_KEY_NAME;
import static com.aws.greengrass.deployment.DeploymentStatusKeeper.DEPLOYMENT_STATUS_DETAILS_KEY_NAME;
Expand All @@ -63,6 +65,8 @@
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
import static com.aws.greengrass.util.Utils.copyFolderRecursively;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(GGExtension.class)
Expand All @@ -83,6 +87,7 @@ void before(ExtensionContext context) throws Exception {
NoOpPathOwnershipHandler.register(kernel);
ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel,
DeploymentServiceIntegrationTest.class.getResource("onlyMain.yaml"));

// ensure deployment service starts
CountDownLatch deploymentServiceLatch = new CountDownLatch(1);
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
Expand All @@ -92,6 +97,7 @@ void before(ExtensionContext context) throws Exception {
}
});
setDeviceConfig(kernel, DeviceConfiguration.DEPLOYMENT_POLLING_FREQUENCY_SECONDS, 1L);

kernel.launch();
assertTrue(deploymentServiceLatch.await(10, TimeUnit.SECONDS));
deploymentQueue = kernel.getContext().get(DeploymentQueue.class);
Expand Down Expand Up @@ -191,6 +197,69 @@ public void onStreamClosed() {
}
}

@Test
void GIVEN_a_cloud_deployment_WHEN_receives_deployment_THEN_service_runs_and_deployment_succeeds() throws Exception {
CountDownLatch cdlDeployRedSignal = new CountDownLatch(1);
Consumer<GreengrassLogMessage> listener = m -> {

if (m.getMessage() != null) {
if (m.getMessage().contains("Current deployment finished") && m.getContexts().get("DeploymentId").equals("deployRedSignal")) {
cdlDeployRedSignal.countDown();
}
}
};

try (AutoCloseable l = TestUtils.createCloseableLogListener(listener)) {


CountDownLatch redSignalServiceLatch = new CountDownLatch(1);
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
if (service.getName().equals("RedSignal") && newState.equals(State.RUNNING)) {
redSignalServiceLatch.countDown();

}
});

submitSampleJobDocument(DeploymentServiceIntegrationTest.class.getResource("FleetConfigWithRedSignalService.json")
.toURI(), "deployRedSignal", DeploymentType.SHADOW); // DeploymentType.SHADOW is used here and it
// is same for DeploymentType.IOT_JOBS
assertTrue(redSignalServiceLatch.await(30, TimeUnit.SECONDS));
assertTrue(cdlDeployRedSignal.await(30, TimeUnit.SECONDS));
}
}

@Test
void GIVEN_cloud_deployment_has_required_capabilities_WHEN_receives_deployment_THEN_fail_with_proper_detailed_status() throws Exception {
CountDownLatch cdlDeployRedSignal = new CountDownLatch(1);
Consumer<GreengrassLogMessage> listener = m -> {
if (m.getMessage() != null) {
if (m.getMessage().contains("Current deployment finished") && m.getContexts().get("DeploymentId").equals("deployRedSignal")) {
cdlDeployRedSignal.countDown();
}
}
};

CountDownLatch deploymentCDL = new CountDownLatch(1);
DeploymentStatusKeeper deploymentStatusKeeper = kernel.getContext().get(DeploymentStatusKeeper.class);
deploymentStatusKeeper.registerDeploymentStatusConsumer(DeploymentType.SHADOW, (status) -> {
if (status.get(DEPLOYMENT_ID_KEY_NAME).equals("deployRedSignal") &&
status.get(DEPLOYMENT_STATUS_KEY_NAME).equals("FAILED")) {
deploymentCDL.countDown();
assertThat(((Map) status.get(DEPLOYMENT_STATUS_DETAILS_KEY_NAME)).get(DEPLOYMENT_FAILURE_CAUSE_KEY),
equalTo("The current nucleus version doesn't support one or more capabilities that are required by "
+ "this deployment: LARGE_CONFIGURATION, ANOTHER_CAPABILITY"));
}
return true;
},"dummy");

try (AutoCloseable l = TestUtils.createCloseableLogListener(listener)) {
submitSampleJobDocument(DeploymentServiceIntegrationTest.class.getResource("FleetConfigWithRequiredCapability.json")
.toURI(), "deployRedSignal", DeploymentType.SHADOW);
assertTrue(cdlDeployRedSignal.await(30, TimeUnit.SECONDS));
assertTrue(deploymentCDL.await(10,TimeUnit.SECONDS));
}
}

@Test
void WHEN_multiple_local_deployment_scheduled_THEN_all_deployments_succeed() throws Exception {

Expand Down Expand Up @@ -247,9 +316,9 @@ void WHEN_multiple_local_deployment_scheduled_THEN_all_deployments_succeed() thr

submitLocalDocument(request);

firstDeploymentCDL.await(10, TimeUnit.SECONDS);
secondDeploymentCDL.await(10, TimeUnit.SECONDS);
thirdDeploymentCDL.await(10, TimeUnit.SECONDS);
assertTrue(firstDeploymentCDL.await(10, TimeUnit.SECONDS), "First deployment did not succeed");
assertTrue(secondDeploymentCDL.await(10, TimeUnit.SECONDS), "Second deployment did not succeed");
assertTrue(thirdDeploymentCDL.await(10, TimeUnit.SECONDS), "Third deployment did not succeed");
}

@Test
Expand Down Expand Up @@ -286,6 +355,33 @@ void GIVEN_local_deployment_WHEN_component_has_circular_dependency_THEN_deployme
firstErroredCDL.await(10, TimeUnit.SECONDS);
}

@Test
void GIVEN_local_deployment_WHEN_required_capabilities_not_present_THEN_deployments_fails_with_appropriate_error() throws Exception {
CountDownLatch deploymentCDL = new CountDownLatch(1);
DeploymentStatusKeeper deploymentStatusKeeper = kernel.getContext().get(DeploymentStatusKeeper.class);
deploymentStatusKeeper.registerDeploymentStatusConsumer(DeploymentType.LOCAL, (status) -> {

if(status.get(DEPLOYMENT_ID_KEY_NAME).equals("requiredCapabilityNotPresent") &&
status.get(DEPLOYMENT_STATUS_KEY_NAME).equals("FAILED") &&
((Map)status.get(DEPLOYMENT_STATUS_DETAILS_KEY_NAME)).get(DEPLOYMENT_FAILURE_CAUSE_KEY)
.equals("The current nucleus version doesn't support one or more capabilities that are "
+ "required by this deployment: NOT_SUPPORTED_1, NOT_SUPPORTED_2, LARGE_CONFIGURATION")){
deploymentCDL.countDown();
}
return true;
},"DeploymentServiceIntegrationTest3" );

Map<String, String> componentsToMerge = new HashMap<>();
componentsToMerge.put("YellowSignal", "1.0.0");
LocalOverrideRequest request = LocalOverrideRequest.builder().requestId("requiredCapabilityNotPresent")
.componentsToMerge(componentsToMerge)
.requestTimestamp(System.currentTimeMillis())
.requiredCapabilities(Arrays.asList("NOT_SUPPORTED_1", "NOT_SUPPORTED_2", "LARGE_CONFIGURATION"))
.build();

submitLocalDocument(request);
assertTrue(deploymentCDL.await(10, TimeUnit.SECONDS));
}

private void submitSampleJobDocument(URI uri, String arn, DeploymentType type) throws Exception {
Configuration deploymentConfiguration = OBJECT_MAPPER.readValue(new File(uri), Configuration.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"configurationArn": "Test",
"requiredCapabilities": ["LARGE_CONFIGURATION", "ANOTHER_CAPABILITY"],
"components": {
"RedSignal": {
"version": "1.0.0"
}
},
"creationTimestamp": 1601276785085,
"failureHandlingPolicy": "ROLLBACK",
"componentUpdatePolicy": {
"timeout": 60,
"action": "NOTIFY_COMPONENTS"
},
"configurationValidationPolicy": {
"timeout": 60
}
}
77 changes: 53 additions & 24 deletions src/main/java/com/aws/greengrass/deployment/DeploymentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.aws.greengrass.deployment.converter.DeploymentDocumentConverter;
import com.aws.greengrass.deployment.exceptions.DeploymentTaskFailureException;
import com.aws.greengrass.deployment.exceptions.InvalidRequestException;
import com.aws.greengrass.deployment.exceptions.MissingRequiredCapabilitiesException;
import com.aws.greengrass.deployment.model.Deployment;
import com.aws.greengrass.deployment.model.DeploymentDocument;
import com.aws.greengrass.deployment.model.DeploymentResult;
import com.aws.greengrass.deployment.model.DeploymentResult.DeploymentStatus;
import com.aws.greengrass.deployment.model.DeploymentTask;
import com.aws.greengrass.deployment.model.DeploymentTaskMetadata;
import com.aws.greengrass.deployment.model.LocalOverrideRequest;
Expand All @@ -51,6 +53,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -280,11 +283,11 @@ private void finishCurrentDeployment() throws InterruptedException {
// if something is going wrong
DeploymentResult result = currentDeploymentTaskMetadata.getDeploymentResultFuture().get();
if (result != null) {
DeploymentResult.DeploymentStatus deploymentStatus = result.getDeploymentStatus();
DeploymentStatus deploymentStatus = result.getDeploymentStatus();

Map<String, String> statusDetails = new HashMap<>();
statusDetails.put(DEPLOYMENT_DETAILED_STATUS_KEY, deploymentStatus.name());
if (DeploymentResult.DeploymentStatus.SUCCESSFUL.equals(deploymentStatus)) {
if (DeploymentStatus.SUCCESSFUL.equals(deploymentStatus)) {
//Add the root packages of successful deployment to the configuration
persistGroupToRootComponents(currentDeploymentTaskMetadata.getDeploymentDocument());

Expand Down Expand Up @@ -437,35 +440,46 @@ private void createNewDeployment(Deployment deployment) {
deploymentStatusKeeper.persistAndPublishDeploymentStatus(deployment.getId(), deployment.getDeploymentType(),
JobStatus.IN_PROGRESS.toString(), new HashMap<>());

if (DEFAULT.equals(deployment.getDeploymentStage())
&& DeploymentType.LOCAL.equals(deployment.getDeploymentType())) {
try {
copyRecipesAndArtifacts(deployment);
} catch (InvalidRequestException | IOException e) {
logger.atError().log("Error copying recipes and artifacts", e);
HashMap<String, String> statusDetails = new HashMap<>();
statusDetails.put("error", e.getMessage());
deploymentStatusKeeper.persistAndPublishDeploymentStatus(deployment.getId(),
deployment.getDeploymentType(), JobStatus.FAILED.toString(), statusDetails);
return;
}
}
if (DEFAULT.equals(deployment.getDeploymentStage())) {

try {
if (DEFAULT.equals(deployment.getDeploymentStage())) {
try {
context.get(KernelAlternatives.class).cleanupLaunchDirectoryLinks();
deploymentDirectoryManager.createNewDeploymentDirectory(deployment.getDeploymentDocumentObj()
.getDeploymentId());
deploymentDirectoryManager.writeDeploymentMetadata(deployment);
} catch (IOException ioException) {
logger.atError().log("Unable to create deployment directory", ioException);
updateDeploymentResultAsFailed(deployment, deploymentTask, true,
new DeploymentTaskFailureException(ioException));
return;
}

List<String> requiredCapabilities = deployment.getDeploymentDocumentObj().getRequiredCapabilities();
if (requiredCapabilities != null && !requiredCapabilities.isEmpty()) {
List<String> missingCapabilities = requiredCapabilities.stream()
.filter(reqCapabilities -> !kernel.getSupportedCapabilities().contains(reqCapabilities))
.collect(Collectors.toList());
if (!missingCapabilities.isEmpty()) {
updateDeploymentResultAsFailed(deployment, deploymentTask, false,
new MissingRequiredCapabilitiesException("The current nucleus version doesn't support one "
+ "or more capabilities that are required by this deployment: "
+ String.join(", ", missingCapabilities)));
return;
}
}

if (DeploymentType.LOCAL.equals(deployment.getDeploymentType())) {
try {
copyRecipesAndArtifacts(deployment);
} catch (InvalidRequestException | IOException e) {
logger.atError().log("Error copying recipes and artifacts", e);
updateDeploymentResultAsFailed(deployment, deploymentTask, false, e);
return;
}
}
} catch (IOException ioException) {
logger.atError().log("Unable to create deployment directory", ioException);
CompletableFuture<DeploymentResult> process = new CompletableFuture<>();
process.completeExceptionally(new DeploymentTaskFailureException(ioException));
currentDeploymentTaskMetadata = new DeploymentTaskMetadata(deploymentTask, process, deployment.getId(),
deployment.getDeploymentType(), new AtomicInteger(1), deployment.getDeploymentDocumentObj(), false);
return;
}


Future<DeploymentResult> process = executorService.submit(deploymentTask);
logger.atInfo().kv("deployment", deployment.getId()).log("Started deployment execution");

Expand All @@ -474,6 +488,21 @@ private void createNewDeployment(Deployment deployment) {
new AtomicInteger(1), deployment.getDeploymentDocumentObj(), cancellable);
}

private void updateDeploymentResultAsFailed(Deployment deployment, DeploymentTask deploymentTask,
boolean completeExceptionally, Exception e) {
DeploymentResult result = new DeploymentResult(DeploymentStatus.FAILED_NO_STATE_CHANGE, e);
CompletableFuture<DeploymentResult> process;
if (completeExceptionally) {
process = new CompletableFuture<>();
process.completeExceptionally(e);
} else {
process = CompletableFuture.completedFuture(result);
}
currentDeploymentTaskMetadata = new DeploymentTaskMetadata(deploymentTask, process, deployment.getId(),
deployment.getDeploymentType(), new AtomicInteger(1),
deployment.getDeploymentDocumentObj(), false);
}

@SuppressWarnings("PMD.ExceptionAsFlowControl")
private void copyRecipesAndArtifacts(Deployment deployment) throws InvalidRequestException, IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public static DeploymentDocument convertFromLocalOverrideRequestAndRoot(LocalOve
return DeploymentDocument.builder().timestamp(localOverrideRequest.getRequestTimestamp())
.deploymentId(localOverrideRequest.getRequestId())
.deploymentPackageConfigurationList(packageConfigurations)
.requiredCapabilities(localOverrideRequest.getRequiredCapabilities())
.failureHandlingPolicy(FailureHandlingPolicy.DO_NOTHING) // Can't rollback for local deployment
// Currently we skip update policy check for local deployment to not slow down testing for customers
// If we make this configurable in local development then we can plug that input in here
Expand Down Expand Up @@ -142,6 +143,7 @@ public static DeploymentDocument convertFromDeploymentConfiguration(Configuratio

DeploymentDocument.DeploymentDocumentBuilder builder =
DeploymentDocument.builder().deploymentId(config.getConfigurationArn())
.requiredCapabilities(config.getRequiredCapabilities())
.deploymentPackageConfigurationList(convertComponents(config.getComponents()))
.groupName(parseGroupNameFromConfigurationArn(config)).timestamp(config.getCreationTimestamp());
if (config.getFailureHandlingPolicy() == null) {
Expand Down Expand Up @@ -172,6 +174,7 @@ public static DeploymentDocument convertFromDeploymentConfiguration(Configuratio
config.getConfigurationValidationPolicy())
);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.deployment.exceptions;

public class MissingRequiredCapabilitiesException extends DeploymentException {

static final long serialVersionUID = -3387516993124229948L;

public MissingRequiredCapabilitiesException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class DeploymentDocument {
@JsonProperty("Packages")
private List<DeploymentPackageConfiguration> deploymentPackageConfigurationList;

@JsonProperty("RequiredCapabilities")
private List<String> requiredCapabilities;

@JsonProperty("GroupName")
private String groupName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class LocalOverrideRequest {
Map<String, String> componentsToMerge; // name to version
List<String> componentsToRemove; // remove just need name
String groupName;
List<String> requiredCapabilities;

@Deprecated
Map<String, Map<String, Object>> componentNameToConfig;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class Kernel {
private static final String DEPLOYMENT_STAGE_LOG_KEY = "stage";
protected static final ObjectMapper CONFIG_YAML_WRITER =
YAMLMapper.builder().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET).build();
private static final List<String> SUPPORTED_CAPABILITIES = Collections.emptyList();

@Getter
private final Context context;
Expand Down Expand Up @@ -662,4 +664,8 @@ private void setupProxy() {
public String deTilde(String filename) {
return kernelCommandLine.deTilde(filename);
}

public List<String> getSupportedCapabilities() {
return SUPPORTED_CAPABILITIES;
}
}
Loading

0 comments on commit 306c09e

Please sign in to comment.