diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java new file mode 100644 index 00000000..db82e1fb --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheck.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.meta; + +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_NACOS_CONFIG; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_TYPE_META; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthConstant.NACOS_CHECK_CONTENT; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthConstant.NACOS_CHECK_DATA_ID; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthConstant.NACOS_CHECK_GROUP; + +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@HealthCheckType(type = HEALTH_CHECK_TYPE_META, subType = HEALTH_CHECK_SUBTYPE_NACOS_CONFIG) +public class NacosConfigCheck extends AbstractHealthCheckService { + + private ConfigService configService; + + public NacosConfigCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + super(healthCheckObjectConfig); + } + + @Override + public void doCheck(HealthCheckCallback callback) { + CompletableFuture.runAsync(() -> { + try { + String content = configService.getConfig(NACOS_CHECK_DATA_ID, NACOS_CHECK_GROUP, getConfig().getRequestTimeoutMillis()); + if (NACOS_CHECK_CONTENT.equals(content)) { + callback.onSuccess(); + } else { + callback.onFail(new RuntimeException("NacosCheck failed. Content is wrong.")); + } + } catch (NacosException e) { + callback.onFail(e); + } + }); + } + + @Override + public void init() { + //create a config + try { + Properties properties = new Properties(); + properties.put("serverAddr", getConfig().getConnectUrl()); + ConfigService configService = NacosFactory.createConfigService(properties); + boolean isPublishOk = configService.publishConfig(NACOS_CHECK_DATA_ID, NACOS_CHECK_GROUP, + NACOS_CHECK_CONTENT); + if (!isPublishOk) { + log.error("NacosCheck init failed caused by crate config failed"); + } + } catch (NacosException e) { + log.error("NacosCheck init failed caused by {}", e.getErrMsg()); + } + + try { + Properties properties = new Properties(); + properties.put("serverAddr", getConfig().getConnectUrl()); + configService = NacosFactory.createConfigService(properties); + } catch (NacosException e) { + log.error("NacosCheck init failed caused by {}", e.getErrMsg()); + } + } + + @Override + public void destroy() { + if (configService != null) { + try { + configService.removeConfig(NACOS_CHECK_DATA_ID, NACOS_CHECK_GROUP); + } catch (NacosException e) { + log.error("NacosCheck destroy failed caused by {}", e.getErrMsg()); + } + + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java new file mode 100644 index 00000000..90ec2ea4 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheck.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.meta; + +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_NACOS_REGISTER; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_TYPE_META; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthConstant.NACOS_CHECK_SERVICE_CLUSTER_NAME; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthConstant.NACOS_CHECK_SERVICE_NAME; + +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingFactory; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.pojo.Instance; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +@HealthCheckType(type = HEALTH_CHECK_TYPE_META, subType = HEALTH_CHECK_SUBTYPE_NACOS_REGISTER) +public class NacosRegisterCheck extends AbstractHealthCheckService { + + private NamingService namingService; + + public NacosRegisterCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + super(healthCheckObjectConfig); + } + + @Override + public void doCheck(HealthCheckCallback callback) { + CompletableFuture.runAsync(() -> { + try { + Instance result = namingService.selectOneHealthyInstance(NACOS_CHECK_SERVICE_NAME); + if (result.isHealthy()) { + callback.onSuccess(); + } else { + callback.onFail(new RuntimeException("NacosCheck failed. Service is not healthy.")); + } + } catch (NacosException e) { + callback.onFail(e); + } + }); + } + + @Override + public void init() { + try { + Properties properties = new Properties(); + properties.put("serverAddr", getConfig().getConnectUrl()); + namingService = NamingFactory.createNamingService(properties); + namingService.registerInstance(NACOS_CHECK_SERVICE_NAME, "11.11.11.11", 8888, NACOS_CHECK_SERVICE_CLUSTER_NAME); + } catch (NacosException e) { + log.error("NacosRegisterCheck init failed", e); + } + } + + @Override + public void destroy() { + try { + namingService.deregisterInstance(NACOS_CHECK_SERVICE_NAME, "11.11.11.11", 8888, NACOS_CHECK_SERVICE_CLUSTER_NAME); + } catch (NacosException e) { + log.error("NacosRegisterCheck destroy failed", e); + } + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java similarity index 73% rename from eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java rename to eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java index 00cfad8e..71488ac7 100644 --- a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/health/check/impl/StorageRedisCheck.java +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/RedisCheck.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package org.apache.eventmesh.dashboard.console.health.check.impl; +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage; -import org.apache.eventmesh.dashboard.console.health.annotation.HealthCheckType; -import org.apache.eventmesh.dashboard.console.health.callback.HealthCheckCallback; -import org.apache.eventmesh.dashboard.console.health.check.AbstractHealthCheckService; -import org.apache.eventmesh.dashboard.console.health.check.config.HealthCheckObjectConfig; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE; + +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; import java.time.Duration; import java.util.Objects; @@ -30,13 +32,15 @@ import io.lettuce.core.RedisURI.Builder; import io.lettuce.core.api.async.RedisAsyncCommands; +import lombok.extern.slf4j.Slf4j; -@HealthCheckType(type = "storage", subType = "redis") -public class StorageRedisCheck extends AbstractHealthCheckService { +@Slf4j +@HealthCheckType(type = HEALTH_CHECK_TYPE_STORAGE, subType = "redis") +public class RedisCheck extends AbstractHealthCheckService { private RedisClient redisClient; - public StorageRedisCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + public RedisCheck(HealthCheckObjectConfig healthCheckObjectConfig) { super(healthCheckObjectConfig); } @@ -55,6 +59,7 @@ public void doCheck(HealthCheckCallback callback) { return null; }); } catch (Exception e) { + log.error(e.toString()); callback.onFail(e); } } @@ -79,4 +84,11 @@ public void init() { } redisClient = RedisClient.create(redisUrl); } + + @Override + public void destroy() { + if (redisClient != null) { + redisClient.shutdown(); + } + } } diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java new file mode 100644 index 00000000..5fb92cfa --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4BrokerCheck.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + + +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_ROCKETMQ_BROKER; +import static org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE; + +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@HealthCheckType(type = HEALTH_CHECK_TYPE_STORAGE, subType = HEALTH_CHECK_SUBTYPE_ROCKETMQ_BROKER) +public class Rocketmq4BrokerCheck extends AbstractHealthCheckService { + + private RemotingClient remotingClient; + + + public Rocketmq4BrokerCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + super(healthCheckObjectConfig); + } + + @Override + public void doCheck(HealthCheckCallback callback) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null); + remotingClient.invokeAsync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis(), + new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + if (responseFuture.isSendRequestOK()) { + callback.onSuccess(); + } else { + callback.onFail(new RuntimeException("RocketmqNameServerCheck failed caused by " + responseFuture.getCause())); + } + } + + }); + } catch (Exception e) { + log.error("RocketmqCheck failed.", e); + callback.onFail(e); + } + } + + @Override + public void init() { + if (getConfig().getRocketmqConfig().getBrokerUrl() == null || getConfig().getRocketmqConfig().getBrokerUrl().isEmpty()) { + throw new IllegalArgumentException("RocketmqCheck failed. BrokerUrl is null."); + } + + NettyClientConfig config = new NettyClientConfig(); + config.setUseTLS(false); + remotingClient = new NettyRemotingClient(config); + remotingClient.start(); + + if (getConfig().getConnectUrl() == null || getConfig().getConnectUrl().isEmpty()) { + if (getConfig().getHost() != null && getConfig().getPort() != null) { + getConfig().setConnectUrl(getConfig().getHost() + ":" + getConfig().getPort()); + } + } + + if (getConfig().getConnectUrl() == null) { + log.error("RocketmqCheck failed. ConnectUrl is null."); + } + } + + @Override + public void destroy() { + + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java new file mode 100644 index 00000000..08df28b0 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4NameServerCheck.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + +import org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant; +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@HealthCheckType(type = HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE, subType = HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_ROCKETMQ_NAMESERVER) +public class Rocketmq4NameServerCheck extends AbstractHealthCheckService { + + private RemotingClient remotingClient; + + public Rocketmq4NameServerCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + super(healthCheckObjectConfig); + } + + @Override + public void doCheck(HealthCheckCallback callback) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_NAMESRV_CONFIG, null); + remotingClient.invokeAsync(getConfig().getRocketmqConfig().getNameServerUrl(), request, getConfig().getRequestTimeoutMillis(), + new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + if (responseFuture.isSendRequestOK()) { + callback.onSuccess(); + } else { + callback.onFail(new RuntimeException("RocketmqNameServerCheck failed caused by " + responseFuture.getCause())); + } + } + + }); + } catch (Exception e) { + log.error("RocketmqCheck failed.", e); + callback.onFail(e); + } + } + + @Override + public void init() { + if (getConfig().getRocketmqConfig().getNameServerUrl() == null || getConfig().getRocketmqConfig().getNameServerUrl().isEmpty()) { + throw new RuntimeException("RocketmqNameServerCheck init failed, nameServerUrl is empty"); + } + + NettyClientConfig config = new NettyClientConfig(); + config.setUseTLS(false); + remotingClient = new NettyRemotingClient(config); + remotingClient.start(); + + } + + @Override + public void destroy() { + + } +} diff --git a/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java new file mode 100644 index 00000000..75547624 --- /dev/null +++ b/eventmesh-dashboard-console/src/main/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/Rocketmq4TopicCheck.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + +import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK; + +import org.apache.eventmesh.dashboard.console.constant.health.HealthCheckTypeConstant; +import org.apache.eventmesh.dashboard.console.constant.health.HealthConstant; +import org.apache.eventmesh.dashboard.console.function.health.annotation.HealthCheckType; +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.AbstractHealthCheckService; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@HealthCheckType(type = HealthCheckTypeConstant.HEALTH_CHECK_TYPE_STORAGE, subType = HealthCheckTypeConstant.HEALTH_CHECK_SUBTYPE_ROCKETMQ_TOPIC) +public class Rocketmq4TopicCheck extends AbstractHealthCheckService { + + private RemotingClient remotingClient; + + private DefaultMQPushConsumer consumer; + + private DefaultMQProducer producer; + + private Long startTime; + + private BlockingQueue consumedMessages = new LinkedBlockingQueue<>(); + + public Rocketmq4TopicCheck(HealthCheckObjectConfig healthCheckObjectConfig) { + super(healthCheckObjectConfig); + } + + @Override + public void doCheck(HealthCheckCallback callback) { + startTime = System.currentTimeMillis(); + String uuid = UUID.randomUUID().toString(); + log.debug("RocketmqTopicCheck start, uuid:{}", uuid); + try { + Message msg = new Message(HealthConstant.ROCKETMQ_CHECK_TOPIC, "eventmesh-dashboard-rocketmq-topic-check", uuid + .getBytes( + RemotingHelper.DEFAULT_CHARSET)); + synchronized (this) { + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + if (!sendResult.getSendStatus().equals(SEND_OK)) { + log.error("send message failed, sendResult:{}", sendResult); + callback.onFail(new Exception("send message failed for reason:" + sendResult.getSendStatus().toString())); + return; + } + consume(callback, uuid); + } + + @Override + public void onException(Throwable e) { + log.error("send message failed", e); + callback.onFail((Exception) e); + } + }); + } + + } catch (Exception e) { + log.error("RocketmqTopicCheck failed when producing message.", e); + callback.onFail(e); + } + + } + + private synchronized void consume(HealthCheckCallback callback, String uuid) { + try { + while (System.currentTimeMillis() - startTime < getConfig().getRequestTimeoutMillis()) { + Message message = consumedMessages.poll(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + if (message != null) { + log.debug("RocketmqTopicCheck consume message:{}", new String(message.getBody())); + if (Arrays.equals(message.getBody(), uuid.getBytes())) { + callback.onSuccess(); + return; + } + } + } + callback.onFail(new TimeoutException("consume message timeout")); + } catch (Exception e) { + log.error("RocketmqTopicCheck failed when consuming message.", e); + callback.onFail(e); + } + } + + @Override + public void init() { + NettyClientConfig config = new NettyClientConfig(); + config.setUseTLS(false); + remotingClient = new NettyRemotingClient(config); + remotingClient.start(); + + //TODO there are many functions that can be reused, they should be collected in a util module + //this function that create topics can be reused + try { + CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); + requestHeader.setTopic(HealthConstant.ROCKETMQ_CHECK_TOPIC); + requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name()); + requestHeader.setReadQueueNums(8); + requestHeader.setWriteQueueNums(8); + requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); + Object result = remotingClient.invokeSync(getConfig().getRocketmqConfig().getBrokerUrl(), request, getConfig().getRequestTimeoutMillis()); + log.info(result.toString()); + } catch (Exception e) { + log.error("RocketmqTopicCheck init failed when examining topic stats.", e); + return; + } + + try { + producer = new DefaultMQProducer(HealthConstant.ROCKETMQ_CHECK_PRODUCER_GROUP); + producer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl()); + producer.setCompressMsgBodyOverHowmuch(16); + producer.start(); + + consumer = new DefaultMQPushConsumer(HealthConstant.ROCKETMQ_CHECK_CONSUMER_GROUP); + consumer.setMessageModel(MessageModel.CLUSTERING); + consumer.setNamesrvAddr(getConfig().getRocketmqConfig().getNameServerUrl()); + consumer.subscribe(HealthConstant.ROCKETMQ_CHECK_TOPIC, "*"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + consumedMessages.addAll(list); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + + } catch (Exception e) { + log.error("RocketmqCheck initialization failed when creating Rocketmq4 clients.", e); + } + + + } + + @Override + public void destroy() { + producer.shutdown(); + consumer.shutdown(); + } +} + diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheckTest.java new file mode 100644 index 00000000..66133958 --- /dev/null +++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosConfigCheckTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.meta; + +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class NacosConfigCheckTest { + + private NacosConfigCheck nacosCheck; + + @BeforeEach + public void init() { + HealthCheckObjectConfig config = new HealthCheckObjectConfig(); + config.setInstanceId(1L); + config.setHealthCheckResourceType("meta"); + config.setHealthCheckResourceSubType("nacos"); + config.setClusterId(1L); + config.setConnectUrl("127.0.0.1:8848"); + nacosCheck = new NacosConfigCheck(config); + } + + @Test + public void testDoCheck() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + nacosCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + latch.await(); + } + + @AfterEach + public void destroy() { + nacosCheck.destroy(); + } +} \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheckTest.java new file mode 100644 index 00000000..1b97df16 --- /dev/null +++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/meta/NacosRegisterCheckTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.meta; + +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class NacosRegisterCheckTest { + + private NacosRegisterCheck nacosRegisterCheck; + + @BeforeEach + public void init() { + HealthCheckObjectConfig config = new HealthCheckObjectConfig(); + config.setInstanceId(1L); + config.setHealthCheckResourceType("meta"); + config.setHealthCheckResourceSubType("nacos"); + config.setClusterId(1L); + config.setConnectUrl("127.0.0.1:8848"); + nacosRegisterCheck = new NacosRegisterCheck(config); + } + + @Test + public void testDoCheck() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + nacosRegisterCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + latch.await(); + } + + @AfterEach + public void destroy() { + nacosRegisterCheck.destroy(); + } +} \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqBrokerCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqBrokerCheckTest.java new file mode 100644 index 00000000..abf7ab29 --- /dev/null +++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqBrokerCheckTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class RocketmqBrokerCheckTest { + + private Rocketmq4BrokerCheck rocketmqCheck; + + @BeforeEach + public void init() { + HealthCheckObjectConfig config = new HealthCheckObjectConfig(); + config.getRocketmqConfig().setBrokerUrl("127.0.0.1:10911"); + config.setRequestTimeoutMillis(1000L); + rocketmqCheck = new Rocketmq4BrokerCheck(config); + } + + @Test + public void testDoCheck() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + rocketmqCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + latch.await(); + } +} \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqNameserverCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqNameserverCheckTest.java new file mode 100644 index 00000000..92677bae --- /dev/null +++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqNameserverCheckTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class RocketmqNameserverCheckTest { + + private Rocketmq4NameServerCheck rocketmqCheck; + + @BeforeEach + public void init() { + HealthCheckObjectConfig config = new HealthCheckObjectConfig(); + config.getRocketmqConfig().setNameServerUrl("127.0.0.1:9876"); + config.setRequestTimeoutMillis(1000L); + rocketmqCheck = new Rocketmq4NameServerCheck(config); + } + + @Test + public void testDoCheck() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + rocketmqCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + latch.await(); + } +} \ No newline at end of file diff --git a/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqTopicCheckTest.java b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqTopicCheckTest.java new file mode 100644 index 00000000..8a90a5bd --- /dev/null +++ b/eventmesh-dashboard-console/src/test/java/org/apache/eventmesh/dashboard/console/function/health/check/impl/storage/rocketmq4/RocketmqTopicCheckTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.dashboard.console.function.health.check.impl.storage.rocketmq4; + +import org.apache.eventmesh.dashboard.console.function.health.callback.HealthCheckCallback; +import org.apache.eventmesh.dashboard.console.function.health.check.config.HealthCheckObjectConfig; + +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class RocketmqTopicCheckTest { + + private Rocketmq4TopicCheck rocketmqCheck; + + @BeforeEach + public void init() throws InterruptedException { + HealthCheckObjectConfig config = new HealthCheckObjectConfig(); + config.getRocketmqConfig().setBrokerUrl("127.0.0.1:10911"); + config.getRocketmqConfig().setNameServerUrl("127.0.0.1:9876"); + config.setRequestTimeoutMillis(1000000L); + rocketmqCheck = new Rocketmq4TopicCheck(config); + } + + @Test + public void testDoCheck() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + rocketmqCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + rocketmqCheck.doCheck(new HealthCheckCallback() { + @Override + public void onSuccess() { + latch.countDown(); + log.info("{} success", this.getClass().getSimpleName()); + } + + @Override + public void onFail(Exception e) { + latch.countDown(); + log.error("{}, failed for reason {}", this.getClass().getSimpleName(), e); + } + }); + latch.await(); + } +} \ No newline at end of file