diff --git a/pom.xml b/pom.xml
index ee54b4e..60be627 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
     1.8.2
     1.78.0
-    6.4.0-SNAPSHOT
+    6.4.0
     0.4.0-SNAPSHOT
     2.3.3
     1.78.0
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
index c07cb9f..8574420 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -17,10 +17,7 @@
 package io.cdap.delta.bigquery;
 
 import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Clustering;
-import com.google.cloud.bigquery.DatasetId;
-import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.EncryptionConfiguration;
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Job;
@@ -36,7 +33,6 @@
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.Storage;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import io.cdap.cdap.api.common.Bytes;
 import io.cdap.cdap.api.data.schema.Schema;
@@ -50,6 +46,7 @@
 import io.cdap.delta.api.ReplicationError;
 import io.cdap.delta.api.Sequenced;
 import io.cdap.delta.api.SourceProperties;
+import io.cdap.delta.bigquery.event.*;
 import net.jodah.failsafe.Failsafe;
 import net.jodah.failsafe.FailsafeException;
 import net.jodah.failsafe.RetryPolicy;
@@ -156,7 +153,7 @@ public class BigQueryEventConsumer implements EventConsumer {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryEventConsumer.class);
   private static final Gson GSON = new Gson();
   private static final String RETAIN_STAGING_TABLE = "retain.staging.table";
-  private static final String DIRECT_LOADING_IN_PROGRESS_PREFIX = "bigquery-direct-load-in-progress-";
+  public static final String DIRECT_LOADING_IN_PROGRESS_PREFIX = "bigquery-direct-load-in-progress-";
 
   private final DeltaTargetContext context;
   private final BigQuery bigQuery;
@@ -183,6 +180,8 @@ public class BigQueryEventConsumer implements EventConsumer {
   private Offset latestOffset;
   private long latestSequenceNum;
   private Exception flushException;
+
+  private EventDispatcher dispatcher;
   // have to keep all the records in memory in case there is a failure writing to GCS
   // cannot write to a temporary file on local disk either in case there is a failure writing to disk
   // Without keeping the entire batch in memory, there would be no way to recover the records that failed to write
@@ -191,6 +190,25 @@
                         String project, int loadIntervalSeconds, String stagingTablePrefix, boolean requireManualDrops,
                         @Nullable EncryptionConfiguration encryptionConfig, @Nullable Long baseRetryDelay,
                         @Nullable String datasetName) {
+
+    this.primaryKeyStore = new HashMap<>();
+
+
+    String maxClusteringColumnsStr = context.getRuntimeArguments().get("gcp.bigquery.max.clustering.columns");
+    this.maxClusteringColumns = maxClusteringColumnsStr == null ? 4 : Integer.parseInt(maxClusteringColumnsStr);
+
+    EventHandlerConfig handlerConfig = new EventHandlerConfig(context, bigQuery, project, bucket, primaryKeyStore, requireManualDrops, maxClusteringColumns, encryptionConfig);
+
+    this.dispatcher = new EventDispatcher(
+      new DropDatabase(this, handlerConfig),
+      new CreateDatabase(this, handlerConfig),
+      new DropTable(this, handlerConfig),
+      new CreateTable(this, handlerConfig),
+      new AlterTable(this, handlerConfig),
+      new TruncateTable(this, handlerConfig),
+      new RenameTable(this, handlerConfig)
+    );
+
     this.context = context;
     this.bigQuery = bigQuery;
     this.loadIntervalSeconds = loadIntervalSeconds;
@@ -203,7 +221,6 @@ public class BigQueryEventConsumer implements EventConsumer {
     // these maps are only accessed in synchronized methods so they do not need to be thread safe.
     this.latestMergedSequence = new HashMap<>();
     this.latestSeenSequence = new HashMap<>();
-    this.primaryKeyStore = new HashMap<>();
     this.commitRetryPolicy = new RetryPolicy<>()
       .withMaxAttempts(Integer.MAX_VALUE)
       .withMaxDuration(Duration.of(5, ChronoUnit.MINUTES))
@@ -221,10 +238,8 @@ public class BigQueryEventConsumer implements EventConsumer {
       String.format("cdap/delta/%s/", context.getApplicationName()), context, executorService);
     this.baseRetryDelay = baseRetryDelay == null ? 10L : baseRetryDelay;
-    String maxClusteringColumnsStr = context.getRuntimeArguments().get("gcp.bigquery.max.clustering.columns");
     // current max clustering columns is set as 4 in big query side, use that as default max value
     // https://cloud.google.com/bigquery/docs/creating-clustered-tables#limitations
-    this.maxClusteringColumns = maxClusteringColumnsStr == null ? 4 : Integer.parseInt(maxClusteringColumnsStr);
     this.sourceRowIdSupported =
       context.getSourceProperties() != null && context.getSourceProperties().isRowIdSupported();
     this.sourceEventOrdering = context.getSourceProperties() == null ? SourceProperties.Ordering.ORDERED :
@@ -277,7 +292,8 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exc
     String normalizedStagingTableName = normalizedTableName == null ? null :
       BigQueryUtils.normalize(stagingTablePrefix + normalizedTableName);
 
-    runWithRetries(ctx -> handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName),
+
+    runWithRetries(ctx -> dispatcher.handler(event).handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName),
                    baseRetryDelay,
                    normalizedDatabaseName,
                    event.getOperation().getSchemaName(),
@@ -320,167 +336,7 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exc
     }
   }
 
