diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java
index 82622b1e7c3e..114f64914f0a 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java
@@ -34,7 +34,11 @@
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.operation.OperationMeta;
+import io.cdap.cdap.proto.operation.OperationRun;
+import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushAppRequest;
+import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigRequest;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigValidationException;
@@ -47,7 +51,6 @@
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
-
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -183,6 +186,49 @@ public void pushApp(FullHttpRequest request, HttpResponder responder,
}
}
+ /**
+ * Pushes application configs of requested applications to linked repository in Json format.
+ * It expects a post body that has a list of application ids and an optional commit message
+ * E.g.
+ *
+ *
+ * {@code
+ * {
+ * "appIds": ["app_id_1", "app_id_2"],
+ * "commitMessage": "pushed application XYZ"
+ * }
+ * }
+ *
+ *
+ * The response will be a {@link OperationMeta} object, which encapsulates the application name,
+ * version and fileHash.
+ */
+ @POST
+ @Path("/apps/push")
+ public void pushApps(FullHttpRequest request, HttpResponder responder,
+ @PathParam("namespace-id") String namespaceId) throws Exception {
+ checkSourceControlMultiFeatureFlag();
+ PushMultipleAppsRequest appsRequest;
+ try {
+ appsRequest = parseBody(request, PushMultipleAppsRequest.class);
+ } catch (JsonSyntaxException e) {
+ throw new BadRequestException(String.format("Invalid request body: %s", e.getMessage()));
+ }
+
+ if (appsRequest == null) {
+ throw new BadRequestException("Invalid request body.");
+ }
+
+ if (Strings.isNullOrEmpty(appsRequest.getCommitMessage())) {
+ throw new BadRequestException("Please specify commit message in the request body.");
+ }
+
+ NamespaceId namespace = validateNamespaceId(namespaceId);
+
+ OperationRun operationMeta = sourceControlService.pushApps(namespace, appsRequest);
+ responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
+ }
+
/**
* Pull the requested application from linked repository and deploy in current namespace.
*/
@@ -202,6 +248,31 @@ public void pullApp(FullHttpRequest request, HttpResponder responder,
}
}
+ /**
+ * Pull the requested applications from linked repository and deploy in current namespace.
+ */
+ @POST
+ @Path("/apps/pull")
+ public void pullApps(FullHttpRequest request, HttpResponder responder,
+ @PathParam("namespace-id") String namespaceId) throws Exception {
+ checkSourceControlMultiFeatureFlag();
+ NamespaceId namespace = validateNamespaceId(namespaceId);
+
+ PullMultipleAppsRequest appsRequest;
+ try {
+ appsRequest = parseBody(request, PullMultipleAppsRequest.class);
+ } catch (JsonSyntaxException e) {
+ throw new BadRequestException("Invalid request body.", e);
+ }
+
+ if (appsRequest == null) {
+ throw new BadRequestException("Invalid request body.");
+ }
+
+ OperationRun operationRun = sourceControlService.pullApps(namespace, appsRequest);
+ responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationRun));
+ }
+
private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException {
PushAppRequest appRequest;
try {
@@ -217,16 +288,19 @@ private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws
return appRequest;
}
- /**
- *
- * throws {@link ForbiddenException} if the feature is disabled
- */
private void checkSourceControlFeatureFlag() throws ForbiddenException {
if (!Feature.SOURCE_CONTROL_MANAGEMENT_GIT.isEnabled(featureFlagsProvider)) {
throw new ForbiddenException("Source Control Management feature is not enabled.");
}
}
+ private void checkSourceControlMultiFeatureFlag() throws ForbiddenException {
+ checkSourceControlFeatureFlag();
+ if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.isEnabled(featureFlagsProvider)) {
+ throw new ForbiddenException("Source Control Management for multiple apps feature is not enabled.");
+ }
+ }
+
private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException {
try {
return new NamespaceId(namespaceId);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java
index 9e24ecac06b2..80e2f5cbbf17 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java
@@ -23,9 +23,13 @@
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.RepositoryNotFoundException;
+import io.cdap.cdap.common.TooManyRequestsException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
+import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
+import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
+import io.cdap.cdap.internal.operation.OperationLifecycleManager;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.artifact.AppRequest;
@@ -33,8 +37,11 @@
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.KerberosPrincipalId;
import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.security.NamespacePermission;
import io.cdap.cdap.proto.security.StandardPermission;
+import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
+import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta;
@@ -62,15 +69,17 @@
import io.cdap.cdap.store.NamespaceTable;
import io.cdap.cdap.store.RepositoryTable;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Service that manages source control for repositories and applications.
- * It exposes repository CRUD apis and source control tasks that do pull/pull/list applications in linked repository.
+ * Service that manages source control for repositories and applications. It exposes repository CRUD
+ * apis and source control tasks that do pull/pull/list applications in linked repository.
*/
public class SourceControlManagementService {
+
private final AccessEnforcer accessEnforcer;
private final AuthenticationContext authenticationContext;
private final TransactionRunner transactionRunner;
@@ -79,6 +88,7 @@ public class SourceControlManagementService {
private final SourceControlOperationRunner sourceControlOperationRunner;
private final ApplicationLifecycleService appLifecycleService;
private final Store store;
+ private final OperationLifecycleManager operationLifecycleManager;
private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class);
@@ -87,13 +97,14 @@ public class SourceControlManagementService {
*/
@Inject
public SourceControlManagementService(CConfiguration cConf,
- SecureStore secureStore,
- TransactionRunner transactionRunner,
- AccessEnforcer accessEnforcer,
- AuthenticationContext authenticationContext,
- SourceControlOperationRunner sourceControlOperationRunner,
- ApplicationLifecycleService applicationLifecycleService,
- Store store) {
+ SecureStore secureStore,
+ TransactionRunner transactionRunner,
+ AccessEnforcer accessEnforcer,
+ AuthenticationContext authenticationContext,
+ SourceControlOperationRunner sourceControlOperationRunner,
+ ApplicationLifecycleService applicationLifecycleService,
+ Store store,
+ OperationLifecycleManager operationLifecycleManager) {
this.cConf = cConf;
this.secureStore = secureStore;
this.transactionRunner = transactionRunner;
@@ -102,13 +113,16 @@ public SourceControlManagementService(CConfiguration cConf,
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.appLifecycleService = applicationLifecycleService;
this.store = store;
+ this.operationLifecycleManager = operationLifecycleManager;
}
- private RepositoryTable getRepositoryTable(StructuredTableContext context) throws TableNotFoundException {
+ private RepositoryTable getRepositoryTable(StructuredTableContext context)
+ throws TableNotFoundException {
return new RepositoryTable(context);
}
- private NamespaceTable getNamespaceTable(StructuredTableContext context) throws TableNotFoundException {
+ private NamespaceTable getNamespaceTable(StructuredTableContext context)
+ throws TableNotFoundException {
return new NamespaceTable(context);
}
@@ -121,7 +135,7 @@ private NamespaceTable getNamespaceTable(StructuredTableContext context) throws
* @throws NamespaceNotFoundException if the namespace is non-existent
*/
public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repository)
- throws NamespaceNotFoundException {
+ throws NamespaceNotFoundException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.UPDATE_REPOSITORY_METADATA);
@@ -155,9 +169,11 @@ public void deleteRepository(NamespaceId namespace) {
/**
* Return the repository config for the given namespace.
*
- * @throws RepositoryNotFoundException if repository config is not available for the namespace
+ * @throws RepositoryNotFoundException if repository config is not available for the
+ * namespace
*/
- public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws RepositoryNotFoundException {
+ public RepositoryMeta getRepositoryMeta(NamespaceId namespace)
+ throws RepositoryNotFoundException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET);
return TransactionRunners.run(transactionRunner, context -> {
@@ -177,10 +193,11 @@ public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws Repository
* @throws RemoteRepositoryValidationException if validation of remote repository fails
*/
public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfig)
- throws RemoteRepositoryValidationException {
+ throws RemoteRepositoryValidationException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.UPDATE_REPOSITORY_METADATA);
- RepositoryManager.validateConfig(secureStore, new SourceControlConfig(namespace, repoConfig, cConf));
+ RepositoryManager.validateConfig(secureStore,
+ new SourceControlConfig(namespace, repoConfig, cConf));
}
/**
@@ -189,27 +206,31 @@ public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfi
* @param appRef {@link ApplicationReference}
* @param commitMessage enforced commit message from user
* @return {@link PushAppResponse}
- * @throws NotFoundException if the application is not found or the repository config is not found
- * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner store
+ * @throws NotFoundException if the application is not found or the repository config is not
+ * found
+ * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner
+ * store
* @throws SourceControlException if {@link SourceControlOperationRunner} fails to push
* @throws AuthenticationConfigException if the repository configuration authentication fails
- * @throws NoChangesToPushException if there's no change of the application between namespace and linked repository
+ * @throws NoChangesToPushException if there's no change of the application between namespace
+ * and linked repository
*/
public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage)
- throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException {
+ throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException {
accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(),
NamespacePermission.WRITE_REPOSITORY);
// TODO: CDAP-20396 RepositoryConfig is currently only accessible from the service layer
// Need to fix it and avoid passing it in RepositoryManagerFactory
RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig();
-
+
// AppLifecycleService already enforces ApplicationDetail Access
ApplicationDetail appDetail = appLifecycleService.getLatestAppDetail(appRef, false);
String committer = authenticationContext.getPrincipal().getName();
// TODO CDAP-20371 revisit and put correct Author and Committer, for now they are the same
- CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(), commitMessage);
+ CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(),
+ commitMessage);
LOG.info("Start to push app {} in namespace {} to linked repository by user {}",
appRef.getApplication(),
@@ -217,14 +238,14 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
appLifecycleService.decodeUserId(authenticationContext));
PushAppResponse pushResponse = sourceControlOperationRunner.push(
- new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta)
+ new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta)
);
LOG.info("Successfully pushed app {} in namespace {} to linked repository by user {}",
appRef.getApplication(),
appRef.getParent(),
appLifecycleService.decodeUserId(authenticationContext));
-
+
SourceControlMeta sourceControlMeta = new SourceControlMeta(pushResponse.getFileHash());
ApplicationId appId = appRef.app(appDetail.getAppVersion());
store.setAppSourceControlMeta(appId, sourceControlMeta);
@@ -239,7 +260,8 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
* @return {@link ApplicationRecord} of the deployed application.
* @throws Exception when {@link ApplicationLifecycleService} fails to deploy.
* @throws NoChangesToPullException if the fileHashes are the same
- * @throws NotFoundException if the repository config is not found or the application in repository is not found
+ * @throws NotFoundException if the repository config is not found or the application in
+ * repository is not found
* @throws SourceControlException if unexpected errors happen when pulling the application.
* @throws AuthenticationConfigException if the repository configuration authentication fails
*/
@@ -251,11 +273,12 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep
accessEnforcer.enforce(appId, authenticationContext.getPrincipal(), StandardPermission.CREATE);
accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
-
+
PullAppResponse> pullResponse = pullAndValidateApplication(appRef);
AppRequest> appRequest = pullResponse.getAppRequest();
- SourceControlMeta sourceControlMeta = new SourceControlMeta(pullResponse.getApplicationFileHash());
+ SourceControlMeta sourceControlMeta = new SourceControlMeta(
+ pullResponse.getApplicationFileHash());
LOG.info("Start to deploy app {} in namespace {} by user {}",
appId.getApplication(),
@@ -263,34 +286,40 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep
appLifecycleService.decodeUserId(authenticationContext));
ApplicationWithPrograms app = appLifecycleService.deployApp(appId, appRequest,
- sourceControlMeta, x -> { }, false);
-
- LOG.info("Successfully deployed app {} in namespace {} from artifact {} with configuration {} and "
- + "principal {}", app.getApplicationId().getApplication(), app.getApplicationId().getNamespace(),
- app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal()
+ sourceControlMeta, x -> {
+ }, false);
+
+ LOG.info(
+ "Successfully deployed app {} in namespace {} from artifact {} with configuration {} and "
+ + "principal {}", app.getApplicationId().getApplication(),
+ app.getApplicationId().getNamespace(),
+ app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal()
);
return new ApplicationRecord(
- ArtifactSummary.from(app.getArtifactId().toApiArtifactId()),
- app.getApplicationId().getApplication(),
- app.getApplicationId().getVersion(),
- app.getSpecification().getDescription(),
- Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal).orElse(null),
- app.getChangeDetail(), app.getSourceControlMeta());
+ ArtifactSummary.from(app.getArtifactId().toApiArtifactId()),
+ app.getApplicationId().getApplication(),
+ app.getApplicationId().getVersion(),
+ app.getSpecification().getDescription(),
+ Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal)
+ .orElse(null),
+ app.getChangeDetail(), app.getSourceControlMeta());
}
/**
- * Pull the application from repository, look up the fileHash in store and compare it with the cone in repository.
+ * Pull the application from repository, look up the fileHash in store and compare it with the
+ * cone in repository.
*
* @param appRef {@link ApplicationReference} to fetch the application with
* @return {@link PullAppResponse}
* @throws NoChangesToPullException if the fileHashes are the same
- * @throws NotFoundException if the repository config is not found or the application in repository is not found
+ * @throws NotFoundException if the repository config is not found or the application in
+ * repository is not found
* @throws SourceControlException if unexpected errors happen when pulling the application.
* @throws AuthenticationConfigException if the repository configuration authentication fails
*/
private PullAppResponse> pullAndValidateApplication(ApplicationReference appRef)
- throws NoChangesToPullException, NotFoundException, AuthenticationConfigException {
+ throws NoChangesToPullException, NotFoundException, AuthenticationConfigException {
RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig();
SourceControlMeta latestMeta = store.getAppSourceControlMeta(appRef);
PullAppResponse> pullResponse = sourceControlOperationRunner.pull(
@@ -298,8 +327,9 @@ private PullAppResponse> pullAndValidateApplication(ApplicationReference appRe
if (latestMeta != null
&& latestMeta.getFileHash().equals(pullResponse.getApplicationFileHash())) {
- throw new NoChangesToPullException(String.format("Pipeline deployment was not successful because there is "
- + "no new change for the pulled application: %s", appRef));
+ throw new NoChangesToPullException(
+ String.format("Pipeline deployment was not successful because there is "
+ + "no new change for the pulled application: %s", appRef));
}
return pullResponse;
}
@@ -308,15 +338,61 @@ private PullAppResponse> pullAndValidateApplication(ApplicationReference appRe
* The method to list all applications found in linked repository.
*
* @return {@link RepositoryAppsResponse}
- * @throws RepositoryNotFoundException if the repository config is not found
+ * @throws RepositoryNotFoundException if the repository config is not found
* @throws AuthenticationConfigException if git auth config is not found
- * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list applications
+ * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list
+ * applications
*/
public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundException,
- AuthenticationConfigException {
+ AuthenticationConfigException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
return sourceControlOperationRunner.list(new NamespaceRepository(namespace, repoConfig));
}
+
+ /**
+ * The method to push multiple applications in the same namespace to the linked repository.
+ *
+ * @param namespace {@link NamespaceId} from where the apps are to be pushed
+ * @param request {@link PushMultipleAppsRequest} containing the appIds and the commit
+ * message
+ * @return {@link OperationRun} of the operation to push the apps
+ * @throws NotFoundException when the repository or any of the apps are not found
+ */
+ public OperationRun pushApps(NamespaceId namespace, PushMultipleAppsRequest request)
+ throws NotFoundException, IOException, TooManyRequestsException {
+ accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
+ NamespacePermission.WRITE_REPOSITORY);
+ RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
+ String principal = authenticationContext.getPrincipal().getName();
+ CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(),
+ request.getCommitMessage());
+ PushAppsRequest pushOpRequest = new PushAppsRequest(new HashSet<>(request.getApps()),
+ repoConfig, commitMeta);
+ return operationLifecycleManager.createPushOperation(namespace.getNamespace(),
+ RunIds.generate().getId(), pushOpRequest, principal);
+ }
+
+ /**
+ * The method to pull multiple applications from the linked repository and deploy them in current
+ * namespace.
+ *
+ * @param namespace {@link NamespaceId} from where the apps are to be pushed
+ * @param request {@link PullMultipleAppsRequest} containing the appIds
+ * @return {@link OperationRun} of the operation to push the apps
+ * @throws NotFoundException when the repository or any of the apps are not found
+ */
+ public OperationRun pullApps(NamespaceId namespace, PullMultipleAppsRequest request)
+ throws NotFoundException, IOException, TooManyRequestsException {
+ accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
+ NamespacePermission.READ_REPOSITORY);
+ RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
+ String principal = authenticationContext.getPrincipal().getName();
+ PullAppsRequest pullOpRequest = new PullAppsRequest(new HashSet<>(request.getApps()),
+ repoConfig);
+
+ return operationLifecycleManager.createPullOperation(namespace.getNamespace(),
+ RunIds.generate().getId(), pullOpRequest, principal);
+ }
}
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java
index 87f75810de6c..35a0d9160e95 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java
@@ -18,13 +18,20 @@
import com.google.inject.Inject;
import io.cdap.cdap.common.BadRequestException;
+import io.cdap.cdap.common.TooManyRequestsException;
+import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
+import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
+import io.cdap.cdap.proto.operation.OperationMeta;
+import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
+import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
+import java.time.Instant;
import java.util.Collections;
import java.util.function.Consumer;
import org.slf4j.Logger;
@@ -67,15 +74,15 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
while (currentLimit > 0) {
ScanOperationRunsRequest batchRequest = ScanOperationRunsRequest
.builder(request)
- .setScanAfter(lastKey)
- .setLimit(Math.min(txBatchSize, currentLimit))
- .build();
+ .setScanAfter(lastKey)
+ .setLimit(Math.min(txBatchSize, currentLimit))
+ .build();
request = batchRequest;
lastKey = TransactionRunners.run(transactionRunner, context -> {
- return getOperationRunStore(context).scanOperations(batchRequest, consumer);
- }, IOException.class, OperationRunNotFoundException.class);
+ return getOperationRunStore(context).scanOperations(batchRequest, consumer);
+ }, IOException.class, OperationRunNotFoundException.class);
if (lastKey == null) {
break;
@@ -105,7 +112,7 @@ public OperationRunDetail getOperationRun(OperationRunId runId)
}
/**
- * Validates state transition and sends STOPPING notification for an operation
+ * Validates state transition and sends STOPPING notification for an operation.
*
* @param runId {@link OperationRunId} of the operation
*/
@@ -129,6 +136,63 @@ public OperationController startOperation(OperationRunDetail detail) {
return runtime.run(detail);
}
+ /**
+ * Create a new pull operation. Inserts the run in DB and then send TMS message.
+ */
+ public OperationRun createPushOperation(String namespace, String runId, PushAppsRequest request,
+ String principal)
+ throws IOException, TooManyRequestsException {
+ OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build();
+ OperationRunId operationRunId = new OperationRunId(namespace, runId);
+ OperationRun run = OperationRun.builder()
+ .setRunId(runId)
+ .setMetadata(meta)
+ .setStatus(OperationRunStatus.STARTING)
+ .setType(OperationType.PUSH_APPS)
+ .build();
+ OperationRunDetail detail = OperationRunDetail.builder()
+ .setRun(run)
+ .setPrincipal(principal)
+ .setPushAppsRequest(request)
+ .setRunId(operationRunId)
+ .build();
+ TransactionRunners.run(transactionRunner, context -> {
+ validateOnlyOneGitOperationRunning(namespace, context);
+ getOperationRunStore(context).createOperationRun(operationRunId, detail);
+ statePublisher.publishStarting(operationRunId);
+ }, TooManyRequestsException.class, IOException.class);
+ return run;
+ }
+
+ /**
+ * Create a new pull operation. Inserts the run in DB and then send TMS message.
+ */
+ public OperationRun createPullOperation(String namespace, String runId, PullAppsRequest request,
+ String principal)
+ throws IOException, TooManyRequestsException {
+ OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build();
+ OperationRunId operationRunId = new OperationRunId(namespace, runId);
+ OperationRun run = OperationRun.builder()
+ .setRunId(runId)
+ .setMetadata(meta)
+ .setStatus(OperationRunStatus.STARTING)
+ .setType(OperationType.PULL_APPS)
+ .build();
+ OperationRunDetail detail = OperationRunDetail.builder()
+ .setRun(run)
+ .setPrincipal(principal)
+ .setPullAppsRequest(request)
+ .setRunId(operationRunId)
+ .build();
+ TransactionRunners.run(transactionRunner, context -> {
+ validateOnlyOneGitOperationRunning(namespace, context);
+ getOperationRunStore(context).createOperationRun(operationRunId, detail);
+ statePublisher.publishStarting(operationRunId);
+ }, TooManyRequestsException.class, IOException.class);
+
+ return run;
+ }
+
/**
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
@@ -146,8 +210,8 @@ public void stopOperation(OperationRunDetail detail) {
}
/**
- * Checks if the operation is running. If not sends a failure notification
- * Called after service restart.
+ * Checks if the operation is running. If not sends a failure notification Called after service
+ * restart.
*
* @param detail {@link OperationRunDetail} of the operation
*/
@@ -162,6 +226,24 @@ public void isRunning(OperationRunDetail detail, OperationStatePublisher statePu
}
}
+ // Validate only one multi git operation running at a time
+ private void validateOnlyOneGitOperationRunning(String namespaceId,
+ StructuredTableContext context)
+ throws TooManyRequestsException, OperationRunNotFoundException, IOException {
+ OperationRunStore store = getOperationRunStore(context);
+ OperationRunDetail existing = store.getLatestActiveOperation(namespaceId,
+ OperationType.PULL_APPS);
+ if (existing != null) {
+ throw new TooManyRequestsException(
+ String.format("Already running a bulk pull operation %s", existing.getRun()));
+ }
+ existing = store.getLatestActiveOperation(namespaceId, OperationType.PUSH_APPS);
+ if (existing != null) {
+ throw new TooManyRequestsException(
+ String.format("Already running a bulk push operation %s", existing.getRun()));
+ }
+ }
+
private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java
index 407ce6d3a1a4..ae3cbaf039da 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java
@@ -64,9 +64,8 @@ public class OperationRunDetail {
protected OperationRunDetail(
OperationRunId runId, OperationRun run,
- byte[] sourceId, @Nullable String principal,
- @Nullable PullAppsRequest pullAppsRequest,
- @Nullable PushAppsRequest pushAppsRequest) {
+ @Nullable byte[] sourceId, @Nullable String principal,
+ @Nullable PullAppsRequest pullAppsRequest, @Nullable PushAppsRequest pushAppsRequest) {
this.runId = runId;
this.run = run;
this.sourceId = sourceId;
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java
index aeeee7e8a978..702aa0e93659 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java
@@ -27,6 +27,7 @@
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
+import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.spi.data.SortOrder;
import io.cdap.cdap.spi.data.StructuredRow;
import io.cdap.cdap.spi.data.StructuredTable;
@@ -44,6 +45,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
@@ -260,6 +262,43 @@ public void scanOperationByStatus(OperationRunStatus status,
}
}
+ /**
+ * Get the latest active operation of a given type in a namespace.
+ *
+ * @param namespaceId to serach in
+ * @param type {@link OperationType} to search
+ */
+ public OperationRunDetail getLatestActiveOperation(String namespaceId, OperationType type)
+ throws OperationRunNotFoundException, IOException {
+ AtomicReference latestRun = new AtomicReference<>();
+ // get STARTING
+ ScanOperationRunsRequest request = ScanOperationRunsRequest.builder()
+ .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.STARTING))
+ .setLimit(1).build();
+ scanOperations(request, latestRun::set);
+
+ // get RUNNING
+ request = ScanOperationRunsRequest.builder()
+ .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.RUNNING))
+ .setLimit(1).build();
+ scanOperations(request, detail -> {
+ if (latestRun.get() != null && isRunLater(detail.getRun(), latestRun.get().getRun())) {
+ latestRun.set(detail);
+ }
+ });
+
+ // get STOPPING
+ request = ScanOperationRunsRequest.builder()
+ .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.STOPPING))
+ .setLimit(1).build();
+ scanOperations(request, detail -> {
+ if (latestRun.get() != null && isRunLater(detail.getRun(), latestRun.get().getRun())) {
+ latestRun.set(detail);
+ }
+ });
+ return latestRun.get();
+ }
+
private List> getRangeFields(OperationRunId runId)
throws IOException, OperationRunNotFoundException {
List> fields = new ArrayList<>();
@@ -342,6 +381,10 @@ private StructuredTable getOperationRunsTable(StructuredTableContext context) {
return context.getTable(StoreDefinition.OperationRunsStore.OPERATION_RUNS);
}
+private boolean isRunLater(OperationRun run1, OperationRun run2) {
+ return run1.getMetadata().getCreateTime().isAfter(run2.getMetadata().getCreateTime());
+}
+
@VisibleForTesting
// USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS
void clearData() throws IOException {
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java
index abf6a5a1c8ef..4807c5599f84 100644
--- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java
@@ -109,7 +109,9 @@
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramReference;
import io.cdap.cdap.proto.profile.Profile;
+import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushAppRequest;
+import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.runtime.spi.profile.ProfileStatus;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.scheduler.Scheduler;
@@ -517,6 +519,16 @@ private HttpResponse addArtifact(Id.Artifact artifactId, InputSupplier extends
return HttpRequests.execute(builder.build(), httpRequestConfig);
}
+ /**
+ * Deploys an application but does not mark the new version as latest.
+ */
+ protected HttpResponse deployWithoutMarkingLatest(Id.Application appId, AppRequest> appRequest)
+ throws Exception {
+ String deployPath = getVersionedInternalApiPath(
+ "apps/" + appId.getId() + "?skipMarkingLatest=true", appId.getNamespaceId());
+ return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest);
+ }
+
/**
* Deploys an application.
*/
@@ -547,13 +559,6 @@ protected HttpResponse deploy(Id.Application appId,
return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest);
}
- protected HttpResponse deployWithoutMarkingLatest(Id.Application appId, AppRequest> appRequest)
- throws Exception {
- String deployPath = getVersionedInternalApiPath(
- "apps/" + appId.getId() + "?skipMarkingLatest=true", appId.getNamespaceId());
- return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest);
- }
-
protected HttpResponse deploy(ApplicationId appId, AppRequest extends Config> appRequest) throws Exception {
String deployPath = getVersionedApiPath(String.format("apps/%s/versions/%s/create", appId.getApplication(),
appId.getVersion()),
@@ -709,6 +714,13 @@ protected ApplicationDetail getAppDetails(String namespace, String appName) thro
return readResponse(response, ApplicationDetail.class);
}
+ protected ApplicationDetail getAppDetails(String namespace, String appName, String appVersion) throws Exception {
+ HttpResponse response = getAppResponse(namespace, appName, appVersion);
+ assertResponseCode(200, response);
+ Assert.assertEquals("application/json", getFirstHeaderValue(response, HttpHeaderNames.CONTENT_TYPE.toString()));
+ return readResponse(response, ApplicationDetail.class);
+ }
+
protected HttpResponse getAppResponse(String namespace, String appName) throws Exception {
return doGet(getVersionedApiPath(String.format("apps/%s", appName),
Constants.Gateway.API_VERSION_3_TOKEN, namespace));
@@ -729,13 +741,6 @@ protected Set getAppVersions(String namespace, String appName) throws Ex
return readResponse(response, SET_STRING_TYPE);
}
- protected ApplicationDetail getAppDetails(String namespace, String appName, String appVersion) throws Exception {
- HttpResponse response = getAppResponse(namespace, appName, appVersion);
- assertResponseCode(200, response);
- Assert.assertEquals("application/json", getFirstHeaderValue(response, HttpHeaderNames.CONTENT_TYPE.toString()));
- return readResponse(response, ApplicationDetail.class);
- }
-
/**
* Checks the given schedule states.
*/
@@ -1402,6 +1407,15 @@ protected List getProgramRuns(Id.Program program, ProgramRunStatus st
return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE);
}
+ protected List getProgramRuns(ProgramId program, ProgramRunStatus status) throws Exception {
+ String path = String.format("apps/%s/versions/%s/%s/%s/runs?status=%s", program.getApplication(),
+ program.getVersion(), program.getType().getCategoryName(), program.getProgram(),
+ status.toString());
+ HttpResponse response = doGet(getVersionedApiPath(path, program.getNamespace()));
+ assertResponseCode(200, response);
+ return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE);
+ }
+
protected int getProgramRunRecord(Id.Program program, String runId) throws Exception {
String path = String.format("apps/%s/%s/%s/runs/%s", program.getApplicationId(),
program.getType().getCategoryName(), program.getId(), runId);
@@ -1438,27 +1452,18 @@ protected void assertProgramRuns(final ProgramId program, final ProgramRunStatus
Tasks.waitFor(true, () -> getProgramRuns(program, status).size() == expected, 15, TimeUnit.SECONDS);
}
- protected List getProgramRuns(ProgramId program, ProgramRunStatus status) throws Exception {
- String path = String.format("apps/%s/versions/%s/%s/%s/runs?status=%s", program.getApplication(),
- program.getVersion(), program.getType().getCategoryName(), program.getProgram(),
- status.toString());
- HttpResponse response = doGet(getVersionedApiPath(path, program.getNamespace()));
- assertResponseCode(200, response);
- return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE);
- }
-
protected HttpResponse createNamespace(String id) throws Exception {
return doPut(String.format("%s/namespaces/%s", Constants.Gateway.API_VERSION_3, id), null);
}
- protected HttpResponse deleteNamespace(String name) throws Exception {
- return doDelete(String.format("%s/unrecoverable/namespaces/%s", Constants.Gateway.API_VERSION_3, name));
- }
-
protected HttpResponse createNamespace(String metadata, String id) throws Exception {
return doPut(String.format("%s/namespaces/%s", Constants.Gateway.API_VERSION_3, id), metadata);
}
+ protected HttpResponse deleteNamespace(String name) throws Exception {
+ return doDelete(String.format("%s/unrecoverable/namespaces/%s", Constants.Gateway.API_VERSION_3, name));
+ }
+
protected HttpResponse listAllNamespaces() throws Exception {
return doGet(String.format("%s/namespaces", Constants.Gateway.API_VERSION_3));
}
@@ -1497,12 +1502,26 @@ protected HttpResponse pushApplication(ApplicationReference appRef, String commi
appRef.getNamespace(), appRef.getApplication()), GSON.toJson(request));
}
+ protected HttpResponse pushApplications(String namespace, List apps, String commitMessage)
+ throws Exception {
+ PushMultipleAppsRequest request = new PushMultipleAppsRequest(apps, commitMessage);
+ return doPost(String.format("%s/namespaces/%s/repository/apps/push",
+ Constants.Gateway.API_VERSION_3, namespace), GSON.toJson(request));
+ }
+
protected HttpResponse pullApplication(ApplicationReference appRef) throws Exception {
return doPost(String.format("%s/namespaces/%s/repository/apps/%s/pull",
Constants.Gateway.API_VERSION_3,
appRef.getNamespace(), appRef.getApplication()));
}
+ protected HttpResponse pullApplications(String namespace, List apps)
+ throws Exception {
+ PullMultipleAppsRequest request = new PullMultipleAppsRequest(apps);
+ return doPost(String.format("%s/namespaces/%s/repository/apps/pull",
+ Constants.Gateway.API_VERSION_3, namespace), GSON.toJson(request));
+ }
+
protected HttpResponse listApplicationsFromRepository(String namespace) throws Exception {
return doGet(String.format("%s/namespaces/%s/repository/apps", Constants.Gateway.API_VERSION_3, namespace));
}
@@ -1565,6 +1584,20 @@ protected File buildAppArtifact(Class> cls, String name) throws IOException {
return buildAppArtifact(cls, name, new Manifest());
}
+ private File buildAppArtifact(Class> cls, String name, Manifest manifest) throws IOException {
+ if (!name.endsWith(".jar")) {
+ name += ".jar";
+ }
+ File destination = new File(tmpFolder.newFolder(), name);
+ return buildAppArtifact(cls, manifest, destination);
+ }
+
+ protected File buildAppArtifact(Class> cls, Manifest manifest, File destination) throws IOException {
+ Location appJar = AppJarHelper.createDeploymentJar(locationFactory, cls, manifest);
+ Locations.linkOrCopyOverwrite(appJar, destination);
+ return destination;
+ }
+
/**
* If the configuration has authorization enabled, e.g. with
* {@link io.cdap.cdap.internal.AppFabricTestHelper#enableAuthorization(CConfiguration, TemporaryFolder)}, allows
@@ -1580,20 +1613,6 @@ protected void doAs(String user, Retries.Runnable actio
}
}
- private File buildAppArtifact(Class> cls, String name, Manifest manifest) throws IOException {
- if (!name.endsWith(".jar")) {
- name += ".jar";
- }
- File destination = new File(tmpFolder.newFolder(), name);
- return buildAppArtifact(cls, manifest, destination);
- }
-
- protected File buildAppArtifact(Class> cls, Manifest manifest, File destination) throws IOException {
- Location appJar = AppJarHelper.createDeploymentJar(locationFactory, cls, manifest);
- Locations.linkOrCopyOverwrite(appJar, destination);
- return destination;
- }
-
protected DatasetMeta getDatasetMeta(DatasetId datasetId)
throws UnauthorizedException, UnauthenticatedException, NotFoundException, IOException {
return datasetClient.get(datasetId);
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java
index 209c86887d8e..42366e6371e7 100644
--- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java
@@ -28,14 +28,20 @@
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.id.Id;
+import io.cdap.cdap.common.id.Id.Namespace;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler;
import io.cdap.cdap.internal.app.services.ApplicationLifecycleService;
import io.cdap.cdap.internal.app.services.SourceControlManagementService;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
+import io.cdap.cdap.internal.operation.OperationLifecycleManager;
import io.cdap.cdap.metadata.MetadataSubscriberService;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.operation.OperationMeta;
+import io.cdap.cdap.proto.operation.OperationRun;
+import io.cdap.cdap.proto.operation.OperationRunStatus;
+import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.proto.sourcecontrol.AuthConfig;
import io.cdap.cdap.proto.sourcecontrol.AuthType;
import io.cdap.cdap.proto.sourcecontrol.PatConfig;
@@ -61,6 +67,7 @@
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.common.http.HttpResponse;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import javax.annotation.Nullable;
@@ -119,17 +126,20 @@ public SourceControlManagementService provideSourceControlManagementService(
AuthenticationContext authenticationContext,
SourceControlOperationRunner sourceControlRunner,
ApplicationLifecycleService applicationLifecycleService,
- Store store) {
+ Store store, OperationLifecycleManager manager) {
+
return Mockito.spy(new SourceControlManagementService(cConf, secureStore, transactionRunner,
accessEnforcer, authenticationContext,
sourceControlRunner, applicationLifecycleService,
- store));
+ store, manager));
}
});
}
private static void setScmFeatureFlag(boolean flag) {
cConf.setBoolean(FEATURE_FLAG_PREFIX + Feature.SOURCE_CONTROL_MANAGEMENT_GIT.getFeatureFlagString(), flag);
+ cConf.setBoolean(FEATURE_FLAG_PREFIX
+ + Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.getFeatureFlagString(), flag);
}
private void assertResponseCode(int expected, HttpResponse response) {
@@ -497,6 +507,124 @@ public void testListAppsNotFound() throws Exception {
assertResponseCode(404, response);
}
+ @Test
+ public void testPushAppsSucceeds() throws Exception {
+ String commitMessage = "push two apps";
+ OperationRun expectedResponse = OperationRun.builder().setRunId("1")
+ .setStatus(OperationRunStatus.STARTING).setType(OperationType.PUSH_APPS).setMetadata(
+ OperationMeta.builder().setCreateTime(Instant.now()).build()
+ ).build();
+
+ Mockito.doReturn(expectedResponse).when(sourceControlService)
+ .pushApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pushApplications(Namespace.DEFAULT.getId(),
+ Arrays.asList("appToPush1", "appToPush2"), commitMessage);
+
+ assertResponseCode(200, response);
+ OperationMeta result = readResponse(response, OperationRun.class);
+ Assert.assertEquals(result, expectedResponse);
+ }
+
+ @Test
+ public void testPushAppsInvalidRequest() throws Exception {
+ // Push empty commit message
+ String commitMessage = "";
+ HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"), commitMessage);
+
+ // Assert the response
+ assertResponseCode(400, response);
+ Assert.assertEquals(response.getResponseBodyAsString(),
+ "Please specify commit message in the request body.");
+ }
+
+ @Test
+ public void testPushAppsNotFound() throws Exception {
+ String commitMessage = "push two apps";
+ Mockito.doThrow(new NotFoundException("apps not found")).when(sourceControlService)
+ .pushApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"), commitMessage);
+
+ assertResponseCode(404, response);
+ Assert.assertEquals(response.getResponseBodyAsString(), "apps not found");
+ }
+
+ @Test
+ public void testPushAppsSourceControlException() throws Exception {
+ String commitMessage = "push two apps";
+ Mockito.doThrow(new SourceControlException("Failed to push apps.")).when(sourceControlService)
+ .pushApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"), commitMessage);
+
+ assertResponseCode(500, response);
+ Assert.assertTrue(response.getResponseBodyAsString().contains("Failed to push apps."));
+ }
+
+ @Test
+ public void testPushAppsInvalidAuthenticationConfig() throws Exception {
+ String commitMessage = "push two apps";
+ Mockito.doThrow(new AuthenticationConfigException("Repository config not valid")).when(sourceControlService)
+ .pushApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"), commitMessage);
+
+ assertResponseCode(500, response);
+ Assert.assertTrue(response.getResponseBodyAsString().contains("Repository config not valid"));
+ }
+
+ @Test
+ public void testPullAppsSucceeds() throws Exception {
+ OperationRun expectedResponse = OperationRun.builder().setRunId("2")
+ .setStatus(OperationRunStatus.STARTING).setType(OperationType.PULL_APPS).setMetadata(
+ OperationMeta.builder().setCreateTime(Instant.now()).build()
+ ).build();
+
+ Mockito.doReturn(expectedResponse).when(sourceControlService)
+ .pullApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pullApplications(Namespace.DEFAULT.getId(),
+ Arrays.asList("appToPush1", "appToPush2"));
+
+ assertResponseCode(200, response);
+ OperationMeta result = readResponse(response, OperationRun.class);
+ Assert.assertEquals(result, expectedResponse);
+ }
+
+ @Test
+ public void testPullAppsNotFound() throws Exception {
+ Mockito.doThrow(new NotFoundException("apps not found")).when(sourceControlService)
+ .pullApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"));
+
+ // Assert the app is not found
+ assertResponseCode(404, response);
+ Assert.assertEquals(response.getResponseBodyAsString(), "apps not found");
+ }
+
+ @Test
+ public void testPullAppsSourceControlException() throws Exception {
+ Mockito.doThrow(new SourceControlException("Failed to pull apps.")).when(sourceControlService)
+ .pullApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"));
+
+ assertResponseCode(500, response);
+ Assert.assertTrue(response.getResponseBodyAsString().contains("Failed to pull apps."));
+ }
+
+ @Test
+ public void testPullAppsInvalidAuthenticationConfig() throws Exception {
+ Mockito.doThrow(new AuthenticationConfigException("Repository config not valid.")).when(sourceControlService)
+ .pullApps(Mockito.any(), Mockito.any());
+ HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(),
+ Arrays.asList("appToPush1", "appToPush2"));
+
+ assertResponseCode(500, response);
+ Assert.assertTrue(response.getResponseBodyAsString().contains("Repository config not valid."));
+ }
+
private String buildRepoRequestString(Provider provider, String link, String defaultBranch,
AuthConfig authConfig, @Nullable String pathPrefix) {
Map patJsonMap = ImmutableMap.of(
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java
index 72fc992b6801..6a6efdd61ff8 100644
--- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java
@@ -22,6 +22,7 @@
import com.google.inject.Injector;
import com.google.inject.Scopes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
+import io.cdap.cdap.common.TooManyRequestsException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.guice.ConfigModule;
@@ -52,12 +53,11 @@
import org.mockito.Mockito;
public class OperationLifecycleManagerTest extends OperationTestBase {
+
protected static TransactionRunner transactionRunner;
- private static final String testNamespace = "test";
- private static OperationLifecycleManager operationLifecycleManager;
private static int batchSize;
- @ClassRule
+ @ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
private static EmbeddedPostgres postgres;
@@ -92,9 +92,6 @@ protected void configure() {
});
transactionRunner = injector.getInstance(TransactionRunner.class);
- operationLifecycleManager =
- new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class),
- null);
StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class));
batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE);
}
@@ -106,6 +103,8 @@ public static void afterClass() {
@Test
public void testScanOperations() throws Exception {
+ OperationLifecycleManager manager =
+ new OperationLifecycleManager(transactionRunner, null, null);
List insertedRuns = insertTestRuns(transactionRunner);
// get a filtered list of testNamespace runs
List testNamespaceRuns =
@@ -113,98 +112,127 @@ public void testScanOperations() throws Exception {
.filter(detail -> detail.getRunId().getNamespace().equals(testNamespace))
.collect(Collectors.toList());
- TransactionRunners.run(
- transactionRunner,
- context -> {
- List gotRuns = new ArrayList<>();
- List expectedRuns;
- ScanOperationRunsRequest request;
-
- // verify the scan without filters picks all runs for testNamespace
- request = ScanOperationRunsRequest.builder().setNamespace(testNamespace).build();
- operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add);
- expectedRuns = testNamespaceRuns;
- Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
-
- // verify limit
- gotRuns.clear();
- request =
- ScanOperationRunsRequest.builder().setNamespace(testNamespace).setLimit(2).build();
- operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add);
- expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList());
- Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
-
- // verify the scan with type filter
- gotRuns.clear();
- request =
- ScanOperationRunsRequest.builder()
- .setNamespace(testNamespace)
- .setFilter(new OperationRunFilter(OperationType.PUSH_APPS, null))
- .build();
- operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add);
- expectedRuns =
- testNamespaceRuns.stream()
- .filter(detail -> detail.getRun().getType().equals(OperationType.PUSH_APPS))
- .collect(Collectors.toList());
- Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
-
- // verify the scan with status filter and limit
- gotRuns.clear();
- request =
- ScanOperationRunsRequest.builder()
- .setNamespace(testNamespace)
- .setLimit(2)
- .setFilter(
- new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED))
- .build();
- operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add);
- expectedRuns =
- testNamespaceRuns.stream()
- .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS))
- .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED))
- .limit(2)
- .collect(Collectors.toList());
- Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
-
- // verify the scan with status filter
- gotRuns.clear();
- request =
- ScanOperationRunsRequest.builder()
- .setNamespace(testNamespace)
- .setFilter(
- new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED))
- .build();
- operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add);
- expectedRuns =
- testNamespaceRuns.stream()
- .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS))
- .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED))
- .collect(Collectors.toList());
- Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
- });
+ List gotRuns = new ArrayList<>();
+ List expectedRuns;
+ ScanOperationRunsRequest request;
+
+ // verify the scan without filters picks all runs for testNamespace
+ request = ScanOperationRunsRequest.builder().setNamespace(testNamespace).build();
+ manager.scanOperations(request, batchSize, gotRuns::add);
+ expectedRuns = testNamespaceRuns;
+ Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
+
+ // verify limit
+ gotRuns.clear();
+ request =
+ ScanOperationRunsRequest.builder().setNamespace(testNamespace).setLimit(2).build();
+ manager.scanOperations(request, batchSize, gotRuns::add);
+ expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList());
+ Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
+
+ // verify the scan with type filter
+ gotRuns.clear();
+ request =
+ ScanOperationRunsRequest.builder()
+ .setNamespace(testNamespace)
+ .setFilter(new OperationRunFilter(OperationType.PUSH_APPS, null))
+ .build();
+ manager.scanOperations(request, batchSize, gotRuns::add);
+ expectedRuns =
+ testNamespaceRuns.stream()
+ .filter(detail -> detail.getRun().getType().equals(OperationType.PUSH_APPS))
+ .collect(Collectors.toList());
+ Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
+
+ // verify the scan with status filter and limit
+ gotRuns.clear();
+ request =
+ ScanOperationRunsRequest.builder()
+ .setNamespace(testNamespace)
+ .setLimit(2)
+ .setFilter(
+ new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED))
+ .build();
+ manager.scanOperations(request, batchSize, gotRuns::add);
+ expectedRuns =
+ testNamespaceRuns.stream()
+ .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS))
+ .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED))
+ .limit(2)
+ .collect(Collectors.toList());
+ Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
+
+ // verify the scan with status filter
+ gotRuns.clear();
+ request =
+ ScanOperationRunsRequest.builder()
+ .setNamespace(testNamespace)
+ .setFilter(
+ new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED))
+ .build();
+ manager.scanOperations(request, batchSize, gotRuns::add);
+ expectedRuns =
+ testNamespaceRuns.stream()
+ .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS))
+ .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED))
+ .collect(Collectors.toList());
+ Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray());
}
@Test
public void testGetOperation() throws Exception {
+ OperationLifecycleManager manager =
+ new OperationLifecycleManager(transactionRunner, null, null);
OperationRunDetail expectedDetail =
insertRun(
testNamespace, OperationType.PUSH_APPS, OperationRunStatus.RUNNING, transactionRunner);
String testId = expectedDetail.getRun().getId();
OperationRunId runId = new OperationRunId(testNamespace, testId);
- TransactionRunners.run(
- transactionRunner,
- context -> {
- OperationRunDetail gotDetail = operationLifecycleManager.getOperationRun(runId);
- Assert.assertEquals(expectedDetail, gotDetail);
- try {
- operationLifecycleManager.getOperationRun(
- new OperationRunId(Namespace.DEFAULT.getId(), testId));
- Assert.fail("Found unexpected run in default namespace");
- } catch (OperationRunNotFoundException e) {
- // expected
- }
- },
- Exception.class);
+ OperationRunDetail gotDetail = manager.getOperationRun(runId);
+ Assert.assertEquals(expectedDetail, gotDetail);
+ try {
+ manager.getOperationRun(
+ new OperationRunId(Namespace.DEFAULT.getId(), testId));
+ Assert.fail("Found unexpected run in default namespace");
+ } catch (OperationRunNotFoundException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCreatePushOperation() throws Exception {
+ OperationStatePublisher publisher = Mockito.mock(OperationStatePublisher.class);
+ OperationLifecycleManager manager =
+ new OperationLifecycleManager(transactionRunner, null, publisher);
+ // happy path
+ manager.createPushOperation(testNamespace, "1", testPushRequest, "test");
+ Mockito.verify(publisher).publishStarting(new OperationRunId(testNamespace, "1"));
+
+ // test two run at a time fails
+ try {
+ manager.createPushOperation(testNamespace, "2", testPushRequest, "test");
+ Assert.fail("Expected exception");
+ } catch (TooManyRequestsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCreatePullOperation() throws Exception {
+ OperationStatePublisher publisher = Mockito.mock(OperationStatePublisher.class);
+ OperationLifecycleManager manager =
+ new OperationLifecycleManager(transactionRunner, null, publisher);
+ // happy path
+ manager.createPullOperation(testNamespace, "1", testPullRequest, "test");
+ Mockito.verify(publisher).publishStarting(new OperationRunId(testNamespace, "1"));
+
+ // test two run at a time fails
+ try {
+ manager.createPullOperation(testNamespace, "2", testPullRequest, "test");
+ Assert.fail("Expected exception");
+ } catch (TooManyRequestsException e) {
+ // expected
+ }
}
}
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java
index 0e0dad18ab74..3fb825da7d07 100644
--- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java
@@ -20,6 +20,7 @@
import io.cdap.cdap.common.id.Id.Namespace;
import io.cdap.cdap.internal.AppFabricTestHelper;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
+import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationRun;
@@ -40,7 +41,9 @@ public class OperationTestBase {
private static final AtomicInteger sourceId = new AtomicInteger();
private static final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis());
protected static final String testNamespace = "test";
- private static final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null);
+ protected static final PullAppsRequest testPullRequest = new PullAppsRequest(Collections.emptySet(), null);
+ protected static final PushAppsRequest testPushRequest = new PushAppsRequest(Collections.emptySet(), null, null);
+
protected static OperationRunDetail getRun(OperationRunId runId,
TransactionRunner transactionRunner)
@@ -75,7 +78,7 @@ protected static OperationRunDetail insertRun(
.setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()))
.setRunId(runId)
.setRun(run)
- .setPullAppsRequest(input)
+ .setPullAppsRequest(testPullRequest)
.build();
TransactionRunners.run(
transactionRunner,
diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java
new file mode 100644
index 000000000000..9db83e09aca5
--- /dev/null
+++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.proto.sourcecontrol;
+
+import java.util.List;
+
+/**
+ * The request class to push multiple applications (in the same namespace) to linked git repository.
+ */
+public class PullMultipleAppsRequest {
+ private final List apps;
+
+ public PullMultipleAppsRequest(List apps) {
+ this.apps = apps;
+ }
+
+ public List getApps() {
+ return apps;
+ }
+}
diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java
new file mode 100644
index 000000000000..c81155dda223
--- /dev/null
+++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.proto.sourcecontrol;
+
+import java.util.List;
+
+/**
+ * The request class to push multiple applications (in the same namespace) to linked git repository.
+ */
+public class PushMultipleAppsRequest {
+ private final String commitMessage;
+ private final List apps;
+
+ public PushMultipleAppsRequest(List apps, String commitMessage) {
+ this.apps = apps;
+ this.commitMessage = commitMessage;
+ }
+
+ public String getCommitMessage() {
+ return commitMessage;
+ }
+
+ public List getApps() {
+ return apps;
+ }
+}