Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate with Options WF - Support finer replication granularity #2740

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com)

### Fixed

- #2740 - Add additional options to Replicate With Options workflow process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add to line 13ff, 5.0.4 has been released already.

- #2704 - Fixed issue with MCP report generation throwing an exception, and fixed some minor UI issues on AEM SDK (added BG color)
- #2716 - Fixed issue with Shared Component Properties Bindings Values Provider facing lock contention
- #2718 - Fixes CM Code Quality Pipeline failure caused by TestMarketoInterfaces and Jacoco instrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.adobe.acs.commons.util.visitors.ContentVisitor;
import com.adobe.acs.commons.util.visitors.ResourceRunnable;
import com.adobe.acs.commons.workflow.WorkflowPackageManager;
import org.apache.jackrabbit.JcrConstants;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationOptions;
import com.day.cq.replication.Replicator;
Expand All @@ -36,14 +37,15 @@
import com.day.cq.workflow.exec.WorkItem;
import com.day.cq.workflow.exec.WorkflowProcess;
import com.day.cq.workflow.metadata.MetaDataMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,6 +82,16 @@ public class ReplicateWithOptionsWorkflowProcess implements WorkflowProcess {

private static final String BRAND_PORTAL_AGENTS = "BRAND_PORTAL_AGENTS";

// comma delimited list of agent names to use for dispatcher flush, need to have these agents enabled before running workflow
private static final String ARG_AGENTS_DISPATCHER = "agentsDispatcher";
// specify to flush dispatcher for the payload
private static final String ARG_FLUSH_DISPATCHER = "flushDispatcher";
// flush parent resource type, find ancestor/parent with this resource type and use it for dispatcher flush.
private static final String ARG_FLUSH_PARENT_RESOURCE_TYPE =
"flushParentResourceType";
// specify which sub child item to replicate
private static final String ARG_REPLICATE_CHILD_PATH = "replicateChildPath";

@Reference
private WorkflowPackageManager workflowPackageManager;

Expand Down Expand Up @@ -109,19 +121,85 @@ public void execute(WorkItem workItem, WorkflowSession workflowSession, MetaData
final ResourceRunnable replicatorRunnable = new ResourceRunnable() {
@Override
public void run(final Resource resource) throws Exception {
// if we are throttling
if (processArgs.isThrottle()) {
throttledTaskRunner.waitForLowCpuAndLowMemory();
}
replicator.replicate(resource.getResourceResolver().adaptTo(Session.class),

// get the path for child to replicate, empty means replicate current payload path
String replicateChildPath = processArgs.getReplicateChildPath();
// use current payload resource as default for replication
Resource replicateResource = resource;
// allow skipping replicaiton of current payload
Boolean skip = false;

// do we need to replicate child path
if (processArgs.isReplicateChild()) {
if (!replicateChildPath.startsWith("/")) {
replicateChildPath = "/" + replicateChildPath;
}

// get the child path
Resource childResource = resource.getResourceResolver().getResource(resource.getPath() + replicateChildPath);

// if child path does not exist, log a warning and skip processing payload
if (childResource != null && !ResourceUtil.isNonExistingResource(childResource)) {
// when child resource exist set it as the replication resource, this is what need replication.
replicateResource = childResource;
} else {
log.info("Could not find child path [{}] in page {}, skipping.", replicateChildPath, replicateResource.getPath());
skip = true;
}
}

// if not skipping do main processing
if (!skip) {
// replicate payload resource
replicator.replicate(
resource.getResourceResolver().adaptTo(Session.class),
processArgs.getReplicationActionType(),
resource.getPath(),
processArgs.getReplicationOptions(resource));
replicateResource.getPath(),
processArgs.getReplicationOptions(replicateResource)
);

// do we need to flush dispatcher
if (processArgs.isFlushDispatcher()) {
// get parent resource type
String flushParentResourceType = processArgs.getFlushParentResourceType();
// get resource path for the current replication resource
String flushPath = StringUtils.defaultString(replicateResource.getPath(), "");

// if parent resource type has been set
if (StringUtils.isNoneEmpty(flushParentResourceType)) {
// search for the parent
Resource parentResource = processArgs.getContainingResource(replicateResource.getResourceResolver(), replicateResource, flushParentResourceType);
// if we found the parent resource
if (parentResource != null && !ResourceUtil.isNonExistingResource(parentResource)) {
// update flush path to match parent
flushPath = parentResource.getPath();
}
}

// if we have a valid flush path
if (StringUtils.isNoneEmpty(flushPath)) {
// send the replicate event to the flush agent
replicator.replicate(
replicateResource.getResourceResolver().adaptTo(Session.class),
processArgs.getReplicationActionType(),
flushPath,
processArgs.getReplicationOptionsDispatcher(replicateResource)
);
}
}
}
// tick the counter
count.incrementAndGet();
}
};

final ContentVisitor visitor = new ContentVisitor(replicatorRunnable);

// for all payload paths send them to content visitor
for (final String payload : payloads) {
final Resource resource = resourceResolver.getResource(payload);

Expand Down Expand Up @@ -150,6 +228,15 @@ protected static class ProcessArgs {
private boolean throttle;
private List<String> agents;

// specify which sub child item to replicate
private String replicateChildPath;
// specify to flush dispatcher for the path
private boolean flushDispatcher;
//flush parent resource type, find ancestor/parent with this resource type and use it for dispatcher flush.
private String flushParentResourceType;
// comma delimited list of agent names to use for dispatcher flush, need to have these agents enabled before running workflow
private List<String> agentsDispatcher;

public ProcessArgs(MetaDataMap map) throws WorkflowException {
final String[] lines = StringUtils.split(map.get(WorkflowHelper.PROCESS_ARGS, ""), System.lineSeparator());
final Map<String, String> data = ParameterUtil.toMap(lines, "=");
Expand All @@ -164,6 +251,11 @@ public ProcessArgs(MetaDataMap map) throws WorkflowException {
replicationOptions.setSuppressVersions(Boolean.parseBoolean(data.get(ARG_REPLICATION_SUPPRESS_VERSIONS)));

agents = Arrays.asList(StringUtils.split(data.get(ARG_AGENTS), ","));

replicateChildPath = StringUtils.defaultIfEmpty(data.get(ARG_REPLICATE_CHILD_PATH), "");
flushDispatcher = Boolean.parseBoolean(data.get(ARG_FLUSH_DISPATCHER));
flushParentResourceType = StringUtils.defaultIfEmpty(data.get(ARG_FLUSH_PARENT_RESOURCE_TYPE),"");
agentsDispatcher = Arrays.asList(StringUtils.split(StringUtils.defaultString(data.get(ARG_AGENTS_DISPATCHER), ""),","));
}

public ReplicationActionType getReplicationActionType() {
Expand All @@ -188,5 +280,57 @@ public boolean isThrottle() {
return throttle;
}

public ReplicationOptions getReplicationOptionsDispatcher(Resource content) {
ReplicationOptions replicationOptionsDispatcher = new ReplicationOptions();
replicationOptionsDispatcher.setSynchronous(
replicationOptionsDispatcher.isSynchronous()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this line here? Is source and target really both the new object replicationOptionsDispatcher?

);
replicationOptionsDispatcher.setSuppressVersions(
replicationOptionsDispatcher.isSuppressVersions()
);
replicationOptionsDispatcher.setFilter(
new AgentIdsAgentFilter(agentsDispatcher)
);
return replicationOptionsDispatcher;
}

public boolean isFlushDispatcher() {
return flushDispatcher;
}

public String getFlushParentResourceType() {
return flushParentResourceType;
}

public Resource getContainingResource(ResourceResolver resourceResolver,Resource child,String parentResourceType) {
Resource parent = null;
if (child == null || StringUtils.isEmpty(parentResourceType)) {
return parent;
}

String path = child.getPath();
int idx = path.lastIndexOf("/" + JcrConstants.JCR_CONTENT);
if (idx >= 0) {
path = path.substring(0, idx);
}

//walk the tree up until we find the parent resource type.
parent = resourceResolver.getResource(path);
while (parent != null && !ResourceUtil.isNonExistingResource(parent)) {
if (parent.isResourceType(parentResourceType)) {
return parent;
}
parent = parent.getParent();
}
return parent;
}

public boolean isReplicateChild() {
return StringUtils.isNotEmpty(replicateChildPath);
}

public String getReplicateChildPath() {
return replicateChildPath;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setup() throws Exception {
@Test
public void testExecuteDeactivateWithoutTraversal() throws Exception {
when(workflowPackageManager.getPaths(context.resourceResolver(), "/content/payload"))
.thenReturn(Arrays.asList("/content/page", "/content/asset"));
.thenReturn(Arrays.asList("/content/page", "/content/page/child3", "/content/asset"));

StringBuilder args = new StringBuilder();
args.append("replicationActionType=Deactivate");
Expand All @@ -118,11 +118,43 @@ public void testExecuteDeactivateWithoutTraversal() throws Exception {
ReplicationOptionsMatcher optionsMatcher = new ReplicationOptionsMatcher().withAgentIdFilter("agent1");

verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page/child3"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/asset"), argThat(optionsMatcher));
verifyNoMoreInteractions(replicator);
verifyNoInteractions(throttledTaskRunner);
}

@Test
public void itExecuteActivateWithoutTraversalWithDispatcherFlush()
throws Exception {
when(workflowPackageManager.getPaths(context.resourceResolver(),"/content/payload"))
.thenReturn(Arrays.asList("/content/page/child3/jcr:content"));

StringBuilder args = new StringBuilder();
args.append("replicationActionType=Activate");
args.append(System.lineSeparator());
args.append("agents=agent1");
args.append(System.lineSeparator());
args.append("flushDispatcher=true");
args.append(System.lineSeparator());
args.append("agentsDispatcher=flush");
args.append(System.lineSeparator());
args.append("flushParentResourceType=cq:Page");
args.append(System.lineSeparator());
args.append("replicateChildPath=seo");

when(metaDataMap.get(WorkflowHelper.PROCESS_ARGS, "")).thenReturn(args.toString());
process.execute(workItem, workflowSession, metaDataMap);

ReplicationOptionsMatcher optionsMatcher = new ReplicationOptionsMatcher().withAgentIdFilter("agent1");
ReplicationOptionsMatcher optionsMatcherDispatcher = new ReplicationOptionsMatcher().withAgentIdFilter("flush");

verify(replicator).replicate(any(),eq(ReplicationActionType.ACTIVATE),eq("/content/page/child3/jcr:content/seo"),argThat(optionsMatcher));
verify(replicator).replicate(any(),eq(ReplicationActionType.ACTIVATE),eq("/content/page/child3"),argThat(optionsMatcherDispatcher));
verifyNoMoreInteractions(replicator);
verifyNoInteractions(throttledTaskRunner);
}

@Test
public void testExecuteDeactivateWithThrottling() throws Exception {
when(workflowPackageManager.getPaths(context.resourceResolver(), "/content/payload"))
Expand Down Expand Up @@ -166,6 +198,7 @@ public void testExecuteDeactivateWithTraversal() throws Exception {
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page/child1"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page/child2"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/page/child3"), argThat(optionsMatcher));
verify(replicator).replicate(any(), eq(ReplicationActionType.DEACTIVATE), eq("/content/asset"), argThat(optionsMatcher));
verifyNoMoreInteractions(replicator);
verifyNoInteractions(throttledTaskRunner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
"jcr:content": {
"jcr:primaryType": "nt:unstructured"
}
},
"child3" : {
"jcr:primaryType": "cq:Page",
"jcr:content": {
"jcr:primaryType": "nt:unstructured",
"seo": {
"jcr:primaryType": "nt:unstructured",
"canonicalUrlOverride": "/content/xero/gb/en2",
"noindex": "true",
"priority": "1"
}
}
}
},
"asset" : {
Expand Down