-  private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
-                         String normalizedStagingTableName)
-    throws IOException, DeltaFailureException, InterruptedException {
-
-    switch (event.getOperation().getType()) {
-      case CREATE_DATABASE:
-        DatasetId datasetId = DatasetId.of(project, normalizedDatabaseName);
-        if (bigQuery.getDataset(datasetId) == null) {
-          DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(bucket.getLocation()).build();
-          try {
-            bigQuery.create(datasetInfo);
-          } catch (BigQueryException e) {
-            // It is possible that in multiple worker instances scenario
-            // dataset is created by another worker instance after this worker instance
-            // determined that dataset does not exists. Ignore error if dataset is created.
-            if (e.getCode() != BigQueryTarget.CONFLICT) {
-              throw e;
-            }
-          }
-        }
-        break;
-      case DROP_DATABASE:
-        datasetId = DatasetId.of(project, normalizedDatabaseName);
-        primaryKeyStore.clear();
-        if (bigQuery.getDataset(datasetId) != null) {
-          if (requireManualDrops) {
-            String message = String.format("Encountered an event to drop dataset '%s' in project '%s', " +
-                                             "but the target is configured to require manual drops. " +
-                                             "Please manually drop the dataset to make progress.",
-                                           normalizedDatabaseName, project);
-            LOG.error(message);
-            throw new RuntimeException(message);
-          }
-          bigQuery.delete(datasetId);
-        }
-        break;
-      case CREATE_TABLE:
-        TableId tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
-        Table table = bigQuery.getTable(tableId);
-        // SNAPSHOT data is directly loaded in the target table. Check if any such direct load was in progress
-        // for the current table when target received CREATE_TABLE ddl. This indicates that the snapshot was abandoned
-        // because of some failure scenario. Delete the existing table if any.
-        byte[] state = context.getState(String.format(DIRECT_LOADING_IN_PROGRESS_PREFIX + "%s-%s",
-                                                      normalizedDatabaseName, normalizedTableName));
-        if (table != null && state != null && Bytes.toBoolean(state)) {
-          bigQuery.delete(tableId);
-        }
-        List<String> primaryKeys = event.getPrimaryKey();
-        updatePrimaryKeys(tableId, primaryKeys);
-        // TODO: check schema of table if it exists already
-        if (table == null) {
-          List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
-          Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
-            Clustering.newBuilder()
-              .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
-                                                                     clusteringSupportedKeys.size())))
-              .build();
-          TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
-            .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema())))
-            .setClustering(clustering)
-            .build();
-
-          TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
-          if (encryptionConfig != null) {
-            builder.setEncryptionConfiguration(encryptionConfig);
-          }
-          TableInfo tableInfo = builder.build();
-          bigQuery.create(tableInfo);
-        }
-        break;
-      case DROP_TABLE:
-        // need to flush changes before dropping the table, otherwise the next flush will write data that
-        // shouldn't exist
-        flush();
-        tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
-        primaryKeyStore.remove(tableId);
-        table = bigQuery.getTable(tableId);
-        if (table != null) {
-          if (requireManualDrops) {
-            String message = String.format("Encountered an event to drop table '%s' in dataset '%s' in project '%s', " +
-                                             "but the target is configured to require manual drops. " +
-                                             "Please manually drop the table to make progress.",
-                                           normalizedTableName, normalizedDatabaseName, project);
-            LOG.error(message);
-            throw new RuntimeException(message);
-          }
-          bigQuery.delete(tableId);
-        }
-        TableId stagingTableId = TableId.of(project, normalizedDatabaseName, normalizedStagingTableName);
-        Table stagingTable = bigQuery.getTable(stagingTableId);
-        if (stagingTable != null) {
-          bigQuery.delete(stagingTableId);
-        }
-        break;
-      case ALTER_TABLE:
-        // need to flush any changes before altering the table to ensure all changes before the schema change
-        // are in the table when it is altered.
-        flush();
-        // after a flush, the staging table will be gone, so no need to alter it.
-        tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
-        table = bigQuery.getTable(tableId);
-        primaryKeys = event.getPrimaryKey();
-        Clustering clustering = maxClusteringColumns <= 0 ? null :
-          Clustering.newBuilder()
-            .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
-            .build();
-        TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
-          .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema())))
-          .setClustering(clustering)
-          .build();
-        TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
-        if (encryptionConfig != null) {
-          builder.setEncryptionConfiguration(encryptionConfig);
-        }
-        TableInfo tableInfo = builder.build();
-        if (table == null) {
-          bigQuery.create(tableInfo);
-        } else {
-          bigQuery.update(tableInfo);
-        }
-
-        updatePrimaryKeys(tableId, primaryKeys);
-        break;
-      case RENAME_TABLE:
-        // TODO: flush changes, execute a copy job, delete previous table, drop old staging table, remove old entry
-        // in primaryKeyStore, put new entry in primaryKeyStore
-        LOG.warn("Rename DDL events are not supported. Ignoring rename event in database {} from table {} to table {}.",
-                 event.getOperation().getDatabaseName(), event.getOperation().getPrevTableName(),
-                 event.getOperation().getTableName());
-        break;
-      case TRUNCATE_TABLE:
-        flush();
-        tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
-        table = bigQuery.getTable(tableId);
-        if (table != null) {
-          tableDefinition = table.getDefinition();
-          bigQuery.delete(tableId);
-        } else {
-          primaryKeys = event.getPrimaryKey();
-          clustering = maxClusteringColumns <= 0 ? null :
-            Clustering.newBuilder()
-              .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
-              .build();
-          tableDefinition = StandardTableDefinition.newBuilder()
-            .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema())))
-            .setClustering(clustering)
-            .build();
-        }
-
-        builder = TableInfo.newBuilder(tableId, tableDefinition);
-        if (encryptionConfig != null) {
-          builder.setEncryptionConfiguration(encryptionConfig);
-        }
-        tableInfo = builder.build();
-        bigQuery.create(tableInfo);
-        break;
-    }
-  }
-
-  @VisibleForTesting
-  static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
+  public static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
     List<String> result = new ArrayList<>();
     for (String key : primaryKeys) {
       if (Schemas.isClusteringSupported(recordSchema.getField(key))) {
@@ -490,11 +346,12 @@ static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema
     return result;
   }
 
-  private void updatePrimaryKeys(TableId tableId, List<String> primaryKeys) throws DeltaFailureException, IOException {
+  // made this public to give access to DDLEventHandler(s)
+  public void updatePrimaryKeys(TableId tableId, List<String> primaryKeys) throws DeltaFailureException, IOException {
     if (primaryKeys.isEmpty()) {
       throw new DeltaFailureException(
-        String.format("Table '%s' in database '%s' has no primary key. Tables without a primary key are" +
-                        " not supported.", tableId.getTable(), tableId.getDataset()));
+        String.format("Table '%s' in database '%s' has no primary key. Tables without a primary key are" +
+          " not supported.", tableId.getTable(), tableId.getDataset()));
     }
     List<String> existingKey = primaryKeyStore.get(tableId);
     if (primaryKeys.equals(existingKey)) {
@@ -502,7 +359,7 @@ private void updatePrimaryKeys(TableId tableId, List<String> primaryKeys) throws
     }
     primaryKeyStore.put(tableId, primaryKeys);
     context.putState(String.format("bigquery-%s-%s", tableId.getDataset(), tableId.getTable()),
-                     Bytes.toBytes(GSON.toJson(new BigQueryTableState(primaryKeys))));
+      Bytes.toBytes(GSON.toJson(new BigQueryTableState(primaryKeys))));
   }
 
   private List<String> getPrimaryKeys(TableId targetTableId) throws IOException, DeltaFailureException {
@@ -522,7 +379,7 @@ private List<String> getPrimaryKeys(TableId targetTableId) throws IOException, D
     return primaryKeys;
  }
 
