Skip to content

Commit

Permalink
Merge pull request #27 from dhis2/INTEROP-37
Browse files Browse the repository at this point in the history
feat(INTEROP-37): support scenarios in INTEROP-37 and minor refactori…
  • Loading branch information
cjmamo authored Dec 10, 2024
2 parents c6d61cf + ab49acb commit d83f147
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 26 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
</licenses>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.openapitools</groupId>
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/hisp/hieboot/CamelHieBootApp.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,63 @@
package org.hisp.hieboot;

import org.apache.camel.CamelContext;
import org.hisp.hieboot.camel.security.SelfSignedHttpClientConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.util.StreamUtils;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

@SpringBootApplication
public abstract class CamelHieBootApp extends SpringBootServletInitializer {

protected static final Logger LOGGER = LoggerFactory.getLogger(CamelHieBootApp.class);

@Value("${server.ssl.enabled:false}")
protected Boolean serverSslEnabled;

@Value("${server.servlet.context-path:}")
protected String serverServletContextPath;

@Value("${management.endpoints.web.base-path}")
protected String managementEndpointsWebBasePath;

@Value("${server.port}")
protected int serverPort;

@Autowired
protected CamelContext camelContext;

@Bean
public SelfSignedHttpClientConfigurer selfSignedHttpClientConfigurer() {
return new SelfSignedHttpClientConfigurer();
}

@EventListener(ApplicationReadyEvent.class)
public void onApplicationReadyEvent()
throws
IOException {

String baseUrl = String.format("%s://%s:%s%s", serverSslEnabled ? "https" : "http",
InetAddress.getLocalHost().getHostAddress(), serverPort, serverServletContextPath);

StringBuilder onlineBanner = new StringBuilder();
onlineBanner.append("Hawtio console: ").append(baseUrl).append(managementEndpointsWebBasePath)
.append("/hawtio\n");

LOGGER.info(
String.format(StreamUtils.copyToString(
Thread.currentThread().getContextClassLoader().getResourceAsStream("online-banner.txt"),
StandardCharsets.UTF_8),
onlineBanner));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.springframework.context.annotation.PropertySource;

@Configuration(proxyBeanMethods = false)
@PropertySource( "${sql.data-location}" )
@PropertySource( "${sql.message-store}" )
public class CamelHieBootAutoConfiguration {

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public Message getMessage() {

@Override
public String getContext() {
return "";
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ public class JdbcMessageRepository extends ServiceSupport implements MessageRepo
@Autowired
private CamelContext camelContext;

// @Autowired
// private ObjectMapper objectMapper;

private String dataSourceName;

public JdbcMessageRepository(String dataSourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void process(Exchange exchange) throws Exception {

messageRepository.store(String.format("processing:%s:%s:[%s]", messageId, replayableRouteId, replayEndpointUri), exchange.getMessage());
messageRepository.delete(String.format("replaying:%s:%s:[%s]", messageId, replayableRouteId, replayEndpointUri));
LOGGER.info("Created checkpoint for message [{}] in route [{}]", messageId, replayableRouteId);
LOGGER.info("Created replay checkpoint for message [{}] in route [{}]", messageId, replayableRouteId);
}

public MessageRepository getMessageRepository() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.hisp.hieboot.camel.processor.replay;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.hisp.hieboot.camel.HieExchange;
import org.hisp.hieboot.camel.spi.MessageRepository;
import org.hisp.hieboot.camel.spi.RepositoryMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

public class FailCheckpointProcessor implements Processor {

protected static final Logger LOGGER = LoggerFactory.getLogger(FailCheckpointProcessor.class);

@Autowired
private MessageRepository messageRepository;

@Override
public void process(Exchange exchange) throws Exception {
List<RepositoryMessage> repositoryMessages = messageRepository.retrieve(String.format("processing:%s:%s:*", exchange.getProperty(HieExchange.REPLAY_CHECKPOINT_MESSAGE_ID), exchange.getProperty(HieExchange.REPLAY_CHECKPOINT_ROUTE_ID)));
if (!repositoryMessages.isEmpty()) {
RepositoryMessage repositoryMessage = repositoryMessages.get(0);
messageRepository.store(repositoryMessage.getKey().replace("processing:", "failed:"), repositoryMessage.getMessage(), exchange.getMessage().getHeader("errorMessage", String.class));
messageRepository.delete(repositoryMessage.getKey());
LOGGER.info("Failed replay checkpoint for message [{}] in route [{}]", exchange.getProperty(HieExchange.REPLAY_CHECKPOINT_MESSAGE_ID), exchange.getUnitOfWork().getRoute().getRouteId());
}
}

public MessageRepository getMessageRepository() {
return messageRepository;
}

public void setMessageRepository(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Route createRoute() {
}
}
definition.addInterceptStrategy((context, namedNode, target, nextTarget) -> {
if (namedNode instanceof KameletDefinition && ((KameletDefinition) namedNode).getName().equals("hie-replay-checkpoint-action")) {
if (namedNode instanceof KameletDefinition && ((KameletDefinition) namedNode).getName().equals("hie-create-replay-checkpoint-action")) {
return exchange -> {
exchange.setProperty(HieExchange.REPLAY_CHECKPOINT_ROUTE_ID, exchange.getUnitOfWork().getRoute().getRouteId());
exchange.setProperty(HieExchange.REPLAY_CHECKPOINT_MESSAGE_ID, exchange.getUnitOfWork().getOriginalInMessage().getMessageId());
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ logging:
name: my-camel-dhis2-app.log

sql:
data-location: classpath:sql.properties
message-store: classpath:/hie-message-store-sql.properties
10 changes: 10 additions & 0 deletions src/main/resources/banner.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

Powered By
____ _ _ _ ___ _____ ____ _
/ ___|__ _ _ __ ___ ___| | | | | |_ _| ____| | __ ) ___ ___ | |_
| | / _` | '_ ` _ \ / _ \ | | |_| || || _| | _ \ / _ \ / _ \| __|
| |__| (_| | | | | | | __/ | | _ || || |___ | |_) | (_) | (_) | |_
\____\__,_|_| |_| |_|\___|_| |_| |_|___|_____| |____/ \___/ \___/ \__|

Camel HIE Boot version: v${project.version}
Apache Camel version: ${camel.version}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
name: hie-replay-checkpoint-action
name: hie-create-replay-checkpoint-action
annotations:
camel.apache.org/kamelet.support.level: Stable
camel.apache.org/provider: HISP Centre
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
name: hie-fail-replay-checkpoint-action
annotations:
camel.apache.org/kamelet.support.level: Stable
camel.apache.org/provider: HISP Centre
camel.apache.org/kamelet.group: HIE Replay
camel.apache.org/kamelet.namespace: HIE
labels:
camel.apache.org/kamelet.type: action
spec:
definition:
title: Fail Replay Checkpoint Action
description: |-
Marks replay checkpoint as failed.
properties:
replayChannelName:
title: Replay Channel Name
type: string
default: replay-{{routeId}}
dataTypes:
in:
headers:
errorMessage:
title: Error Message
type: string
template:
beans:
- name: failCheckpointProcessor
type: "#class:org.hisp.hieboot.camel.processor.replay.FailCheckpointProcessor"
route:
from:
uri: kamelet:source
steps:
- process:
ref: "{{failCheckpointProcessor}}"
- to: kamelet:sink
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ spec:
default: urn:mediator:camel-hie-mediator
template:
from:
uri: timer://heartbeatOpenHimMediator?delay=10000&fixedRate=true&period=5000
uri: timer://heartbeatOpenHimMediator
parameters:
delay: 10000
fixedRate: true
period: 5000
steps:
- setProperty:
name: uptime
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/online-banner.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Camel HIE Boot is up and running...

%s
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
25 changes: 14 additions & 11 deletions src/main/resources/routes/global.camel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
- process:
ref: "#class:org.hisp.hieboot.camel.processor.replay.HieBootOnSuccessProcessor"

- from:
uri: "timer://replay"
steps:
- process:
ref: "#replayProcessor"
- split:
simple: "${body}"
steps:
- process:
ref: "#unpackRepoMessageProcessor"
- toD: "${exchangeProperty.replayEndpointUri}"
- route:
id: Schedule Replay Route
from:
uri: "timer://replay"
steps:
- process:
ref: "#replayProcessor"
- split:
simple: "${body}"
steps:
- process:
ref: "#unpackRepoMessageProcessor"
- log: "Scheduling replay of message ${headers.CamelHieReplayCheckpointMessageId}..."
- toD: "${exchangeProperty.replayEndpointUri}"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hisp.hieboot.camel.kamelet;
package org.hisp.hieboot.camel.kamelet.replay;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
Expand Down Expand Up @@ -29,7 +29,7 @@
@CamelSpringBootTest
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class HieReplayCheckpointActionKameletTestCase {
public class HieCreateReplayCheckpointActionKameletTestCase {

@Autowired
private ProducerTemplate producerTemplate;
Expand All @@ -55,7 +55,7 @@ public void testOnSuccess() throws Exception {
public void configure() {
from("direct:routeUnderTest")
.routeId("routeUnderTest")
.kamelet("hie-replay-checkpoint-action")
.kamelet("hie-create-replay-checkpoint-action")
.to("mock:verify");
}
});
Expand All @@ -79,7 +79,7 @@ public void testOnFailure() throws Exception {
public void configure() {
from("direct:routeUnderTest")
.routeId("routeUnderTest")
.kamelet("hie-replay-checkpoint-action")
.kamelet("hie-create-replay-checkpoint-action")
.to("mock:verify")
.throwException(new Exception());
}
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testReplaying() throws Exception {
public void configure() {
from("direct:routeUnderTest")
.routeId("routeUnderTest")
.kamelet("hie-replay-checkpoint-action?replayChannelName=routeUnderTest")
.kamelet("hie-create-replay-checkpoint-action?replayChannelName=routeUnderTest")
.to("mock:verify");
}
});
Expand Down
Loading

0 comments on commit d83f147

Please sign in to comment.