Skip to content

Commit

Permalink
Merge pull request data-integrations#394 from cloudsufi/develop-postg…
Browse files Browse the repository at this point in the history
…reSQL-operations

Update and Upsert functionality for PostgreSQL plugins.
  • Loading branch information
vikasrathee-cs authored May 29, 2023
2 parents 9864d63 + 0df5053 commit 7242f30
Show file tree
Hide file tree
Showing 30 changed files with 880 additions and 19 deletions.
13 changes: 13 additions & 0 deletions aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
},
{
"widget-type": "hidden",
"label": "Operation Name",
"name": "operationName",
"widget-attributes" : {
"default": "insert"
}
},
{
"widget-type": "hidden",
"label": "Table Key",
"name": "relationTableKey"
}
]
},
Expand Down
6 changes: 6 additions & 0 deletions aurora-postgresql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>postgresql-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand All @@ -99,6 +104,7 @@
<instructions>
<_exportcontents>
io.cdap.plugin.auroradb.postgres.*;
io.cdap.plugin.postgres.*;
io.cdap.plugin.db.source.*;
io.cdap.plugin.db.sink.*;
org.apache.commons.lang;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
import io.cdap.plugin.db.sink.AbstractDBSink;
import io.cdap.plugin.postgres.PostgresDBRecord;
import io.cdap.plugin.postgres.PostgresETLDBOutputFormat;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -48,6 +55,18 @@ public AuroraPostgresSink(AuroraPostgresSinkConfig auroraPostgresSinkConfig) {
super(auroraPostgresSinkConfig);
this.auroraPostgresSinkConfig = auroraPostgresSinkConfig;
}
@Override
protected void addOutputContext(BatchSinkContext context) {
context.addOutput(Output.of(auroraPostgresSinkConfig.getReferenceName(),
new SinkOutputFormatProvider(PostgresETLDBOutputFormat.class,
getConfiguration())));
}

@Override
protected DBRecord getDBRecord(StructuredRecord output) {
return new PostgresDBRecord(output, columnTypes, auroraPostgresSinkConfig.getOperationName(),
auroraPostgresSinkConfig.getRelationTableKey());
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
Expand Down
29 changes: 29 additions & 0 deletions aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
},
{
"widget-type": "radio-group",
"label": "Operation Name",
"name": "operationName",
"widget-attributes": {
"default": "insert",
"layout": "inline",
"options": [
{
"id": "insert",
"label": "INSERT"
},
{
"id": "update",
"label": "UPDATE"
},
{
"id": "upsert",
"label": "UPSERT"
}
]
}
},
{
"name": "relationTableKey",
"widget-type": "csv",
"label": "Table Key",
"widget-attributes": {}
}
]
},
Expand Down
13 changes: 13 additions & 0 deletions cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
},
{
"widget-type": "hidden",
"label": "Operation Name",
"name": "operationName",
"widget-attributes" : {
"default": "insert"
}
},
{
"widget-type": "hidden",
"label": "Table Key",
"name": "relationTableKey"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
Expand All @@ -34,12 +35,14 @@
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.AbstractDBSpecificSinkConfig;
import io.cdap.plugin.db.sink.AbstractDBSink;
import io.cdap.plugin.db.sink.FieldsValidator;
import io.cdap.plugin.postgres.PostgresDBRecord;
import io.cdap.plugin.postgres.PostgresETLDBOutputFormat;
import io.cdap.plugin.postgres.PostgresFieldsValidator;
import io.cdap.plugin.postgres.PostgresSchemaReader;
import io.cdap.plugin.util.CloudSQLUtil;
Expand Down Expand Up @@ -88,10 +91,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
protected SchemaReader getSchemaReader() {
return new PostgresSchemaReader();
}
@Override
protected void addOutputContext(BatchSinkContext context) {
context.addOutput(Output.of(cloudsqlPostgresqlSinkConfig.getReferenceName(),
new SinkOutputFormatProvider(PostgresETLDBOutputFormat.class,
getConfiguration())));
}

@Override
protected DBRecord getDBRecord(StructuredRecord output) {
return new PostgresDBRecord(output, columnTypes);
return new PostgresDBRecord(output, columnTypes, cloudsqlPostgresqlSinkConfig.getOperationName(),
cloudsqlPostgresqlSinkConfig.getRelationTableKey());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,35 @@
"widget-type": "textbox",
"label": "Schema Name",
"name": "dbSchemaName"
},
{
"widget-type": "radio-group",
"label": "Operation Name",
"name": "operationName",
"widget-attributes": {
"default": "insert",
"layout": "inline",
"options": [
{
"id": "insert",
"label": "INSERT"
},
{
"id": "update",
"label": "UPDATE"
},
{
"id": "upsert",
"label": "UPSERT"
}
]
}
},
{
"name": "relationTableKey",
"widget-type": "csv",
"label": "Table Key",
"widget-attributes": {}
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ConnectionConfigAccessor {
private static final String INIT_QUERIES = "io.cdap.plugin.db.init.queries";
public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.output.autocommit.enabled";
public static final String FETCH_SIZE = "io.cdap.plugin.db.fetch.size";
public static final String OPERATION_NAME = "io.cdap.plugin.db.operation.name";
public static final String RELATION_TABLE_KEY = "io.cdap.plugin.db.relation.table.key";

private static final Gson GSON = new Gson();
private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() { }.getType();
Expand Down Expand Up @@ -106,6 +108,13 @@ public void setFetchSize(Integer fetchSize) {
public Integer getFetchSize() {
return configuration.getInt(FETCH_SIZE, 0);
}
public void setOperationName(Operation operationName) {
configuration.set(OPERATION_NAME, operationName.toString());
}

public void setRelationTableKey(String relationTableKey) {
configuration.set(RELATION_TABLE_KEY, relationTableKey);
}

public Configuration getConfiguration() {
return configuration;
Expand Down
Loading

0 comments on commit 7242f30

Please sign in to comment.