Skip to content

Commit

Permalink
[ISSUE apache#97] Runtime tcp http grpc client impl (apache#96)
Browse files Browse the repository at this point in the history
* runtime tcp http grpc client impl

* check

* check

* check

* check

* check

* check

* check

* check

* Upgrade the grpc version.

* check

* check

* ci try-catch

* ci try-catch

* redis client update

* redis client update

* redis client test

* ci

* ci

* etcd add connect time

* update

* optimisation
  • Loading branch information
Alonexc authored Apr 15, 2024
1 parent 76eb1ba commit 13d3728
Show file tree
Hide file tree
Showing 31 changed files with 1,365 additions and 44 deletions.
30 changes: 28 additions & 2 deletions eventmesh-dashboard-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@
</properties>

<dependencies>
<!-- grpc -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.51.0</version>
</dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>

<!-- EventMesh Dashboard modules -->
<dependency>
<groupId>org.apache.eventmesh.dashboard.common</groupId>
Expand All @@ -52,6 +65,19 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- eventmesh -->
<dependency>
<groupId>org.apache.eventmesh</groupId>
<artifactId>eventmesh-sdk-java</artifactId>
<version>1.10.0-release</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Meta - nacos client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
Expand All @@ -61,7 +87,7 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.3.0</version>
<version>0.7.5</version>
</dependency>

<!-- health check client -->
Expand Down Expand Up @@ -121,4 +147,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQProduceSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQPushConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RocketMQRemotingSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeGrpcConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeGrpcProducerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeHttpConsumerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeHttpProducerSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpCloudEventSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpEventMeshSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.operation.RuntimeTcpOpenMessageSDKOperation;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
Expand All @@ -38,10 +46,17 @@
*/
public class SDKManager {

private static final SDKManager INSTANCE = new SDKManager();
private static volatile SDKManager INSTANCE = null;


public static SDKManager getInstance() {
public static synchronized SDKManager getInstance() {
if (INSTANCE == null) {
synchronized (SDKManager.class) {
if (INSTANCE == null) {
INSTANCE = new SDKManager();
}
}
}
return INSTANCE;
}

Expand Down Expand Up @@ -73,6 +88,17 @@ public static SDKManager getInstance() {

clientCreateOperationMap.put(SDKTypeEnum.META_ETCD, new EtcdSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_EVENTMESH_CLIENT, new RuntimeSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_CLOUDEVENT_CLIENT, new RuntimeTcpCloudEventSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_EVENTMESH_CLIENT, new RuntimeTcpEventMeshSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_TCP_OPENMESSAGE_CLIENT, new RuntimeTcpOpenMessageSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_HTTP_PRODUCER, new RuntimeHttpProducerSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_HTTP_CONSUMER, new RuntimeHttpConsumerSDKOperation());

clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_GRPC_PRODUCER, new RuntimeGrpcProducerSDKOperation());
clientCreateOperationMap.put(SDKTypeEnum.RUNTIME_GRPC_CONSUMER, new RuntimeGrpcConsumerSDKOperation());
}

private SDKManager() {
Expand Down Expand Up @@ -114,4 +140,9 @@ public void deleteClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
public Object getClient(SDKTypeEnum clientTypeEnum, String uniqueKey) {
return this.clientMap.get(clientTypeEnum).get(uniqueKey);
}

// get all client
public Map<String, Object> getClients(SDKTypeEnum clientTypeEnum) {
return this.clientMap.get(clientTypeEnum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ public enum SDKTypeEnum {
META_NACOS_NAMING,

META_ETCD,

RUNTIME_EVENTMESH_CLIENT,

RUNTIME_TCP_CLOUDEVENT_CLIENT,
RUNTIME_TCP_EVENTMESH_CLIENT,
RUNTIME_TCP_OPENMESSAGE_CLIENT,

RUNTIME_HTTP_PRODUCER,
RUNTIME_HTTP_CONSUMER,

RUNTIME_GRPC_PRODUCER,
RUNTIME_GRPC_CONSUMER,
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateEtcdConfig implements CreateSDKConfig {

private String etcdServerAddress;

@Builder.Default()
private int connectTime = 10;

@Override
public String getUniqueKey() {
return etcdServerAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateRedisConfig implements CreateSDKConfig {

private String redisUrl;

private String password;

@Builder.Default
private int timeOut = 10;

@Override
public String getUniqueKey() {
return redisUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.config;

import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateRuntimeConfig implements CreateSDKConfig {

// 127.0.0.1:10105;127.0.0.2:10105
private String runtimeServerAddress;

// protocol example:HTTP,TCP,GRPC
private String protocol;
// for tcp:cloudevents,eventmeshmessage,openmessage
private String protocolName;

// producer or consumer
private String clientType;

// topic
private String topic;

// config
private String env;
private String idc;
private String ip;
private String sys;
private String pid;
private String username;
private String password;

// producer
private String producerGroup;

// consumer
private String consumerGroup;
private String subUrl;

// Agent
private UserAgent userAgent;

@Override
public String getUniqueKey() {
return runtimeServerAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateEtcdConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;

import io.etcd.jetcd.Client;
Expand All @@ -39,6 +40,7 @@ public SimpleEntry<String, KV> createClient(CreateSDKConfig clientConfig) {
try {
final Client client = Client.builder()
.endpoints(getSplitEndpoints(etcdConfig))
.connectTimeout(Duration.ofSeconds(etcdConfig.getConnectTime()))
.build();
kvClient = client.getKVClient();
} catch (EtcdException e) {
Expand All @@ -48,7 +50,7 @@ public SimpleEntry<String, KV> createClient(CreateSDKConfig clientConfig) {
}

private static String[] getSplitEndpoints(CreateEtcdConfig etcdConfig) {
return etcdConfig.getEtcdServerAddress().split(",");
return etcdConfig.getEtcdServerAddress().split(";");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.dashboard.core.function.SDK.wrapper.NacosSDKWrapper;

import java.util.AbstractMap.SimpleEntry;
import java.util.Objects;

import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.naming.NamingService;
Expand All @@ -34,8 +35,8 @@ public class NacosSDKOperation extends AbstractSDKOperation<NacosSDKWrapper> {
@Override
public SimpleEntry<String, NacosSDKWrapper> createClient(CreateSDKConfig createClientConfig) {
SimpleEntry<String, ConfigService> configSimpleEntry = nacosConfigClientCreateOperation.createClient(createClientConfig);
SimpleEntry namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (configSimpleEntry.getKey() != namingSimpleEntry.getKey()) {
SimpleEntry<String, NamingService> namingSimpleEntry = nacosNamingClientCreateOperation.createClient(createClientConfig);
if (!Objects.equals(configSimpleEntry.getKey(), namingSimpleEntry.getKey())) {
throw new RuntimeException("Nacos config and naming server address not match");
}
NacosSDKWrapper nacosClient = new NacosSDKWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRedisConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisSDKOperation extends AbstractSDKOperation<StatefulRedisConnection<String, String>> {

@Override
public SimpleEntry<String, StatefulRedisConnection<String, String>> createClient(CreateSDKConfig clientConfig) {
String redisUrl = ((CreateRedisConfig) clientConfig).getRedisUrl();
RedisClient redisClient = RedisClient.create(redisUrl);
CreateRedisConfig redisConfig = (CreateRedisConfig) clientConfig;
String redisUrl = redisConfig.getRedisUrl();
String clientHost = redisUrl.split(":")[0];
int clientPort = Integer.parseInt(redisUrl.split(":")[1]);
RedisURI redisURI = RedisURI.builder()
.withHost(clientHost)
.withPort(clientPort)
.withPassword(redisConfig.getPassword())
.withTimeout(Duration.ofSeconds(redisConfig.getTimeOut()))
.build();
RedisClient redisClient = RedisClient.create(redisURI);
return new SimpleEntry<>(clientConfig.getUniqueKey(), redisClient.connect());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.dashboard.core.function.SDK.operation;

import static org.apache.eventmesh.dashboard.core.function.SDK.util.RuntimeSDKOperationUtils.buildEventMeshGrpcConsumerConfig;

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.dashboard.core.function.SDK.AbstractSDKOperation;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateRuntimeConfig;
import org.apache.eventmesh.dashboard.core.function.SDK.config.CreateSDKConfig;

import java.util.AbstractMap.SimpleEntry;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RuntimeGrpcConsumerSDKOperation extends AbstractSDKOperation<EventMeshGrpcConsumer> {

@Override
public SimpleEntry<String, EventMeshGrpcConsumer> createClient(CreateSDKConfig clientConfig) {
final CreateRuntimeConfig runtimeConfig = (CreateRuntimeConfig) clientConfig;
final EventMeshGrpcClientConfig grpcClientConfig = buildEventMeshGrpcConsumerConfig(runtimeConfig);
EventMeshGrpcConsumer grpcConsumer = null;
try {
grpcConsumer = new EventMeshGrpcConsumer(grpcClientConfig);
grpcConsumer.init();
} catch (EventMeshException e) {
log.error("create runtime grpc Consumer client failed", e);
}
return new SimpleEntry<>(clientConfig.getUniqueKey(), grpcConsumer);
}

@Override
public void close(Object client) {
castClient(client).close();
}
}
Loading

0 comments on commit 13d3728

Please sign in to comment.