Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #45] Implement methods from storage-plugin.admin(rocketmq) #55

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51be376
add rocketmqUtils
scwlkq Mar 9, 2024
b891623
add RocketmqUtils.java
scwlkq Mar 9, 2024
5ee7786
add RocketmqUtils.java
scwlkq Mar 9, 2024
17892e3
change rocketmqUtils impl from MQAdmin to RemotingClient
scwlkq Mar 9, 2024
cfcc41f
fix style
scwlkq Mar 9, 2024
5f76e29
fix style
scwlkq Mar 9, 2024
3a3415c
fix style
scwlkq Mar 9, 2024
dc2634c
fix style
scwlkq Mar 9, 2024
6d4a283
fix style
scwlkq Mar 9, 2024
6561d69
fix test style
scwlkq Mar 9, 2024
4b8fb1d
[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)
SLSJL Mar 9, 2024
449b352
modify rocket dependency version
scwlkq Mar 10, 2024
7ebde4b
move classparse to RocketmqUtils.getTopics
scwlkq Mar 10, 2024
d523e26
fix style
scwlkq Mar 10, 2024
eaf4a3a
Revert "[ISSUE #29] Set up EventMesh Dashboard Front-end (#56)"
scwlkq Mar 13, 2024
e2f4c09
refactor TopicProperties
scwlkq Mar 13, 2024
16e7a53
synchronous
scwlkq Mar 14, 2024
50d40b5
try to sync rocketmq related code
scwlkq Mar 14, 2024
31c0117
move classparser to rocketmqUtil
scwlkq Mar 14, 2024
e2b6c48
move RocketmqProperties to properties
scwlkq Mar 14, 2024
d898abc
[ISSUE #57] Modify the field, synchronize the modification, and add t…
zzxxiansheng Mar 12, 2024
717ff8b
[ISSUE #60] add SDK manager (#62)
Lambert-Rao Mar 15, 2024
fa1c716
[ISSUE #64] Support automated deployment and Fix runtime packaging er…
Pil0tXia Mar 15, 2024
b33d5b0
move clientManager related to core module && add RocketmqService
scwlkq Mar 17, 2024
6e4ce01
revert application-dev.yml
scwlkq Mar 17, 2024
07f71d5
remove @Service
scwlkq Mar 17, 2024
8ff1297
sync
scwlkq Mar 17, 2024
418c6b7
move clientManager to core module
scwlkq Mar 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions eventmesh-dashboard-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@
<artifactId>fastjson2</artifactId>
<version>2.0.40</version>
</dependency>
<!-- Event Store -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.common.model;

import lombok.Data;

@Data
public class RocketmqProperties {

private String namesrvAddr;

private String clusterName;

private String brokerUrl;

private String endPoint;

private int writeQueueNums;

private int readQueueNums;

private String accessKey;

private String secretKey;

private Long requestTimeoutMillis = 10000L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,37 @@

package org.apache.eventmesh.dashboard.common.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.rocketmq.common.TopicFilterType;

import lombok.Data;

/**
* One record displayed in 'Topic' page.
*/

@Data
public class TopicProperties {

public String name;
public long messageCount;

@JsonCreator
public TopicProperties(
@JsonProperty("name") String name,
@JsonProperty("messageCount") long messageCount) {
super();
this.name = name;
this.messageCount = messageCount;
}
private static final String SEPARATOR = " ";

public static int defaultReadQueueNums = 16;

public static int defaultWriteQueueNums = 16;

private String topicName;

private int offset;

private int readQueueNums;

private int writeQueueNums;

private int perm;

private TopicFilterType topicFilterType;

private int topicSysFlag;

private boolean order;
Comment on lines +45 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add these fields in a dto?


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.common.util;

import org.apache.eventmesh.dashboard.common.model.TopicProperties;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

import org.springframework.beans.BeanUtils;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@UtilityClass
public class RocketmqUtils {

private final RemotingClient remotingClient;

static {
NettyClientConfig config = new NettyClientConfig();
config.setUseTLS(false);
remotingClient = new NettyRemotingClient(config);
remotingClient.start();
}


public void createTopic(String topicName, String topicFilterTypeName, int perm, String nameServerAddr,
int readQueueNums, int writeQueueNums, long requestTimeoutMillis) {
try {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicName);
requestHeader.setTopicFilterType(topicFilterTypeName);
requestHeader.setReadQueueNums(readQueueNums);
requestHeader.setWriteQueueNums(writeQueueNums);
requestHeader.setPerm(perm);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
Object result = remotingClient.invokeSync(nameServerAddr, request, requestTimeoutMillis);
log.info("rocketmq create topic result:" + result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}
}

public List<TopicProperties> getTopics(String nameServerAddr, long requestTimeoutMillis) {
List<TopicConfig> topicConfigList = new ArrayList<>();
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, (CommandCustomHeader) null);
RemotingCommand response = remotingClient.invokeSync(nameServerAddr, request, requestTimeoutMillis);
TopicConfigSerializeWrapper allTopicConfig = TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
ConcurrentMap<String, TopicConfig> topicConfigTable = allTopicConfig.getTopicConfigTable();
topicConfigList = new ArrayList<>(topicConfigTable.values());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}

return topicConfig2TopicProperties(topicConfigList);
}

public void deleteTopic(String topicName, String nameServerAddr, long requestTimeoutMillis) {
try {
DeleteTopicRequestHeader deleteTopicRequestHeader = new DeleteTopicRequestHeader();
deleteTopicRequestHeader.setTopic(topicName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, null);
Object result = remotingClient.invokeSync(nameServerAddr, request, requestTimeoutMillis);

log.info("rocketmq delete topic result:" + result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
}
}

private List<TopicProperties> topicConfig2TopicProperties(List<TopicConfig> topicConfigList) {
ArrayList<TopicProperties> topicPropertiesList = new ArrayList<>();
for (TopicConfig topicConfig : topicConfigList) {
TopicProperties topicProperties = new TopicProperties();
BeanUtils.copyProperties(topicConfig, topicProperties);
topicPropertiesList.add(topicProperties);
}
return topicPropertiesList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ public ResponseEntity<Object> preflight() {
}

@CrossOrigin
@GetMapping
@GetMapping("/list")
public Result<List<TopicProperties>> getList() {
List<TopicProperties> topicList = topicCore.getTopic();
List<TopicProperties> topicList = topicCore.getTopics();
return Result.success(topicList);
}

@CrossOrigin
@PostMapping
@PostMapping("/create")
public Result<Object> create(@RequestBody CreateTopicRequest createTopicRequest) {
String topicName = createTopicRequest.getName();
topicCore.createTopic(topicName);
return Result.success();
}

@CrossOrigin
@DeleteMapping
@DeleteMapping("/delete")
public Result<Object> delete(@RequestBody DeleteTopicRequest deleteTopicRequest) {
String topicName = deleteTopicRequest.getName();
topicCore.deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public class ConnectionEntity extends BaseEntity {
private String sinkType;

/**
* The id of the sink.<br>
* It can be connectorId or clientId according to the sinkType.
* The id of the sink.<br> It can be connectorId or clientId according to the sinkType.
*/
@Schema(name = "sinkId", description = "connectorId or clientId")
private Long sinkId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.eventmesh.dashboard.console.function.health.check.config;


import org.apache.eventmesh.dashboard.common.model.RocketmqProperties;

import java.util.Properties;

import lombok.Data;


@Data
public class HealthCheckObjectConfig {

Expand Down Expand Up @@ -56,13 +60,6 @@ public class HealthCheckObjectConfig {

private Long requestTimeoutMillis = 100000L;

private RocketmqConfig rocketmqConfig = new RocketmqConfig();

@Data
public class RocketmqConfig {
private RocketmqProperties rocketmqProperties = new RocketmqProperties();

private String nameServerUrl;
private String brokerUrl;
private String endPoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) {
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -71,7 +71,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getBrokerUrl() == null || getConfig().getRocketmqConfig().getBrokerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getBrokerUrl() == null || getConfig().getRocketmqProperties().getBrokerUrl().isEmpty()) {
throw new IllegalArgumentException("RocketmqCheck failed. BrokerUrl is null.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig)
public void doCheck(HealthCheckCallback callback) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NAMESRV_CONFIG, null);
remotingClient.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(),
remotingClient.invokeAsync(getConfig().getRocketmqProperties().getNamesrvAddr(), request, getConfig().getRequestTimeoutMillis(),
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -67,7 +67,7 @@ public void operationComplete(ResponseFuture responseFuture) {

@Override
public void init() {
if (getConfig().getRocketmqConfig().getNameServerUrl() == null || getConfig().getRocketmqConfig().getNameServerUrl().isEmpty()) {
if (getConfig().getRocketmqProperties().getNamesrvAddr() == null || getConfig().getRocketmqProperties().getNamesrvAddr().isEmpty()) {
throw new RuntimeException("RocketmqNameServerCheck init failed, nameServerUrl is empty");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK;

import org.apache.eventmesh.dashboard.common.util.RocketmqUtils;
import org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant;
import org.apache.eventmesh.dashboard.console.constant.health.HealthConstant;
import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType;
Expand All @@ -37,14 +38,11 @@
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,30 +136,18 @@ public void init() {

//TODO there are many functions that can be reused, they should be collected in a util module
//this function that create topics can be reused
try {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
Object result = remotingClient.invokeSync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis());
log.info(result.toString());
} catch (Exception e) {
log.error("RocketmqTopicCheck init failed when examining topic stats.", e);
return;
}
RocketmqUtils.createTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC, TopicFilterType.SINGLE_TAG.name(), PermName.PERM_READ | PermName.PERM_WRITE,
getConfig().getRocketmqProperties().getBrokerUrl(), 8, 8, getConfig().getRequestTimeoutMillis());

try {
producer = new DefaultMQProducer(HealthConstant.ROCKETMQ_CHECK_PRODUCER_GROUP);
producer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
producer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
producer.setCompressMsgBodyOverHowmuch(16);
producer.start();

consumer = new DefaultMQPushConsumer(HealthConstant.ROCKETMQ_CHECK_CONSUMER_GROUP);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl());
consumer.setNamesrvAddr(getConfig().getRocketmqProperties().getNamesrvAddr());
consumer.subscribe(HealthConstant.ROCKETMQ_CHECK_TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RocketmqBrokerCheckTest {
@BeforeEach
public void init() {
HealthCheckObjectConfig config = new HealthCheckObjectConfig();
config.getRocketmqConfig().setBrokerUrl("127.0.0.1:10911");
config.getRocketmqProperties().setBrokerUrl("127.0.0.1:10911");
config.setRequestTimeoutMillis(1000L);
rocketmqCheck = new Rocketmq4BrokerCheck(config);
}
Expand Down
Loading