Refactor - separate DDL event handlers #114

Open · wants to merge 1 commit into base: develop
2 changes: 1 addition & 1 deletion pom.xml
@@ -56,7 +56,7 @@
   <properties>
     <avro.version>1.8.2</avro.version>
     <bigquery.version>1.78.0</bigquery.version>
-    <cdap.version>6.4.0-SNAPSHOT</cdap.version>
+    <cdap.version>6.4.0</cdap.version>
     <delta.version>0.4.0-SNAPSHOT</delta.version>
     <failsafe.version>2.3.3</failsafe.version>
     <gcs.version>1.78.0</gcs.version>
210 changes: 33 additions & 177 deletions src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java

Large diffs are not rendered by default.
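Since this file's diff is collapsed, the core of the refactor — BigQueryEventConsumer delegating each DDL event to a per-operation handler — is not visible here. A minimal sketch of what the delegating method plausibly looks like after this change (the dispatcher field and the warn-and-skip fallback are assumptions, not read from the hidden diff):

// Hypothetical shape of the refactored DDL path inside BigQueryEventConsumer;
// the 'dispatcher' field is an assumption, since the actual diff is collapsed above.
private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                       String normalizedStagingTableName)
    throws IOException, DeltaFailureException, InterruptedException {
  DDLEventHandler handler = dispatcher.handler(event);
  if (handler == null) {
    LOG.warn("No handler registered for DDL operation {}; ignoring event.",
             event.getOperation().getType());
    return;
  }
  handler.handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName);
}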

53 changes: 53 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/AlterTable.java
@@ -0,0 +1,53 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.Schemas;

import java.io.IOException;
import java.util.List;

public class AlterTable extends DDLEventHandler {

public AlterTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {

// Flush any pending changes before altering the table, so that all changes made prior to the
// schema change are in the table when it is altered.
consumer.flush();
// after a flush, the staging table will be gone, so no need to alter it.
TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
Table table = config.bigQuery.getTable(tableId);
List<String> primaryKeys = event.getPrimaryKey();
Clustering clustering = config.maxClusteringColumns <= 0 ? null :
Clustering.newBuilder()
.setFields(primaryKeys.subList(0, Math.min(config.maxClusteringColumns, primaryKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
.setClustering(clustering)
.build();
TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
if (config.encryptionConfig != null) {
builder.setEncryptionConfiguration(config.encryptionConfig);
}
TableInfo tableInfo = builder.build();
if (table == null) {
config.bigQuery.create(tableInfo);
} else {
config.bigQuery.update(tableInfo);
}

consumer.updatePrimaryKeys(tableId, primaryKeys);
}
}
35 changes: 35 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/CreateDatabase.java
@@ -0,0 +1,35 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.BigQueryTarget;

import java.io.IOException;

public class CreateDatabase extends DDLEventHandler {

public CreateDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
if (config.bigQuery.getDataset(datasetId) == null) {
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(config.bucket.getLocation()).build();
try {
config.bigQuery.create(datasetInfo);
} catch (BigQueryException e) {
// In a multi-worker deployment, the dataset may be created by another worker
// after this one determined that it does not exist. Ignore the error if the
// dataset was created concurrently.
if (e.getCode() != BigQueryTarget.CONFLICT) {
throw e;
}
}
}
}
}
54 changes: 54 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/CreateTable.java
@@ -0,0 +1,54 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.Schemas;

import java.io.IOException;
import java.util.List;

public class CreateTable extends DDLEventHandler {

public CreateTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
Table table = config.bigQuery.getTable(tableId);
// SNAPSHOT data is loaded directly into the target table. Check whether such a direct load
// was in progress for the current table when the target received the CREATE_TABLE DDL.
// That indicates the snapshot was abandoned due to a failure, so delete the existing table if any.
byte[] state = config.context.getState(String.format(BigQueryEventConsumer.DIRECT_LOADING_IN_PROGRESS_PREFIX + "%s-%s",
normalizedDatabaseName, normalizedTableName));
if (table != null && state != null && Bytes.toBoolean(state)) {
config.bigQuery.delete(tableId);
}
List<String> primaryKeys = event.getPrimaryKey();
consumer.updatePrimaryKeys(tableId, primaryKeys);
// TODO: check schema of table if it exists already
if (table == null) {
List<String> clusteringSupportedKeys = BigQueryEventConsumer.getClusteringSupportedKeys(primaryKeys, event.getSchema());
Clustering clustering = config.maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(config.maxClusteringColumns,
clusteringSupportedKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
.setClustering(clustering)
.build();

TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
if (config.encryptionConfig != null) {
builder.setEncryptionConfiguration(config.encryptionConfig);
}
TableInfo tableInfo = builder.build();
config.bigQuery.create(tableInfo);
}
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/DDLEventHandler.java
@@ -0,0 +1,31 @@
package io.cdap.delta.bigquery.event;

import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

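/**
 * Base class for the per-operation DDL handlers. Each subclass applies one
 * DDL operation type to BigQuery, using the shared consumer for flushes and
 * primary-key bookkeeping and the shared config for connection and policy settings.
 */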
public abstract class DDLEventHandler {

protected static final Logger LOG = LoggerFactory.getLogger(DDLEventHandler.class);

protected final BigQueryEventConsumer consumer;
protected final EventHandlerConfig config;

public DDLEventHandler(BigQueryEventConsumer consumer, EventHandlerConfig config) {
this.consumer = consumer;
this.config = config;
}

public abstract void handleDDL(
    DDLEvent event,
    String normalizedDatabaseName,
    String normalizedTableName,
    String normalizedStagingTableName)
    throws IOException, DeltaFailureException, InterruptedException;

}
31 changes: 31 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/DropDatabase.java
@@ -0,0 +1,31 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.DatasetId;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;

public class DropDatabase extends DDLEventHandler {

public DropDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
config.primaryKeyStore.clear();
if (config.bigQuery.getDataset(datasetId) != null) {
if (config.requireManualDrops) {
String message = String.format("Encountered an event to drop dataset '%s' in project '%s', " +
"but the target is configured to require manual drops. " +
"Please manually drop the dataset to make progress.",
normalizedDatabaseName, config.project);
LOG.error(message);
throw new RuntimeException(message);
}
config.bigQuery.delete(datasetId);
}

}
}
45 changes: 45 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/DropTable.java
@@ -0,0 +1,45 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;

public class DropTable extends DDLEventHandler {

public DropTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
// Flush pending changes before dropping the table; otherwise the next flush would write data
// that should no longer exist.
consumer.flush();
TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
config.primaryKeyStore.remove(tableId);
Table table = config.bigQuery.getTable(tableId);
if (table != null) {
if (config.requireManualDrops) {
String message = String.format("Encountered an event to drop table '%s' in dataset '%s' in project '%s', " +
"but the target is configured to require manual drops. " +
"Please manually drop the table to make progress.",
normalizedTableName, normalizedDatabaseName, config.project);
LOG.error(message);
throw new RuntimeException(message);
}
config.bigQuery.delete(tableId);
}
TableId stagingTableId = TableId.of(config.project, normalizedDatabaseName, normalizedStagingTableName);
Table stagingTable = config.bigQuery.getTable(stagingTableId);
if (stagingTable != null) {
config.bigQuery.delete(stagingTableId);
}

}
}
37 changes: 37 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/EventDispatcher.java
@@ -0,0 +1,37 @@
package io.cdap.delta.bigquery.event;

import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DDLOperation;
import io.cdap.delta.api.DeltaFailureException;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class EventDispatcher {

private final Map<DDLOperation.Type, DDLEventHandler> handlers = new HashMap<>();

public EventDispatcher(
DropDatabase dropDatabase,
CreateDatabase createDatabase,
DropTable dropTable,
CreateTable createTable,
AlterTable alterTable,
TruncateTable truncateTable,
RenameTable renameTable
) {
handlers.put(DDLOperation.Type.DROP_DATABASE, dropDatabase);
handlers.put(DDLOperation.Type.CREATE_DATABASE, createDatabase);
handlers.put(DDLOperation.Type.DROP_TABLE, dropTable);
handlers.put(DDLOperation.Type.CREATE_TABLE, createTable);
handlers.put(DDLOperation.Type.ALTER_TABLE, alterTable);
handlers.put(DDLOperation.Type.TRUNCATE_TABLE, truncateTable);
handlers.put(DDLOperation.Type.RENAME_TABLE, renameTable);
}

public DDLEventHandler handler(DDLEvent event) {
return handlers.get(event.getOperation().getType());
}

}
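For reference, a hedged sketch of how the consumer might construct the dispatcher — one shared EventHandlerConfig plus one handler per DDL operation type (variable names and the call site are illustrative assumptions; the real wiring lives in the collapsed BigQueryEventConsumer diff):

// Illustrative wiring only; names are assumptions, not taken from the collapsed diff.
EventHandlerConfig config = new EventHandlerConfig(context, bigQuery, project, bucket,
    primaryKeyStore, requireManualDrops, maxClusteringColumns, encryptionConfig);

EventDispatcher dispatcher = new EventDispatcher(
    new DropDatabase(this, config),
    new CreateDatabase(this, config),
    new DropTable(this, config),
    new CreateTable(this, config),
    new AlterTable(this, config),
    new TruncateTable(this, config),
    new RenameTable(this, config));

Note that handler(event) returns null for operation types with no registered handler, so callers need a null check, as sketched after the BigQueryEventConsumer entry above.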
44 changes: 44 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/EventHandlerConfig.java
@@ -0,0 +1,44 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Bucket;
import io.cdap.delta.api.DeltaTargetContext;

import java.util.List;
import java.util.Map;

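/**
 * Immutable bundle of shared state (BigQuery client, target project and bucket,
 * primary-key store, and policy flags) passed to every DDL event handler.
 */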
public class EventHandlerConfig {

public final DeltaTargetContext context;
public final BigQuery bigQuery;
public final String project;
public final Bucket bucket;
public final Map<TableId, List<String>> primaryKeyStore;
public final boolean requireManualDrops;
public final int maxClusteringColumns;
public final EncryptionConfiguration encryptionConfig;

public EventHandlerConfig(
DeltaTargetContext context,
BigQuery bigQuery,
String project,
Bucket bucket,
    Map<TableId, List<String>> primaryKeyStore,
    boolean requireManualDrops,
    int maxClusteringColumns,
    EncryptionConfiguration encryptionConfig) {
this.context = context;
this.bigQuery = bigQuery;
this.project = project;
this.bucket = bucket;
this.primaryKeyStore = primaryKeyStore;
this.requireManualDrops = requireManualDrops;
this.maxClusteringColumns = maxClusteringColumns;
this.encryptionConfig = encryptionConfig;
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/cdap/delta/bigquery/event/RenameTable.java
@@ -0,0 +1,27 @@
package io.cdap.delta.bigquery.event;

import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;

public class RenameTable extends DDLEventHandler {

public RenameTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

@Override
public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
// TODO: flush changes, execute a copy job, delete previous table, drop old staging table, remove old entry
// in primaryKeyStore, put new entry in primaryKeyStore
LOG.warn("Rename DDL events are not supported. Ignoring rename event in database {} from table {} to table {}.",
event.getOperation().getDatabaseName(), event.getOperation().getPrevTableName(),
event.getOperation().getTableName());

}
}
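For context, a rough sketch of what the TODO above might become — a BigQuery copy job followed by deletes and a primary-key migration. This is not part of the PR; the staging-table prefix and the handling of the previous table name are stated assumptions:

// Hypothetical future body for handleDDL, sketched from the TODO; NOT part of this PR.
// Assumes imports for Table, TableId, and List, and that getPrevTableName() needs the same
// normalization the consumer applies elsewhere.
consumer.flush();  // land pending changes under the old table name first

TableId oldTable = TableId.of(config.project, normalizedDatabaseName,
    event.getOperation().getPrevTableName());
TableId newTable = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);

Table source = config.bigQuery.getTable(oldTable);
if (source != null) {
  source.copy(newTable).waitFor();  // copy job; waitFor() may throw InterruptedException
  config.bigQuery.delete(oldTable);
}

// Drop the old staging table ("_staging_" prefix is an assumption) and migrate primary keys.
config.bigQuery.delete(TableId.of(config.project, normalizedDatabaseName,
    "_staging_" + event.getOperation().getPrevTableName()));
List<String> keys = config.primaryKeyStore.remove(oldTable);
if (keys != null) {
  consumer.updatePrimaryKeys(newTable, keys);
}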