Skip to content

Commit

Permalink
[Improve]Improve doris connector
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Jun 20, 2024
1 parent 3bfdef8 commit dfb436b
Show file tree
Hide file tree
Showing 165 changed files with 8,747 additions and 5,001 deletions.
68 changes: 52 additions & 16 deletions connectors/rocketmq-connect-doris/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
<!--rocket connect api-->
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
<openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>

<!--fast json-->
<fastjson.version>1.2.83</fastjson.version>

<metrics.version>4.2.25</metrics.version>
<jackson.version>2.13.2.1</jackson.version>
<rocketmq-connect.version>0.0.1-SNAPSHOT</rocketmq-connect.version>
<debezium.version>1.9.8.Final</debezium.version>
<geometry.version>2.2.0</geometry.version>
<commons-io.version>2.3</commons-io.version>
<mysql-connector.version>8.0.33</mysql-connector.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -94,27 +97,60 @@
<scope>test</scope>
</dependency>

<!--fast json version-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.9</version>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-runtime</artifactId>
<version>${rocketmq-connect.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>${geometry.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.rocketmq.connect.doris;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.connect.doris.cfg.DorisSinkConnectorConfig;
import org.apache.rocketmq.connect.doris.utils.ConfigCheckUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisSinkConnector extends SinkConnector {
private static final Logger LOG = LoggerFactory.getLogger(DorisSinkConnector.class);
private KeyValue keyValue;

@Override
public void start(KeyValue keyValue) {
this.keyValue = DorisSinkConnectorConfig.convertToLowercase(keyValue);
DorisSinkConnectorConfig.setDefaultValues(this.keyValue);
}

/**
* stop DorisSinkConnector
*/
@Override
public void stop() {
LOG.info("doris sink connector stop");
}

@Override
public Class<? extends Task> taskClass() {
return DorisSinkTask.class;
}

@Override
public List<KeyValue> taskConfigs(final int maxTasks) {
List<KeyValue> taskConfigs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++) {
keyValue.put("task_id", i + "");
taskConfigs.add(this.keyValue);
}
return taskConfigs;
}

@Override
public void validate(KeyValue config) {
LOG.info("start validate connector config");
ConfigCheckUtils.validateConfig(config);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.rocketmq.connect.doris;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.errors.ConnectException;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.connect.doris.service.DorisSinkService;
import org.apache.rocketmq.connect.doris.service.DorisSinkServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisSinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(DorisSinkTask.class);
private DorisSinkService sink;

@Override
public void start(KeyValue keyValue) {
LOG.info("rocketmq doris sink task start");
this.sink = DorisSinkServiceFactory.getDorisSinkService(keyValue);
}

@Override
public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
LOG.info("Read {} records from Kafka", sinkRecords.size());
sink.insert(sinkRecords);
}

/**
* Support doris's two-phase commit
*/
@Override
public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
if (sink == null || sink.getDorisWriterSize() == 0) {
return;
}
sink.commit(currentOffsets);
}

@Override
public void stop() {
LOG.info("rocketmq doris sink task stopped");
}
}
Loading

0 comments on commit dfb436b

Please sign in to comment.