Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #45] Implement methods from storage-plugin.admin(rocketmq) #55

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51be376
add rocketmqUtils
scwlkq Mar 9, 2024
b891623
add RocketmqUtils.java
scwlkq Mar 9, 2024
5ee7786
add RocketmqUtils.java
scwlkq Mar 9, 2024
17892e3
change rocketmqUtils impl from MQAdmin to RemotingClient
scwlkq Mar 9, 2024
cfcc41f
fix style
scwlkq Mar 9, 2024
5f76e29
fix style
scwlkq Mar 9, 2024
3a3415c
fix style
scwlkq Mar 9, 2024
dc2634c
fix style
scwlkq Mar 9, 2024
6d4a283
fix style
scwlkq Mar 9, 2024
6561d69
fix test style
scwlkq Mar 9, 2024
4b8fb1d
[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)
SLSJL Mar 9, 2024
449b352
modify rocket dependency version
scwlkq Mar 10, 2024
7ebde4b
move classparse to RocketmqUtils.getTopics
scwlkq Mar 10, 2024
d523e26
fix style
scwlkq Mar 10, 2024
eaf4a3a
Revert "[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)"
scwlkq Mar 13, 2024
e2f4c09
refactor TopicProperties
scwlkq Mar 13, 2024
16e7a53
synchronous
scwlkq Mar 14, 2024
50d40b5
try to sync rocketmq related code
scwlkq Mar 14, 2024
31c0117
move classparser to rocketmqUtil
scwlkq Mar 14, 2024
e2b6c48
move RocketmqProperties to properties
scwlkq Mar 14, 2024
d898abc
[ISSUE #57] Modify the field, synchronize the modification, and add t…
zzxxiansheng Mar 12, 2024
717ff8b
[ISSUE #60] add SDK manager (#62)
Lambert-Rao Mar 15, 2024
fa1c716
[ISSUE #64] Support automated deployment and Fix runtime packaging er…
Pil0tXia Mar 15, 2024
b33d5b0
move clientManager related to core module && add RocketmqService
scwlkq Mar 17, 2024
6e4ce01
revert application-dev.yml
scwlkq Mar 17, 2024
07f71d5
remove @Service
scwlkq Mar 17, 2024
8ff1297
sync
scwlkq Mar 17, 2024
418c6b7
move clientManager to core module
scwlkq Mar 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions eventmesh-dashboard-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,38 @@
<artifactId>fastjson2</artifactId>
<version>2.0.40</version>
</dependency>
<!-- Event Store -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- storage redis client -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<!-- Unit Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- Meta - nacos client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,28 @@
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.console.function.SDK.config;
package org.apache.eventmesh.dashboard.common.model;

/**
* Config to create an SDK client, usually contains an address url.
*/
public interface CreateSDKConfig {
import lombok.Data;

@Data
public class RocketmqProperties {

private String namesrvAddr;

private String clusterName;

private String brokerUrl;

private String endPoint;

private int writeQueueNums;

private int readQueueNums;

private String accessKey;

private String secretKey;

String getUniqueKey();
private Long requestTimeoutMillis = 10000L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,37 @@

package org.apache.eventmesh.dashboard.common.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.rocketmq.common.TopicFilterType;

import lombok.Data;

/**
* One record displayed in 'Topic' page.
*/

@Data
public class TopicProperties {

public String name;
public long messageCount;

@JsonCreator
public TopicProperties(
@JsonProperty("name") String name,
@JsonProperty("messageCount") long messageCount) {
super();
this.name = name;
this.messageCount = messageCount;
}
private static final String SEPARATOR = " ";

public static int defaultReadQueueNums = 16;

public static int defaultWriteQueueNums = 16;

private String topicName;

private int offset;

private int readQueueNums;

private int writeQueueNums;

private int perm;

private TopicFilterType topicFilterType;

private int topicSysFlag;

private boolean order;
Comment on lines +45 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add these fields in a dto?


}
39 changes: 0 additions & 39 deletions eventmesh-dashboard-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@
<scope>runtime</scope>
</dependency>

<!-- Meta - nacos client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>

<!-- health check client -->
<!-- EventMesh SDK -->
<!-- <dependency>-->
Expand All @@ -93,38 +86,6 @@
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- storage redis client -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
<!-- rocketmq client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>5.2.0</version>
</dependency>
<!-- health check client end -->

<!-- Unit Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- TODO: remove junit4 dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ public ResponseEntity<Object> preflight() {
}

@CrossOrigin
@GetMapping
@GetMapping("/list")
public Result<List<TopicProperties>> getList() {
List<TopicProperties> topicList = topicCore.getTopic();
List<TopicProperties> topicList = topicCore.getTopics();
return Result.success(topicList);
}

@CrossOrigin
@PostMapping
@PostMapping("/create")
public Result<Object> create(@RequestBody CreateTopicRequest createTopicRequest) {
String topicName = createTopicRequest.getName();
topicCore.createTopic(topicName);
return Result.success();
}

@CrossOrigin
@DeleteMapping
@DeleteMapping("/delete")
public Result<Object> delete(@RequestBody DeleteTopicRequest deleteTopicRequest) {
String topicName = deleteTopicRequest.getName();
topicCore.deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public class ConnectionEntity extends BaseEntity {
private String sinkType;

/**
* The id of the sink.<br>
* It can be connectorId or clientId according to the sinkType.
* The id of the sink.<br> It can be connectorId or clientId according to the sinkType.
*/
@Schema(name = "sinkId", description = "connectorId or clientId")
private Long sinkId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.eventmesh.dashboard.console.function.health.check.config;


import org.apache.eventmesh.dashboard.common.model.RocketmqProperties;

import java.util.Properties;

import lombok.Data;


@Data
public class HealthCheckObjectConfig {

Expand Down Expand Up @@ -56,13 +60,6 @@ public class HealthCheckObjectConfig {

private Long requestTimeoutMillis = 100000L;

private RocketmqConfig rocketmqConfig = new RocketmqConfig();

@Data
public class RocketmqConfig {
private RocketmqProperties rocketmqProperties = new RocketmqProperties();

private String nameServerUrl;
private String brokerUrl;
private String endPoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) {
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -71,7 +71,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getBrokerUrl() == null || getConfig().getRocketmqConfig().getBrokerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getBrokerUrl() == null || getConfig().getRocketmqProperties().getBrokerUrl().isEmpty()) {
throw new IllegalArgumentException("RocketmqCheck failed. BrokerUrl is null.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig)
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NAMESRV_CONFIG, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getNamesrvAddr(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -67,7 +67,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getNameServerUrl() == null || getConfig().getRocketmqConfig().getNameServerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getNamesrvAddr() == null || getConfig().getRocketmqProperties().getNamesrvAddr().isEmpty()) {
throw new RuntimeException("RocketmqNameServerCheck init failed, nameServerUrl is empty");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback;
import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService;
import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig;
import org.apache.eventmesh.dashboard.service.mq.RocketmqTopicService;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
Expand All @@ -37,14 +38,11 @@
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.Arrays;
import java.util.List;
Expand All @@ -54,12 +52,18 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.annotation.Resource;


import lombok.extern.slf4j.Slf4j;

@Slf4j
@HealthCheckType(type = HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE, subType = HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_ROCKETMQ_TOPIC)
public class Rocketmq4TopicCheck extends AbstractHealthCheckService {

@Resource
private RocketmqTopicService rocketmqTopicService;

private RemotingClient remotingClient;

private DefaultMQPushConsumer consumer;
Expand Down Expand Up @@ -138,30 +142,19 @@ public void init() {

//TODO there are many functions that can be reused, they should be collected in a util module
//this function that create topics can be reused
try {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
Object result = remotingClient.invokeSync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis());
log.info(result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
return;
}
rocketmqTopicService.createTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC, TopicFilterType.SINGLE_TAG.name(),
PermName.PERM_READ | PermName.PERM_WRITE,
getConfig().getRocketmqProperties().getBrokerUrl(), 8, 8, getConfig().getRequestTimeoutMillis());

try {
producer = new DefaultMQProducer(HealthConstant.ROCKETMQ_CHECK_PRODUCER_GROUP);
producer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
producer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
producer.setCompressMsgBodyOverHowmuch(16);
producer.start();

consumer = new DefaultMQPushConsumer(HealthConstant.ROCKETMQ_CHECK_CONSUMER_GROUP);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
consumer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
consumer.subscribe(HealthConstant.ROCKETMQ_CHECK_TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RocketmqBrokerCheckTest {
@BeforeEach
public void init() {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.getRocketmqConfig().setBrokerUrl("127.0.0.1:10911");
config.getRocketmqProperties().setBrokerUrl("127.0.0.1:10911");
config.setRequestTimeoutMillis(1000L);
rocketmqCheck = new Rocketmq4BrokerCheck(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RocketmqNameserverCheckTest {
@BeforeEach
public void init() {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.getRocketmqConfig().setNameServerUrl("127.0.0.1:9876");
config.getRocketmqProperties().setNamesrvAddr("127.0.0.1:9876");
config.setRequestTimeoutMillis(1000L);
rocketmqCheck = new Rocketmq4NameServerCheck(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class RocketmqTopicCheckTest {
@BeforeEach
public void init() throws InterruptedException {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.getRocketmqConfig().setBrokerUrl("127.0.0.1:10911");
config.getRocketmqConfig().setNameServerUrl("127.0.0.1:9876");
config.getRocketmqProperties().setBrokerUrl("127.0.0.1:10911");
config.getRocketmqProperties().setNamesrvAddr("127.0.0.1:9876");
config.setRequestTimeoutMillis(1000000L);
rocketmqCheck = new Rocketmq4TopicCheck(config);
}
Expand Down
Loading