diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md index adc9703796..ecafc29f6e 100644 --- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md +++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md @@ -62,8 +62,9 @@ sh start.sh - eventmesh-runtime : eventmesh运行时模块 - eventmesh-sdk-java : eventmesh java客户端sdk - eventmesh-starter : eventmesh本地启动运行项目入口 +- eventmesh-spi : eventmesh SPI加载模块 -> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件 +> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件 **2.3.2 配置VM启动参数** @@ -75,18 +76,17 @@ sh start.sh ``` > 注:如果操作系统为Windows, 可能需要将文件分隔符换成\ -**2.3.3 配置build.gradle文件** +**2.3.3 配置插件** -通过修改dependencies,compile project 项来指定项目启动后加载的插件 +在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件 -修改`eventmesh-starter`模块下面的`build.gradle`文件 +修改`confPath`目录下面的`eventMesh.properties`文件 -加载**RocketMQ**插件配置: +加载**RocketMQ Connector**插件配置: ```java -dependencies { - compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") -} +#connector plugin +eventMesh.connector.plugin.type=rocketmq ``` **2.3.4 启动运行** diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md index 0536e5b6ee..dd89795f3d 100644 --- a/docs/en/instructions/eventmesh-runtime-quickstart.md +++ b/docs/en/instructions/eventmesh-runtime-quickstart.md @@ -62,9 +62,10 @@ Same with 1.2 - eventmesh-runtime : eventmesh runtime module - eventmesh-sdk-java : eventmesh java client sdk - eventmesh-starter : eventmesh project local start entry +- eventmesh-spi : eventmesh SPI load module -> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of -related interface and implementation class under /main/resources/meta-inf/services in the corresponding module +> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of +related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module **2.3.2 Configure VM Options** @@ -76,18 +77,17 @@ related interface and implementation class under /main/resources/meta-inf/servic ``` > ps: If you use Windows, you may need to replace the file separator to \ -**2.3.3 Configure build.gradle file** +**2.3.3 Configure plugin** -Specify the connector that will be loaded after the project start with updating compile project item in dependencies +Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties` -update `build.gradle` file under the `eventmesh-starter` module +Modify the `eventMesh.properties` file in the `confPath` directory load **rocketmq connector** configuration: ```java -dependencies { - compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") -} +#connector plugin +eventMesh.connector.plugin.type=rocketmq ``` **2.3.4 Run** diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png index 252a9536c6..efc5249e59 100644 Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 08a44cb8e1..1ae6b2604a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -17,17 +17,10 @@ package org.apache.eventmesh.common.config; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Enumeration; - import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.IPUtil; public class CommonConfiguration { public String eventMeshEnv = "P"; @@ -35,7 +28,7 @@ public class CommonConfiguration { public String eventMeshCluster = "LS"; public String eventMeshName = ""; public String sysID = "5477"; - + public String eventMeshConnectorPluginType = "rocketmq"; public String namesrvAddr = ""; public String clientUserName = "username"; @@ -84,8 +77,11 @@ public void init() { eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP); if (StringUtils.isBlank(eventMeshServerIp)) { - eventMeshServerIp = getLocalAddr(); + eventMeshServerIp = IPUtil.getLocalAddress(); } + + eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE)); } } @@ -105,94 +101,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills"; public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills"; - } - - public static String getLocalAddr() { - //priority of networkInterface when generating client ip - String priority = System.getProperty("networkInterface.priority", "bond1 preferList = new ArrayList(); - for (String eth : priority.split("<")) { - preferList.add(eth); - } - NetworkInterface preferNetworkInterface = null; - - try { - Enumeration enumeration1 = NetworkInterface.getNetworkInterfaces(); - while (enumeration1.hasMoreElements()) { - final NetworkInterface networkInterface = enumeration1.nextElement(); - if (!preferList.contains(networkInterface.getName())) { - continue; - } else if (preferNetworkInterface == null) { - preferNetworkInterface = networkInterface; - } - //get the networkInterface that has higher priority - else if (preferList.indexOf(networkInterface.getName()) - > preferList.indexOf(preferNetworkInterface.getName())) { - preferNetworkInterface = networkInterface; - } - } - - // Traversal Network interface to get the first non-loopback and non-private address - ArrayList ipv4Result = new ArrayList(); - ArrayList ipv6Result = new ArrayList(); - - if (preferNetworkInterface != null) { - final Enumeration en = preferNetworkInterface.getInetAddresses(); - getIpResult(ipv4Result, ipv6Result, en); - } else { - Enumeration enumeration = NetworkInterface.getNetworkInterfaces(); - while (enumeration.hasMoreElements()) { - final NetworkInterface networkInterface = enumeration.nextElement(); - final Enumeration en = networkInterface.getInetAddresses(); - getIpResult(ipv4Result, ipv6Result, en); - } - } - - // prefer ipv4 - if (!ipv4Result.isEmpty()) { - for (String ip : ipv4Result) { - if (ip.startsWith("127.0") || ip.startsWith("192.168")) { - continue; - } - - return ip; - } - return ipv4Result.get(ipv4Result.size() - 1); - } else if (!ipv6Result.isEmpty()) { - return ipv6Result.get(0); - } - //If failed to find,fall back to localhost - final InetAddress localHost = InetAddress.getLocalHost(); - return normalizeHostAddress(localHost); - } catch (SocketException e) { - e.printStackTrace(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - - return null; - } - - public static String normalizeHostAddress(final InetAddress localHost) { - if (localHost instanceof Inet6Address) { - return "[" + localHost.getHostAddress() + "]"; - } else { - return localHost.getHostAddress(); - } - } - - private static void getIpResult(ArrayList ipv4Result, ArrayList ipv6Result, - Enumeration en) { - while (en.hasMoreElements()) { - final InetAddress address = en.nextElement(); - if (!address.isLoopbackAddress()) { - if (address instanceof Inet6Address) { - ipv6Result.add(normalizeHostAddress(address)); - } else { - ipv4Result.add(normalizeHostAddress(address)); - } - } - } + public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type"; } } \ No newline at end of file diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties index d7c50968b3..76f29f2771 100644 --- a/eventmesh-common/src/test/resources/configuration.properties +++ b/eventmesh-common/src/test/resources/configuration.properties @@ -21,3 +21,4 @@ eventMesh.sysid=3 eventMesh.server.cluster=value4 eventMesh.server.name=value5 eventMesh.server.hostIp=value6 +eventMesh.connector.plugin.type=rocketmq diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-api/build.gradle index 2d1205df38..157048ecc4 100644 --- a/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-api/build.gradle @@ -20,6 +20,6 @@ List open_message = [ ] dependencies { - implementation open_message,project(":eventmesh-common") - testImplementation open_message,project(":eventmesh-common") + implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi") + testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi") } diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-api/gradle.properties index ae30087cf9..9d1744e07a 100644 --- a/eventmesh-connector-api/gradle.properties +++ b/eventmesh-connector-api/gradle.properties @@ -16,6 +16,6 @@ # group=org.apache.eventmesh version=1.2.0-SNAPSHOT -jdk=1.7 +jdk=1.8 mavenUserName= mavenPassword= \ No newline at end of file diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java index 5e60e0e0df..4ac1edbfc8 100644 --- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java +++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java @@ -25,7 +25,9 @@ import io.openmessaging.api.Message; import org.apache.eventmesh.api.AbstractContext; +import org.apache.eventmesh.spi.EventMeshSPI; +@EventMeshSPI public interface MeshMQPushConsumer extends Consumer { void init(Properties keyValue) throws Exception; diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java index 82ca583ce7..c717385e05 100644 --- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java @@ -24,7 +24,9 @@ import io.openmessaging.api.SendCallback; import org.apache.eventmesh.api.RRCallback; +import org.apache.eventmesh.spi.EventMeshSPI; +@EventMeshSPI public interface MeshMQProducer extends Producer { void init(Properties properties) throws Exception; diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer similarity index 90% rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer index c98880a841..0df2e286d7 100644 --- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer +++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl \ No newline at end of file +rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl \ No newline at end of file diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer similarity index 90% rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer index 28907ca176..ef4959d994 100644 --- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer +++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl \ No newline at end of file +rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl \ No newline at end of file diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 09dd79f9c2..e5bb065659 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -31,6 +31,6 @@ List open_message = [ dependencies { - implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common") - testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api") + implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi") + testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi") } diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 035b950fc6..45fc193b4f 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106 eventMesh.server.registry.registerIntervalInMills=10000 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000 #auto-ack -#eventMesh.server.defibus.client.comsumeTimeoutInMin=5 \ No newline at end of file +#eventMesh.server.defibus.client.comsumeTimeoutInMin=5 + +#connector plugin +eventMesh.connector.plugin.type=rocketmq \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java index 080b7af40a..b6298542b5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Properties; -import java.util.ServiceLoader; import io.openmessaging.api.AsyncMessageListener; import io.openmessaging.api.Message; @@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper { protected MeshMQPushConsumer meshMQPushConsumer; + public MQConsumerWrapper(String connectorPluginType) { + this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType); + if (meshMQPushConsumer == null) { + logger.error("can't load the meshMQPushConsumer plugin, please check."); + throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check."); + } + } + public void subscribe(String topic, AsyncMessageListener listener) throws Exception { meshMQPushConsumer.subscribe(topic, listener); } @@ -44,24 +51,10 @@ public void unsubscribe(String topic) throws Exception { } public synchronized void init(Properties keyValue) throws Exception { - meshMQPushConsumer = getMeshMQPushConsumer(); - if (meshMQPushConsumer == null) { - logger.error("can't load the meshMQPushConsumer plugin, please check."); - throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check."); - } - meshMQPushConsumer.init(keyValue); inited.compareAndSet(false, true); } - private MeshMQPushConsumer getMeshMQPushConsumer() { - ServiceLoader meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class); - if (meshMQPushConsumerServiceLoader.iterator().hasNext()) { - return meshMQPushConsumerServiceLoader.iterator().next(); - } - return null; - } - public synchronized void start() throws Exception { meshMQPushConsumer.start(); started.compareAndSet(false, true); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index 082ab3bc00..60fe8421af 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper { protected MeshMQProducer meshMQProducer; - public synchronized void init(Properties keyValue) throws Exception { - if (inited.get()) { - return; - } - - meshMQProducer = getSpiMeshMQProducer(); + public MQProducerWrapper(String connectorPluginType) { + this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType); if (meshMQProducer == null) { logger.error("can't load the meshMQProducer plugin, please check."); throw new RuntimeException("doesn't load the meshMQProducer plugin, please check."); } - meshMQProducer.init(keyValue); - - inited.compareAndSet(false, true); } - private MeshMQProducer getSpiMeshMQProducer() { - ServiceLoader meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class); - if (meshMQProducerServiceLoader.iterator().hasNext()) { - return meshMQProducerServiceLoader.iterator().next(); + public synchronized void init(Properties keyValue) throws Exception { + if (inited.get()) { + return; } - return null; + meshMQProducer.init(keyValue); + + inited.compareAndSet(false, true); } public synchronized void start() throws Exception { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java new file mode 100644 index 0000000000..b11495341b --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.eventmesh.runtime.core.plugin; + +import org.apache.eventmesh.api.consumer.MeshMQPushConsumer; +import org.apache.eventmesh.api.producer.MeshMQProducer; +import org.apache.eventmesh.spi.EventMeshExtensionFactory; + +public class PluginFactory { + + public static MeshMQProducer getMeshMQProducer(String connectorPluginName) { + return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName); + } + + public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) { + return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName); + } + + private static T getPlugin(Class pluginType, String pluginName) { + return EventMeshExtensionFactory.getExtension(pluginType, pluginName); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 8620e682ab..ef051b03f4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -66,13 +66,15 @@ public class EventMeshConsumer { private ConsumerGroupConf consumerGroupConf; - private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper persistentMqConsumer; - private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper broadcastMqConsumer; public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) { this.eventMeshHTTPServer = eventMeshHTTPServer; this.consumerGroupConf = consumerGroupConf; + this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); + this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); } private MessageHandler httpMessageHandler; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java index cf41ca2b9e..fe32180879 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java @@ -69,7 +69,7 @@ public boolean reply(final SendMessageContext sendMsgContext, final SendCallback return true; } - protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper(); + protected MQProducerWrapper mqProducerWrapper; public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; @@ -85,7 +85,7 @@ public synchronized void init(EventMeshHTTPConfiguration eventMeshHttpConfigurat //TODO for defibus keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC); - + mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType); mqProducerWrapper.init(keyValue); inited.compareAndSet(false, true); logger.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 5829e4941a..310ea6dee2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -50,6 +50,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper; +import org.apache.eventmesh.runtime.core.plugin.PluginFactory; import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; @@ -96,14 +97,16 @@ public class ClientGroupWrapper { public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE); - private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper persistentMsgConsumer; - private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper broadCastMsgConsumer; private ConcurrentHashMap> topic2sessionInGroupMapping = new ConcurrentHashMap>(); public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE); + private MQProducerWrapper mqProducerWrapper; + public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) { @@ -115,6 +118,9 @@ public ClientGroupWrapper(String sysId, String producerGroup, String consumerGro this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer(); this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor(); this.downstreamDispatchStrategy = downstreamDispatchStrategy; + this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); + this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); + this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); } public ConcurrentHashMap> getTopic2sessionInGroupMapping() { @@ -163,8 +169,6 @@ public void onException(OnExceptionContext context) { return true; } - private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper(); - public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; } diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java index 96e054bd0c..6aea9db11d 100644 --- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.spi; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; public enum EventMeshExtensionFactory { ; diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java index 740ecb3f9d..89696e04da 100644 --- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java @@ -17,6 +17,9 @@ package org.apache.eventmesh.spi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.InputStream; import java.net.URL; @@ -27,6 +30,8 @@ public enum EventMeshExtensionLoader { ; + private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class); + private static final ConcurrentHashMap, ConcurrentHashMap>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16); private static final ConcurrentHashMap EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16); @@ -54,7 +59,9 @@ private static void initializeExtension(Class extensionType, String exten } Class aClass = extensionClassMap.get(extensionName); try { - EXTENSION_INSTANCE_CACHE.put(extensionName, aClass.newInstance()); + Object extensionObj = aClass.newInstance(); + logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName); + EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj); } catch (InstantiationException | IllegalAccessException e) { throw new ExtensionException("Extension initialize error", e); } @@ -87,6 +94,7 @@ private static void loadResources(URL url, Class extensionType) throws IO String extensionClassStr = (String) extensionClass; try { Class targetClass = Class.forName(extensionClassStr); + logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass); if (!extensionType.isAssignableFrom(targetClass)) { throw new ExtensionException( String.format("class: %s is not subClass of %s", targetClass, extensionType)); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java index 9d3af8ffe4..329f2bc648 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java @@ -34,10 +34,15 @@ public class SyncRequestInstance { public static void main(String[] args) throws Exception { LiteProducer liteProducer = null; + String eventMeshIPPort = "127.0.0.1:10105"; + String topic = "EventMesh.SyncRequestInstance"; try { - String eventMeshIPPort = args[0]; - - final String topic = args[1]; + if (args.length > 0 && StringUtils.isNotBlank(args[0])) { + eventMeshIPPort = args[0]; + } + if (args.length > 1 && StringUtils.isNotBlank(args[1])) { + topic = args[1]; + } if (StringUtils.isBlank(eventMeshIPPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105