-  static Schema addSupplementaryColumnsToTargetSchema(Schema original) {
+  public static Schema addSupplementaryColumnsToTargetSchema(Schema original) {
     List<Schema.Field> fields = new ArrayList<>(original.getFields().size() + 4);
     fields.add(Schema.Field.of(Constants.SEQUENCE_NUM, Schema.of(Schema.Type.LONG)));
     fields.add(Schema.Field.of(Constants.IS_DELETED, Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))));
@@ -598,8 +455,7 @@ public synchronized void applyDML(Sequenced<DMLEvent> sequencedEvent) throws Exc
     }
   }
 
-  @VisibleForTesting
-  synchronized void flush() throws InterruptedException, IOException, DeltaFailureException {
+  public synchronized void flush() throws InterruptedException, IOException, DeltaFailureException {
     Map<BlobType, Map<TableId, List<Blob>>> tableBlobsByBlobType;
     // if this throws an IOException, we want to propagate it, since we need the app to reset state to the last
     // commit and replay events. This is because previous events are written directly to an outputstream to GCS
diff --git a/src/main/java/io/cdap/delta/bigquery/event/AlterTable.java b/src/main/java/io/cdap/delta/bigquery/event/AlterTable.java
new file mode 100644
index 0000000..d9ebf60
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/AlterTable.java
@@ -0,0 +1,53 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.*;
+import com.google.cloud.storage.Bucket;
+import io.cdap.cdap.api.common.Bytes;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+import io.cdap.delta.bigquery.BigQueryTableState;
+import io.cdap.delta.bigquery.Schemas;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class AlterTable extends DDLEventHandler {
+
+  public AlterTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+
+    // need to flush any changes before altering the table to ensure all changes before the schema change
+    // are in the table when it is altered.
+    consumer.flush();
+    // after a flush, the staging table will be gone, so no need to alter it.
+    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
+    Table table = config.bigQuery.getTable(tableId);
+    List<String> primaryKeys = event.getPrimaryKey();
+    Clustering clustering = config.maxClusteringColumns <= 0 ? null :
+      Clustering.newBuilder()
+        .setFields(primaryKeys.subList(0, Math.min(config.maxClusteringColumns, primaryKeys.size())))
+        .build();
+    TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
+      .setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
+      .setClustering(clustering)
+      .build();
+    TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
+    if (config.encryptionConfig != null) {
+      builder.setEncryptionConfiguration(config.encryptionConfig);
+    }
+    TableInfo tableInfo = builder.build();
+    if (table == null) {
+      config.bigQuery.create(tableInfo);
+    } else {
+      config.bigQuery.update(tableInfo);
+    }
+
+    consumer.updatePrimaryKeys(tableId, primaryKeys);
+  }
+
+
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/CreateDatabase.java b/src/main/java/io/cdap/delta/bigquery/event/CreateDatabase.java
new file mode 100644
index 0000000..5ea4c35
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/CreateDatabase.java
@@ -0,0 +1,35 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.*;
+import com.google.cloud.storage.Bucket;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+import io.cdap.delta.bigquery.BigQueryTarget;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class CreateDatabase extends DDLEventHandler {
+
+  public CreateDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
+    if (config.bigQuery.getDataset(datasetId) == null) {
+      DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(config.bucket.getLocation()).build();
+      try {
+        config.bigQuery.create(datasetInfo);
+      } catch (BigQueryException e) {
+        // It is possible that in multiple worker instances scenario
+        // dataset is created by another worker instance after this worker instance
+        // determined that dataset does not exists. Ignore error if dataset is created.
+        if (e.getCode() != BigQueryTarget.CONFLICT) {
+          throw e;
+        }
+      }
+    }
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/CreateTable.java b/src/main/java/io/cdap/delta/bigquery/event/CreateTable.java
new file mode 100644
index 0000000..6c7fdfb
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/CreateTable.java
@@ -0,0 +1,54 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.*;
+import com.google.cloud.storage.Bucket;
+import io.cdap.cdap.api.common.Bytes;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+import io.cdap.delta.bigquery.Schemas;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class CreateTable extends DDLEventHandler {
+
+  public CreateTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
+    Table table = config.bigQuery.getTable(tableId);
+    // SNAPSHOT data is directly loaded in the target table. Check if any such direct load was in progress
+    // for the current table when target received CREATE_TABLE ddl. This indicates that the snapshot was abandoned
+    // because of some failure scenario. Delete the existing table if any.
+    byte[] state = config.context.getState(String.format(BigQueryEventConsumer.DIRECT_LOADING_IN_PROGRESS_PREFIX + "%s-%s",
+                                                         normalizedDatabaseName, normalizedTableName));
+    if (table != null && state != null && Bytes.toBoolean(state)) {
+      config.bigQuery.delete(tableId);
+    }
+    List<String> primaryKeys = event.getPrimaryKey();
+    consumer.updatePrimaryKeys(tableId, primaryKeys);
+    // TODO: check schema of table if it exists already
+    if (table == null) {
+      List<String> clusteringSupportedKeys = BigQueryEventConsumer.getClusteringSupportedKeys(primaryKeys, event.getSchema());
+      Clustering clustering = config.maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
+        Clustering.newBuilder()
+          .setFields(clusteringSupportedKeys.subList(0, Math.min(config.maxClusteringColumns,
+                                                                 clusteringSupportedKeys.size())))
+          .build();
+      TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
+        .setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
+        .setClustering(clustering)
+        .build();
+
+      TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
+      if (config.encryptionConfig != null) {
+        builder.setEncryptionConfiguration(config.encryptionConfig);
+      }
+      TableInfo tableInfo = builder.build();
+      config.bigQuery.create(tableInfo);
+    }
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/DDLEventHandler.java b/src/main/java/io/cdap/delta/bigquery/event/DDLEventHandler.java
new file mode 100644
index 0000000..cb8059e
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/DDLEventHandler.java
@@ -0,0 +1,31 @@
+package io.cdap.delta.bigquery.event;
+
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class DDLEventHandler {
+
+  final static protected Logger LOG = LoggerFactory.getLogger(DDLEventHandler.class);
+
+  final protected BigQueryEventConsumer consumer;
+  final protected EventHandlerConfig config;
+
+  public DDLEventHandler(BigQueryEventConsumer consumer, EventHandlerConfig config) {
+    this.consumer = consumer;
+    this.config = config;
+  }
+
+  public abstract void handleDDL(
+    DDLEvent event,
+    String normalizedDatabaseName,
+    String normalizedTableName,
+    String normalizedStagingTableName)
+    throws IOException, DeltaFailureException, InterruptedException
+    ;
+
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/DropDatabase.java b/src/main/java/io/cdap/delta/bigquery/event/DropDatabase.java
new file mode 100644
index 0000000..208f0e6
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/DropDatabase.java
@@ -0,0 +1,31 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.DatasetId;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+
+import java.io.IOException;
+
+public class DropDatabase extends DDLEventHandler {
+
+  public DropDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
+    config.primaryKeyStore.clear();
+    if (config.bigQuery.getDataset(datasetId) != null) {
+      if (config.requireManualDrops) {
+        String message = String.format("Encountered an event to drop dataset '%s' in project '%s', " +
+                                         "but the target is configured to require manual drops. " +
+                                         "Please manually drop the dataset to make progress.",
+                                       normalizedDatabaseName, config.project);
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+      config.bigQuery.delete(datasetId);
+    }
+
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/DropTable.java b/src/main/java/io/cdap/delta/bigquery/event/DropTable.java
new file mode 100644
index 0000000..6b7712f
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/DropTable.java
@@ -0,0 +1,45 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.Bucket;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class DropTable extends DDLEventHandler {
+
+  public DropTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    // need to flush changes before dropping the table, otherwise the next flush will write data that
+    // shouldn't exist
+    consumer.flush();
+    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
+    config.primaryKeyStore.remove(tableId);
+    Table table = config.bigQuery.getTable(tableId);
+    if (table != null) {
+      if (config.requireManualDrops) {
+        String message = String.format("Encountered an event to drop table '%s' in dataset '%s' in project '%s', " +
+                                         "but the target is configured to require manual drops. " +
+                                         "Please manually drop the table to make progress.",
+                                       normalizedTableName, normalizedDatabaseName, config.project);
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+      config.bigQuery.delete(tableId);
+    }
+    TableId stagingTableId = TableId.of(config.project, normalizedDatabaseName, normalizedStagingTableName);
+    Table stagingTable = config.bigQuery.getTable(stagingTableId);
+    if (stagingTable != null) {
+      config.bigQuery.delete(stagingTableId);
+    }
+
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/EventDispatcher.java b/src/main/java/io/cdap/delta/bigquery/event/EventDispatcher.java
new file mode 100644
index 0000000..4fab657
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/EventDispatcher.java
@@ -0,0 +1,37 @@
+package io.cdap.delta.bigquery.event;
+
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DDLOperation;
+import io.cdap.delta.api.DeltaFailureException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventDispatcher {
+
+  private final Map<DDLOperation.Type, DDLEventHandler> handlers = new HashMap<>();
+
+  public EventDispatcher(
+    DropDatabase dropDatabase,
+    CreateDatabase createDatabase,
+    DropTable dropTable,
+    CreateTable createTable,
+    AlterTable alterTable,
+    TruncateTable truncateTable,
+    RenameTable renameTable
+  ) {
+    handlers.put(DDLOperation.Type.DROP_DATABASE, dropDatabase);
+    handlers.put(DDLOperation.Type.CREATE_DATABASE, createDatabase);
+    handlers.put(DDLOperation.Type.DROP_TABLE, dropTable);
+    handlers.put(DDLOperation.Type.CREATE_TABLE, createTable);
+    handlers.put(DDLOperation.Type.ALTER_TABLE, alterTable);
+    handlers.put(DDLOperation.Type.TRUNCATE_TABLE, truncateTable);
+    handlers.put(DDLOperation.Type.RENAME_TABLE, renameTable);
+  }
+
+  public DDLEventHandler handler(DDLEvent event) {
+    return handlers.get(event.getOperation().getType());
+  }
+
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/EventHandlerConfig.java b/src/main/java/io/cdap/delta/bigquery/event/EventHandlerConfig.java
new file mode 100644
index 0000000..0cd06d3
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/EventHandlerConfig.java
@@ -0,0 +1,44 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.EncryptionConfiguration;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.Bucket;
+import io.cdap.delta.api.DeltaTargetContext;
+
+import java.util.List;
+import java.util.Map;
+
+public class EventHandlerConfig {
+
+  public final DeltaTargetContext context;
+  public final BigQuery bigQuery;
+  public final String project;
+  public final Bucket bucket;
+  public final Map<TableId, List<String>> primaryKeyStore;
+  public final boolean requireManualDrops;
+  public final int maxClusteringColumns;
+  public final EncryptionConfiguration encryptionConfig;
+
+  public EventHandlerConfig(
+    DeltaTargetContext context,
+    BigQuery bigQuery,
+    String project,
+    Bucket bucket,
+    Map<TableId, List<String>> primaryKeyStore,
+    boolean requireManualDrops,
+    int maxClusteringColumns,
+    EncryptionConfiguration encryptionConfig
+  ) {
+    this.context = context;
+    this.bigQuery = bigQuery;
+    this.project = project;
+    this.bucket = bucket;
+    this.primaryKeyStore = primaryKeyStore;
+    this.requireManualDrops = requireManualDrops;
+    this.maxClusteringColumns = maxClusteringColumns;
+    this.encryptionConfig = encryptionConfig;
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/RenameTable.java b/src/main/java/io/cdap/delta/bigquery/event/RenameTable.java
new file mode 100644
index 0000000..f7ea15a
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/RenameTable.java
@@ -0,0 +1,27 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.Bucket;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class RenameTable extends DDLEventHandler {
+
+  public RenameTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    // TODO: flush changes, execute a copy job, delete previous table, drop old staging table, remove old entry
+    // in primaryKeyStore, put new entry in primaryKeyStore
+    LOG.warn("Rename DDL events are not supported. Ignoring rename event in database {} from table {} to table {}.",
+             event.getOperation().getDatabaseName(), event.getOperation().getPrevTableName(),
+             event.getOperation().getTableName());
+
+  }
+}
diff --git a/src/main/java/io/cdap/delta/bigquery/event/TruncateTable.java b/src/main/java/io/cdap/delta/bigquery/event/TruncateTable.java
new file mode 100644
index 0000000..5c40e0b
--- /dev/null
+++ b/src/main/java/io/cdap/delta/bigquery/event/TruncateTable.java
@@ -0,0 +1,46 @@
+package io.cdap.delta.bigquery.event;
+
+import com.google.cloud.bigquery.*;
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DeltaFailureException;
+import io.cdap.delta.bigquery.BigQueryEventConsumer;
+import io.cdap.delta.bigquery.Schemas;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TruncateTable extends DDLEventHandler {
+
+  public TruncateTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }
+
+  @Override
+  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName, String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
+    consumer.flush();
+    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
+    Table table = config.bigQuery.getTable(tableId);
+    TableDefinition tableDefinition = null;
+    List<String> primaryKeys = null;
+    if (table != null) {
+      tableDefinition = table.getDefinition();
+      config.bigQuery.delete(tableId);
+    } else {
+      primaryKeys = event.getPrimaryKey();
+      Clustering clustering = config.maxClusteringColumns <= 0 ? null :
+        Clustering.newBuilder()
+          .setFields(primaryKeys.subList(0, Math.min(config.maxClusteringColumns, primaryKeys.size())))
+          .build();
+      tableDefinition = StandardTableDefinition.newBuilder()
+        .setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
+        .setClustering(clustering)
+        .build();
+    }
+
+    TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
+    if (config.encryptionConfig != null) {
+      builder.setEncryptionConfiguration(config.encryptionConfig);
+    }
+    TableInfo tableInfo = builder.build();
+    config.bigQuery.create(tableInfo);
+
+  }
+}