diff --git a/connectors/rocketmq-connect-metrics-exporter/pom.xml b/connectors/rocketmq-connect-metrics-exporter/pom.xml new file mode 100644 index 00000000..9b64e5c7 --- /dev/null +++ b/connectors/rocketmq-connect-metrics-exporter/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-metrics-exporter + 1.0-SNAPSHOT + rocketmq-connect-metrics-exporter + + + + 1.8 + 1.8 + UTF-8 + UTF-8 + 0.1.3 + + + + + io.openmessaging + openmessaging-connector + ${openmessaging.connector.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkConnector.java b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkConnector.java new file mode 100644 index 00000000..79ca8256 --- /dev/null +++ b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkConnector.java @@ -0,0 +1,41 @@ +package org.apache.rocket.connect.metrics.export.sink.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocket.connect.metrics.export.sink.util.ServiceProvicerUtil; + +public class MetricsExportSinkConnector extends SinkConnector { + private KeyValue config; + + private List metricsExporters; + { + metricsExporters = ServiceProvicerUtil.getMetricsExporterServices(); + } + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + configs.add(config); + return configs; + } + + @Override public Class taskClass() { + return MetricsExportSinkTask.class; + } + + @Override public void start(KeyValue config) { + this.config = config; + } + + @Override public void stop() { + this.config = null; + } + + @Override public void validate(KeyValue config) { + for (MetricsExporter exporter: metricsExporters) { + exporter.validate(config); + } + } +} diff --git a/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkTask.java b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkTask.java new file mode 100644 index 00000000..18b023c5 --- /dev/null +++ b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExportSinkTask.java @@ -0,0 +1,37 @@ +package org.apache.rocket.connect.metrics.export.sink.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import java.util.List; +import org.apache.rocket.connect.metrics.export.sink.util.ServiceProvicerUtil; + + +public class MetricsExportSinkTask extends SinkTask { + private List metricsExporters; + + @Override public void put(List sinkRecords) throws ConnectException { + for (MetricsExporter exporter : metricsExporters) { + exporter.export(sinkRecords); + } + } + + @Override public void start(KeyValue config) { + for (MetricsExporter exporter : metricsExporters) { + exporter.start(config); + } + } + + @Override public void stop() { + for (MetricsExporter exporter : metricsExporters) { + exporter.stop(); + } + } + + @Override public void init(SinkTaskContext sinkTaskContext) { + super.init(sinkTaskContext); + metricsExporters = ServiceProvicerUtil.getMetricsExporterServices(); + } +} diff --git a/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExporter.java b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExporter.java new file mode 100644 index 00000000..427f3ff5 --- /dev/null +++ b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/connector/MetricsExporter.java @@ -0,0 +1,16 @@ +package org.apache.rocket.connect.metrics.export.sink.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; + +public interface MetricsExporter { + void export(List sinkRecords); + + void validate(KeyValue config); + + void start(KeyValue config); + + void stop(); + +} diff --git a/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/util/ServiceProvicerUtil.java b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/util/ServiceProvicerUtil.java new file mode 100644 index 00000000..4caaa552 --- /dev/null +++ b/connectors/rocketmq-connect-metrics-exporter/src/main/java/org/apache/rocket/connect/metrics/export/sink/util/ServiceProvicerUtil.java @@ -0,0 +1,23 @@ +package org.apache.rocket.connect.metrics.export.sink.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; +import org.apache.rocket.connect.metrics.export.sink.connector.MetricsExporter; + +/** + * @author: ming + */ +public class ServiceProvicerUtil { + + public static List getMetricsExporterServices(){ + List metricsExporterList = new ArrayList<>(); + ServiceLoader metricsExporters = ServiceLoader.load(MetricsExporter.class); + Iterator iterator = metricsExporters.iterator(); + while (iterator.hasNext()){ + metricsExporterList.add(iterator.next()); + } + return metricsExporterList; + } +} diff --git a/connectors/rocketmq-connect-metrics-exporter/src/main/resources/META-INF/service/org.apache.rocket.connect.metrics.export.sink.connector.MetricsExport b/connectors/rocketmq-connect-metrics-exporter/src/main/resources/META-INF/service/org.apache.rocket.connect.metrics.export.sink.connector.MetricsExport new file mode 100644 index 00000000..e69de29b