Merge pull request #252 from cloudsufi/release/0.7
[cherry-pick][CDAP-1703] Support Flexible column names
vikasrathee-cs authored Dec 11, 2023
2 parents 4f138d2 + 2b57258 commit 885c477
Showing 13 changed files with 251 additions and 78 deletions.
13 changes: 13 additions & 0 deletions docs/bigquery-cdcTarget.md
@@ -82,6 +82,19 @@ name is same as source database name. A valid name should only contain letters,
maximum length can be 1024. Any invalid characters will be replaced with underscores in the final dataset name and
any characters exceeding the length limit will be truncated.

**Allow Flexible Column Naming**:
By default, only English letters, numbers, and underscores are allowed in column names. If this option is enabled,
international characters are also allowed in column names, along with some extra special characters, following the
BigQuery naming convention for flexible column names.
Some of the special characters allowed in flexible column names are:
- An ampersand (&)
- A percent sign (%)
- A colon (:)
- A less-than sign (<)
- A space ( )

Read more about this option [here](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
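
To make the behavior concrete, here is a minimal sketch of such a normalization routine. It is not the plugin's actual implementation (`BigQueryUtils.normalizeFieldName`); the forbidden-character set and the 300-character limit below are assumptions for illustration:

```java
// Minimal sketch of standard vs. flexible column-name normalization.
// NOT the real BigQueryUtils implementation; character set and limit assumed.
public class FlexibleNamingSketch {
  static String normalizeFieldName(String name, boolean allowFlexibleColumnNaming) {
    String normalized = allowFlexibleColumnNaming
        // Flexible mode: keep international characters and the extra special
        // characters listed above; replace only characters assumed forbidden.
        ? name.replaceAll("[!\"$()*,./;?@\\[\\]^`{}~\\\\]", "_")
        // Standard mode: only English letters, digits, and underscores survive.
        : name.replaceAll("[^A-Za-z0-9_]", "_");
    int maxLength = 300; // assumed length limit
    return normalized.length() > maxLength ? normalized.substring(0, maxLength) : normalized;
  }

  public static void main(String[] args) {
    System.out.println(normalizeFieldName("order total%", false)); // order_total_
    System.out.println(normalizeFieldName("order total%", true));  // order total%
  }
}
```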

**Encryption Key Name**: GCP Customer-managed encryption key (CMEK) used to encrypt the resources created by this target.
Encryption key name should be of the form "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>".

7 changes: 5 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryAssessor.java
@@ -45,11 +45,13 @@ public class BigQueryAssessor implements TableAssessor<StandardizedTableDetail>
// tables already assessed so far, key is table name and value is schema name
private final Map<String, String> tableToSchema;
private final String datasetName;
private final boolean allowFlexibleColumnNaming;

BigQueryAssessor(String stagingTablePrefix, String datasetName) {
BigQueryAssessor(String stagingTablePrefix, String datasetName, boolean allowFlexibleColumnNaming) {
this.stagingTablePrefix = stagingTablePrefix;
this.tableToSchema = new HashMap<>();
this.datasetName = datasetName;
this.allowFlexibleColumnNaming = allowFlexibleColumnNaming;
}

@Override
@@ -58,7 +60,8 @@ public TableAssessment assess(StandardizedTableDetail tableDetail) {
for (Schema.Field field : tableDetail.getSchema().getFields()) {
try {
String bqType = toBigQueryType(field);
columnAssessments.add(ColumnAssessment.builder(BigQueryUtils.normalizeFieldName(field.getName()), bqType)
columnAssessments.add(ColumnAssessment.builder(BigQueryUtils.normalizeFieldName(field.getName(),
allowFlexibleColumnNaming), bqType)
.setSourceColumn(field.getName()).build());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Converting schema {} to {}", field.getSchema().isNullable() ?
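
A hedged usage sketch (identifiers below are assumed) showing how the assessor is now constructed, with the flag applied whenever a source column name is normalized:

```java
// Hypothetical wiring: the flag is fixed at construction time and reused for
// every column-name normalization performed during table assessment.
BigQueryAssessor assessor =
    new BigQueryAssessor("staging_", "my_dataset", /* allowFlexibleColumnNaming */ true);
TableAssessment assessment = assessor.assess(standardizedTableDetail); // assumed input
```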
56 changes: 35 additions & 21 deletions src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -192,14 +192,15 @@ public class BigQueryEventConsumer implements EventConsumer {
private long latestSequenceNum;
private Exception flushException;
private final AtomicBoolean shouldStop;
private final boolean allowFlexibleColumnNaming;
// have to keep all the records in memory in case there is a failure writing to GCS
// cannot write to a temporary file on local disk either in case there is a failure writing to disk
// Without keeping the entire batch in memory, there would be no way to recover the records that failed to write

BigQueryEventConsumer(DeltaTargetContext context, Storage storage, BigQuery bigQuery, Bucket bucket,
String project, int loadIntervalSeconds, String stagingTablePrefix, boolean requireManualDrops,
@Nullable EncryptionConfiguration encryptionConfig, @Nullable Long baseRetryDelay,
@Nullable String datasetName, boolean softDeletesEnabled) {
@Nullable String datasetName, boolean softDeletesEnabled, boolean allowFlexibleColumnNaming) {
this.context = context;
this.bigQuery = bigQuery;
this.loadIntervalSeconds = loadIntervalSeconds;
@@ -242,6 +243,7 @@ public class BigQueryEventConsumer implements EventConsumer {
this.datasetName = datasetName;
this.retainStagingTable = Boolean.parseBoolean(context.getRuntimeArguments().get(RETAIN_STAGING_TABLE));
this.softDeletesEnabled = softDeletesEnabled;
this.allowFlexibleColumnNaming = allowFlexibleColumnNaming;
this.shouldStop = new AtomicBoolean(false);
}

@@ -289,7 +291,8 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exc
String normalizedStagingTableName = normalizedTableName == null ? null :
BigQueryUtils.normalizeTableName(stagingTablePrefix + normalizedTableName);

runWithRetries(ctx -> handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName),
runWithRetries(ctx -> handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName,
allowFlexibleColumnNaming),
baseRetryDelay,
normalizedDatabaseName,
event.getOperation().getSchemaName(),
@@ -313,7 +316,7 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exc
}

private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
String normalizedStagingTableName)
String normalizedStagingTableName, boolean allowFlexibleColumnNaming)
throws IOException, DeltaFailureException, InterruptedException {

switch (event.getOperation().getType()) {
@@ -360,20 +363,24 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
bigQuery.delete(tableId);
}
List<String> primaryKeys = event.getPrimaryKey();
List<String> normalizedPrimaryKeys = primaryKeys.stream()
.map(BigQueryUtils::normalizeFieldName)
.collect(Collectors.toList());
List<String> normalizedPrimaryKeys = new ArrayList<>();
for (String primaryKey : primaryKeys) {
String normalizedKey = BigQueryUtils.normalizeFieldName(primaryKey, allowFlexibleColumnNaming);
normalizedPrimaryKeys.add(normalizedKey);
}
updatePrimaryKeys(tableId, normalizedPrimaryKeys);
// TODO: check schema of table if it exists already
if (table == null) {
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema(),
allowFlexibleColumnNaming);
Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
clusteringSupportedKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
allowFlexibleColumnNaming))
.setClustering(clustering)
.build();

@@ -417,13 +424,15 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
table = bigQuery.getTable(tableId);
primaryKeys = event.getPrimaryKey();
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema(),
allowFlexibleColumnNaming);
Clustering clustering = maxClusteringColumns <= 0 ? null :
Clustering.newBuilder()
.setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
allowFlexibleColumnNaming))
.setClustering(clustering)
.build();
TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -436,9 +445,11 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
} else {
bigQuery.update(tableInfo);
}
normalizedPrimaryKeys = primaryKeys.stream()
.map(BigQueryUtils::normalizeFieldName)
.collect(Collectors.toList());
normalizedPrimaryKeys = new ArrayList<>();
for (String primaryKey : primaryKeys) {
String normalizedKey = BigQueryUtils.normalizeFieldName(primaryKey, allowFlexibleColumnNaming);
normalizedPrimaryKeys.add(normalizedKey);
}
updatePrimaryKeys(tableId, normalizedPrimaryKeys);
break;
case RENAME_TABLE:
@@ -462,7 +473,8 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
.setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
.build();
tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
.setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
allowFlexibleColumnNaming))
.setClustering(clustering)
.build();
}
Expand All @@ -478,11 +490,12 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
}

@VisibleForTesting
static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema,
boolean allowFlexibleColumnNaming) {
List<String> result = new ArrayList<>();
for (String key : primaryKeys) {
if (Schemas.isClusteringSupported(recordSchema.getField(key))) {
result.add(BigQueryUtils.normalizeFieldName(key));
if (Schemas.isClusteringSupported(recordSchema.getField(key), allowFlexibleColumnNaming)) {
result.add(BigQueryUtils.normalizeFieldName(key, allowFlexibleColumnNaming));
}
}
return result;
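
A hypothetical usage sketch of this helper (the schema and key names below are invented for illustration):

```java
import java.util.Arrays;
import java.util.List;
import io.cdap.cdap.api.data.schema.Schema;

// Only primary keys whose types support BigQuery clustering are kept, each
// normalized according to the flag. Called here from within the same package.
Schema recordSchema = Schema.recordOf("orders",
    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("order id", Schema.of(Schema.Type.STRING)));
List<String> keys = BigQueryEventConsumer.getClusteringSupportedKeys(
    Arrays.asList("id", "order id"), recordSchema, /* allowFlexibleColumnNaming */ true);
// keys -> ["id", "order id"]; with the flag off it would be ["id", "order_id"]
```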
@@ -560,7 +573,7 @@ public synchronized void applyDML(Sequenced<DMLEvent> sequencedEvent) throws Exc
String normalizedDatabaseName = BigQueryUtils.getNormalizedDatasetName(datasetName,
event.getOperation().getDatabaseName());
String normalizedTableName = BigQueryUtils.normalizeTableName(event.getOperation().getTableName());
DMLEvent normalizedDMLEvent = BigQueryUtils.normalize(event)
DMLEvent normalizedDMLEvent = BigQueryUtils.normalize(event, allowFlexibleColumnNaming)
.setDatabaseName(normalizedDatabaseName)
.setTableName(normalizedTableName)
.build();
@@ -739,7 +752,7 @@ private void loadTable(TableId tableId, TableBlob blob, boolean directLoadToTarg
.build();
TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setLocation(bucket.getLocation())
.setSchema(Schemas.convert(blob.getStagingSchema()))
.setSchema(Schemas.convert(blob.getStagingSchema(), allowFlexibleColumnNaming))
.setClustering(clustering)
.build();
TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -762,7 +775,8 @@ private void loadTable(TableId tableId, TableBlob blob, boolean directLoadToTarg

// Explicitly set schema for load jobs
com.google.cloud.bigquery.Schema bqSchema
= Schemas.convert(directLoadToTarget ? blob.getTargetSchema() : blob.getStagingSchema());
= Schemas.convert(directLoadToTarget ? blob.getTargetSchema() : blob.getStagingSchema(),
allowFlexibleColumnNaming);
LoadJobConfiguration.Builder jobConfigBuilder = LoadJobConfiguration
.newBuilder(tableId, uri)
.setSchema(bqSchema)
@@ -1430,7 +1444,7 @@ private void addSortKeyToTargetTable(TableId targetTableId, List<Schema.Type> so
Schema.Field sortKeyField = Schema.Field.of(Constants.SORT_KEYS, Schemas.getSortKeysSchema(sortKeys));

List<Field> fieldList = new ArrayList<Field>(fields);
fieldList.add(Schemas.convertToBigQueryField(sortKeyField));
fieldList.add(Schemas.convertToBigQueryField(sortKeyField, allowFlexibleColumnNaming));
// Update the table with the new schema
com.google.cloud.bigquery.Schema updatedSchema = com.google.cloud.bigquery.Schema.of(fieldList);
table.toBuilder().setDefinition(StandardTableDefinition.of(updatedSchema)).build().update();
20 changes: 18 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
@@ -211,7 +211,7 @@ public EventConsumer createConsumer(DeltaTargetContext context) throws IOExcepti
return new BigQueryEventConsumer(context, storage, bigQuery, bucket, datasetProject,
conf.getLoadIntervalSeconds(), conf.getStagingTablePrefix(),
conf.requiresManualDrops(), encryptionConfig, null, conf.getDatasetName(),
conf.softDeletesEnabled());
conf.softDeletesEnabled(), conf.getAllowFlexibleColumnNaming());
}

@VisibleForTesting
@@ -228,7 +228,7 @@ static String getStagingBucketName(@Nullable String providedBucketName, DeltaPip

@Override
public TableAssessor<StandardizedTableDetail> createTableAssessor(Configurer configurer) {
return new BigQueryAssessor(conf.stagingTablePrefix, conf.datasetName);
return new BigQueryAssessor(conf.stagingTablePrefix, conf.datasetName, conf.getAllowFlexibleColumnNaming());
}

private static String stringifyPipelineId(DeltaPipelineId pipelineId) {
@@ -337,6 +337,18 @@ public static class Conf extends PluginConfig {
"underscore in the final dataset name and any characters exceeds length limit will be truncated.")
private String datasetName;

@Nullable
@Description(
"By default, the target table's column names mirror those of the source table. They are normalized to include " +
"only letters, numbers, and underscores. Any invalid characters are replaced with underscores in the " +
"final column name. If set to true, the target table's column names will be adjusted to adhere to BigQuery's " +
"flexible column naming conventions, such as supporting international characters, spaces, and some more " +
"special characters (check docs) with a maximum length of 300 characters. Any invalid characters will " +
"be replaced with underscores in the final column name. Additionally, any characters exceeding the length " +
"limit will be truncated."
)
private Boolean allowFlexibleColumnNaming;

@Nullable
@Description(
"Optional. GCP Customer-managed encryption key (CMEK) used to encrypt the resources created by this target.")
@@ -347,6 +359,10 @@ public String getDatasetName() {
return datasetName;
}

public boolean getAllowFlexibleColumnNaming() {
return allowFlexibleColumnNaming != null && allowFlexibleColumnNaming;
}
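
Because the backing field is a nullable Boolean, this getter treats an unset property as false, so pipelines created before the option existed keep the legacy normalization. A tiny sketch of that rule, with assumed example mappings:

```java
// Defaulting rule: a null Boolean (property absent from the pipeline config)
// disables the feature, preserving legacy normalization.
Boolean raw = null;                          // property not set
boolean allowFlexible = raw != null && raw;  // -> false
// Assumed example mappings of source column names:
//   "état"    -> "_tat"    (flag off)  vs.  "état"    (flag on)
//   "total %" -> "total__" (flag off)  vs.  "total %" (flag on)
```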

@Nullable
public String getEncryptionKeyName() {
return encryptionKeyName;
(Diffs for the remaining 9 changed files are not shown.)