diff --git a/docs/bigquery-cdcTarget.md b/docs/bigquery-cdcTarget.md
index 5cfd733..7b3dda4 100644
--- a/docs/bigquery-cdcTarget.md
+++ b/docs/bigquery-cdcTarget.md
@@ -82,6 +82,19 @@
 name is same as source database name. A valid name should only contain letters,
 maximum length can be 1024. Any invalid chars would be replaced with underscore in
 the final dataset name and any characters exceeds length limit will be truncated.
 
+**Allow Flexible Column Naming**:
+By default, only English letters, numbers, and underscores are allowed in column names. If this option is enabled,
+international characters are also allowed in column names, along with some extra special characters, following the
+BigQuery naming convention for flexible column names.
+Some of the special characters allowed in flexible column names are:
+- An ampersand (&)
+- A percent sign (%)
+- A colon (:)
+- A less-than sign (<)
+- A space ( )
+
+Read more about this option [here](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
+
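+For example, assuming a source table with the following (illustrative) column names, the
+final column names differ depending on this option:
+
+| Source column | Default naming | Flexible naming |
+| ------------- | -------------- | --------------- |
+| `order%total` | `order_total`  | `order%total`   |
+| `user id`     | `user_id`      | `user id`       |
+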
 **Encryption Key Name**: GCP Customer-managed encryption key (CMEK) used to encrypt the
 resources created by this target. Encryption key name should be of the form
 "projects//locations//keyRings//cryptoKeys/".
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryAssessor.java b/src/main/java/io/cdap/delta/bigquery/BigQueryAssessor.java
index 7685f91..03d0a46 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryAssessor.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryAssessor.java
@@ -45,11 +45,13 @@ public class BigQueryAssessor implements TableAssessor<StandardizedTableDetail> {
   // tables already assessed so far, key is table name and value is schema name
   private final Map<String, String> tableToSchema;
   private final String datasetName;
+  private final boolean allowFlexibleColumnNaming;
 
-  BigQueryAssessor(String stagingTablePrefix, String datasetName) {
+  BigQueryAssessor(String stagingTablePrefix, String datasetName, boolean allowFlexibleColumnNaming) {
     this.stagingTablePrefix = stagingTablePrefix;
     this.tableToSchema = new HashMap<>();
     this.datasetName = datasetName;
+    this.allowFlexibleColumnNaming = allowFlexibleColumnNaming;
   }
 
   @Override
@@ -58,7 +60,8 @@ public TableAssessment assess(StandardizedTableDetail tableDetail) {
     for (Schema.Field field : tableDetail.getSchema().getFields()) {
       try {
         String bqType = toBigQueryType(field);
-        columnAssessments.add(ColumnAssessment.builder(BigQueryUtils.normalizeFieldName(field.getName()), bqType)
+        columnAssessments.add(ColumnAssessment.builder(BigQueryUtils.normalizeFieldName(field.getName(),
+                                                                                        allowFlexibleColumnNaming), bqType)
           .setSourceColumn(field.getName()).build());
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Converting schema {} to {}", field.getSchema().isNullable() ?
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
index ccdb89e..7ca7979 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -192,6 +192,7 @@ public class BigQueryEventConsumer implements EventConsumer {
   private long latestSequenceNum;
   private Exception flushException;
   private final AtomicBoolean shouldStop;
+  private final boolean allowFlexibleColumnNaming;
   // have to keep all the records in memory in case there is a failure writing to GCS
   // cannot write to a temporary file on local disk either in case there is a failure writing to disk
   // Without keeping the entire batch in memory, there would be no way to recover the records that failed to write
@@ -199,7 +200,7 @@ public class BigQueryEventConsumer implements EventConsumer {
   BigQueryEventConsumer(DeltaTargetContext context, Storage storage, BigQuery bigQuery, Bucket bucket,
                         String project, int loadIntervalSeconds, String stagingTablePrefix, boolean requireManualDrops,
                         @Nullable EncryptionConfiguration encryptionConfig, @Nullable Long baseRetryDelay,
-                        @Nullable String datasetName, boolean softDeletesEnabled) {
+                        @Nullable String datasetName, boolean softDeletesEnabled, boolean allowFlexibleColumnNaming) {
     this.context = context;
     this.bigQuery = bigQuery;
     this.loadIntervalSeconds = loadIntervalSeconds;
@@ -242,6 +243,7 @@ public class BigQueryEventConsumer implements EventConsumer {
     this.datasetName = datasetName;
     this.retainStagingTable = Boolean.parseBoolean(context.getRuntimeArguments().get(RETAIN_STAGING_TABLE));
     this.softDeletesEnabled = softDeletesEnabled;
+    this.allowFlexibleColumnNaming = allowFlexibleColumnNaming;
     this.shouldStop = new AtomicBoolean(false);
   }
 
@@ -289,7 +291,8 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exception {
     String normalizedStagingTableName = normalizedTableName == null ?
       null : BigQueryUtils.normalizeTableName(stagingTablePrefix + normalizedTableName);
-    runWithRetries(ctx -> handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName),
+    runWithRetries(ctx -> handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName,
+                                    allowFlexibleColumnNaming),
                    baseRetryDelay,
                    normalizedDatabaseName,
                    event.getOperation().getSchemaName(),
@@ -313,7 +316,7 @@ public synchronized void applyDDL(Sequenced<DDLEvent> sequencedEvent) throws Exception {
   }
 
   private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
-                         String normalizedStagingTableName)
+                         String normalizedStagingTableName, boolean allowFlexibleColumnNaming)
     throws IOException, DeltaFailureException, InterruptedException {
 
     switch (event.getOperation().getType()) {
@@ -360,20 +363,24 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
           bigQuery.delete(tableId);
         }
         List<String> primaryKeys = event.getPrimaryKey();
-        List<String> normalizedPrimaryKeys = primaryKeys.stream()
-          .map(BigQueryUtils::normalizeFieldName)
-          .collect(Collectors.toList());
+        List<String> normalizedPrimaryKeys = new ArrayList<>();
+        for (String primaryKey : primaryKeys) {
+          String normalizedKey = BigQueryUtils.normalizeFieldName(primaryKey, allowFlexibleColumnNaming);
+          normalizedPrimaryKeys.add(normalizedKey);
+        }
         updatePrimaryKeys(tableId, normalizedPrimaryKeys);
         // TODO: check schema of table if it exists already
         if (table == null) {
-          List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
+          List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema(),
+                                                                            allowFlexibleColumnNaming);
           Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
             Clustering.newBuilder()
               .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
                                                                      clusteringSupportedKeys.size())))
               .build();
           TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
-            .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
+            .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
+                                       allowFlexibleColumnNaming))
             .setClustering(clustering)
             .build();
@@ -417,13 +424,15 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
         tableId = TableId.of(project, normalizedDatabaseName, normalizedTableName);
         table = bigQuery.getTable(tableId);
         primaryKeys = event.getPrimaryKey();
-        List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
+        List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema(),
+                                                                          allowFlexibleColumnNaming);
         Clustering clustering = maxClusteringColumns <= 0 ?
           null : Clustering.newBuilder()
             .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
             .build();
         TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
-          .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
+          .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
+                                     allowFlexibleColumnNaming))
           .setClustering(clustering)
           .build();
         TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -436,9 +445,11 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
         } else {
           bigQuery.update(tableInfo);
         }
-        normalizedPrimaryKeys = primaryKeys.stream()
-          .map(BigQueryUtils::normalizeFieldName)
-          .collect(Collectors.toList());
+        normalizedPrimaryKeys = new ArrayList<>();
+        for (String primaryKey : primaryKeys) {
+          String normalizedKey = BigQueryUtils.normalizeFieldName(primaryKey, allowFlexibleColumnNaming);
+          normalizedPrimaryKeys.add(normalizedKey);
+        }
         updatePrimaryKeys(tableId, normalizedPrimaryKeys);
         break;
       case RENAME_TABLE:
@@ -462,7 +473,8 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
           .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
           .build();
         tableDefinition = StandardTableDefinition.newBuilder()
-          .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId)))
+          .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema(), tableId),
+                                     allowFlexibleColumnNaming))
           .setClustering(clustering)
           .build();
       }
@@ -478,11 +490,12 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
   }
 
   @VisibleForTesting
-  static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
+  static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema,
+                                                 boolean allowFlexibleColumnNaming) {
     List<String> result = new ArrayList<>();
     for (String key : primaryKeys) {
-      if (Schemas.isClusteringSupported(recordSchema.getField(key))) {
-        result.add(BigQueryUtils.normalizeFieldName(key));
+      if (Schemas.isClusteringSupported(recordSchema.getField(key), allowFlexibleColumnNaming)) {
+        result.add(BigQueryUtils.normalizeFieldName(key, allowFlexibleColumnNaming));
       }
     }
     return result;
@@ -560,7 +573,7 @@ public synchronized void applyDML(Sequenced<DMLEvent> sequencedEvent) throws Exception {
     String normalizedDatabaseName = BigQueryUtils.getNormalizedDatasetName(datasetName,
                                                                            event.getOperation().getDatabaseName());
     String normalizedTableName = BigQueryUtils.normalizeTableName(event.getOperation().getTableName());
-    DMLEvent normalizedDMLEvent = BigQueryUtils.normalize(event)
+    DMLEvent normalizedDMLEvent = BigQueryUtils.normalize(event, allowFlexibleColumnNaming)
       .setDatabaseName(normalizedDatabaseName)
       .setTableName(normalizedTableName)
       .build();
@@ -739,7 +752,7 @@ private void loadTable(TableId tableId, TableBlob blob, boolean directLoadToTarget)
       .build();
     TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
       .setLocation(bucket.getLocation())
-      .setSchema(Schemas.convert(blob.getStagingSchema()))
+      .setSchema(Schemas.convert(blob.getStagingSchema(), allowFlexibleColumnNaming))
       .setClustering(clustering)
       .build();
     TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
@@ -762,7 +775,8 @@ private void loadTable(TableId tableId, TableBlob blob, boolean directLoadToTarget)
     // Explicitly set schema for load jobs
     com.google.cloud.bigquery.Schema bqSchema
-      = Schemas.convert(directLoadToTarget ? blob.getTargetSchema() : blob.getStagingSchema());
+      = Schemas.convert(directLoadToTarget ? blob.getTargetSchema() : blob.getStagingSchema(),
+                        allowFlexibleColumnNaming);
     LoadJobConfiguration.Builder jobConfigBuilder = LoadJobConfiguration
       .newBuilder(tableId, uri)
       .setSchema(bqSchema)
@@ -1430,7 +1444,7 @@ private void addSortKeyToTargetTable(TableId targetTableId, List<SortKey> sortKeys)
     Schema.Field sortKeyField = Schema.Field.of(Constants.SORT_KEYS, Schemas.getSortKeysSchema(sortKeys));
     List<Field> fieldList = new ArrayList<>(fields);
-    fieldList.add(Schemas.convertToBigQueryField(sortKeyField));
+    fieldList.add(Schemas.convertToBigQueryField(sortKeyField, allowFlexibleColumnNaming));
     // Update the table with the new schema
     com.google.cloud.bigquery.Schema updatedSchema = com.google.cloud.bigquery.Schema.of(fieldList);
     table.toBuilder().setDefinition(StandardTableDefinition.of(updatedSchema)).build().update();
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
index 3338c5d..bd0896e 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
@@ -211,7 +211,7 @@ public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
     return new BigQueryEventConsumer(context, storage, bigQuery, bucket, datasetProject,
                                      conf.getLoadIntervalSeconds(), conf.getStagingTablePrefix(),
                                      conf.requiresManualDrops(), encryptionConfig, null, conf.getDatasetName(),
-                                     conf.softDeletesEnabled());
+                                     conf.softDeletesEnabled(), conf.getAllowFlexibleColumnNaming());
   }
 
   @VisibleForTesting
@@ -228,7 +228,7 @@ static String getStagingBucketName(@Nullable String providedBucketName, DeltaPipelineId pipelineId) {
 
   @Override
   public TableAssessor<StandardizedTableDetail> createTableAssessor(Configurer configurer) {
-    return new BigQueryAssessor(conf.stagingTablePrefix, conf.datasetName);
+    return new BigQueryAssessor(conf.stagingTablePrefix, conf.datasetName, conf.getAllowFlexibleColumnNaming());
   }
 
   private static String stringifyPipelineId(DeltaPipelineId pipelineId) {
@@ -337,6 +337,18 @@ public static class Conf extends PluginConfig {
       "underscore in the final dataset name and any characters exceeds length limit will be truncated.")
     private String datasetName;
 
+    @Nullable
+    @Description(
+      "By default, the target table's column names mirror those of the source table. They are normalized to include " +
+      "only letters, numbers, and underscores. Any invalid characters are replaced with underscores in the " +
+      "final column name. If set to true, the target table's column names will be adjusted to adhere to BigQuery's " +
+      "flexible column naming conventions, such as supporting international characters, spaces, and additional " +
+      "special characters (see the plugin documentation), with a maximum length of 300 characters. Any invalid " +
+      "characters will be replaced with underscores in the final column name. Additionally, any characters " +
+      "exceeding the length limit will be truncated."
+    )
+    private Boolean allowFlexibleColumnNaming;
+
     @Nullable
     @Description(
      "Optional. GCP Customer-managed encryption key (CMEK) used to encrypt the resources created by this target.")
@@ -347,6 +359,10 @@ public String getDatasetName() {
       return datasetName;
     }
 
+    public boolean getAllowFlexibleColumnNaming() {
+      return allowFlexibleColumnNaming != null && allowFlexibleColumnNaming;
+    }
+
     @Nullable
     public String getEncryptionKeyName() {
       return encryptionKeyName;
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
index f56e32b..bcd9841 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
@@ -54,7 +54,7 @@
  */
 public final class BigQueryUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtils.class);
-  public static final int FIELD_NAME_MAX_LENGTH = 128;
+  public static final int FIELD_NAME_MAX_LENGTH = 300;
   static final String BACKTICK = "`";
   private static final int DATASET_OR_TABLE_NAME_MAX_LENGTH = 1024;
   // Valid BigQuery dataset names can contain only letters, numbers, and underscores.
@@ -67,6 +67,10 @@ public final class BigQueryUtils {
   private static final Pattern VALID_TABLE_NAME_REGEX =
     Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}]+");
   private static final Pattern INVALID_TABLE_NAME_REGEX =
     Pattern.compile("[^\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}]+");
+  private static final Pattern VALID_FIELD_NAME_REGEX =
+    Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");
+  private static final Pattern INVALID_FIELD_NAME_REGEX =
+    Pattern.compile("[^\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");
   private static final String BIG_QUERY_DUPLICATE_ERROR = "duplicate";
 
   private BigQueryUtils() {
@@ -201,7 +205,7 @@ public static String getNormalizedDatasetName(@Nullable String datasetName, String databaseName) {
    * @return the normalized name
    */
   public static String normalizeDatasetName(String name) {
-    return normalize(name, DATASET_OR_TABLE_NAME_MAX_LENGTH, true, false);
+    return normalize(name, DATASET_OR_TABLE_NAME_MAX_LENGTH, true, false, false);
   }
 
   /**
@@ -213,30 +217,36 @@ public static String normalizeDatasetName(String name) {
    * @return the normalized name
    */
   public static String normalizeTableName(String name) {
-    return normalize(name, DATASET_OR_TABLE_NAME_MAX_LENGTH, true, true);
+    return normalize(name, DATASET_OR_TABLE_NAME_MAX_LENGTH, true, true, false);
   }
 
   /**
   * Normalize the field name according to BigQuery's requirement.
   * The name must contain only letters, numbers, and underscores, start with a letter or underscore.
-   * And it must be 128 characters or fewer.
+   * See here: https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
+   * And it must be 300 characters or fewer.
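+   * <p>For example, {@code normalizeFieldName("percent%", true)} returns {@code "percent%"},
+   * while {@code normalizeFieldName("percent%", false)} returns {@code "percent_"}.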
   * @param name the field name to be normalized
   * @return the normalized name
   */
-  public static String normalizeFieldName(String name) {
-    return normalize(name, FIELD_NAME_MAX_LENGTH, false, false);
+  public static String normalizeFieldName(String name, boolean allowFlexibleColumnNaming) {
+    // useExtendedCharset is set to false for backward compatibility;
+    // allowFlexibleColumnNaming determines whether the flexible column charset is used instead
+    return normalize(name, FIELD_NAME_MAX_LENGTH, false, false, allowFlexibleColumnNaming);
   }
 
-  private static String normalize(String name, int maxLength, boolean canStartWithNumber, boolean useExtendedCharset) {
+  private static String normalize(String name, int maxLength, boolean canStartWithNumber, boolean useExtendedCharset,
+                                  boolean allowFlexibleColumnNames) {
     if (name == null || name.isEmpty()) {
       return name;
     }
 
-    // replace invalid chars with underscores if there are any
-    if (useExtendedCharset && !VALID_TABLE_NAME_REGEX.matcher(name).matches()) {
+    if (allowFlexibleColumnNames && !VALID_FIELD_NAME_REGEX.matcher(name).matches()) {
+      name = INVALID_FIELD_NAME_REGEX.matcher(name).replaceAll("_");
+    }
+    if (useExtendedCharset && !VALID_TABLE_NAME_REGEX.matcher(name).matches() && !allowFlexibleColumnNames) {
       name = INVALID_TABLE_NAME_REGEX.matcher(name).replaceAll("_");
     }
-    if (!useExtendedCharset && !VALID_DATASET_NAME_REGEX.matcher(name).matches()) {
+    if (!useExtendedCharset && !VALID_DATASET_NAME_REGEX.matcher(name).matches() && !allowFlexibleColumnNames) {
       name = INVALID_DATASET_NAME_REGEX.matcher(name).replaceAll("_");
     }
@@ -257,25 +267,25 @@ private static String normalize(String name, int maxLength, boolean canStartWithNumber,
     return name;
   }
 
-  public static DMLEvent.Builder normalize(DMLEvent event) {
+  public static DMLEvent.Builder normalize(DMLEvent event, boolean allowFlexibleColumnNaming) {
     DMLEvent.Builder normalizedEventBuilder = DMLEvent.builder(event);
     if (event.getRow() != null) {
-      normalizedEventBuilder.setRow(normalize(event.getRow()));
+      normalizedEventBuilder.setRow(normalize(event.getRow(), allowFlexibleColumnNaming));
     }
     if (event.getPreviousRow() != null) {
-      normalizedEventBuilder.setPreviousRow(normalize(event.getPreviousRow()));
+      normalizedEventBuilder.setPreviousRow(normalize(event.getPreviousRow(), allowFlexibleColumnNaming));
     }
     return normalizedEventBuilder;
   }
 
-  private static StructuredRecord normalize(StructuredRecord record) {
+  private static StructuredRecord normalize(StructuredRecord record, boolean allowFlexibleColumnNaming) {
     Schema schema = record.getSchema();
     List<Schema.Field> fields = schema.getFields();
     List<Schema.Field> normalizedFields = new ArrayList<>(fields.size());
     Map<String, Object> valueMap = new HashMap<>();
     for (Schema.Field field : fields) {
-      String normalizedName = normalizeFieldName(field.getName());
+      String normalizedName = normalizeFieldName(field.getName(), allowFlexibleColumnNaming);
       normalizedFields.add(Schema.Field.of(normalizedName, field.getSchema()));
       valueMap.put(normalizedName, record.get(field.getName()));
     }
diff --git a/src/main/java/io/cdap/delta/bigquery/MultiGCSWriter.java b/src/main/java/io/cdap/delta/bigquery/MultiGCSWriter.java
index c821662..53b1035 100644
--- a/src/main/java/io/cdap/delta/bigquery/MultiGCSWriter.java
+++ b/src/main/java/io/cdap/delta/bigquery/MultiGCSWriter.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
@@ -54,6 +55,7 @@ public class MultiGCSWriter {
   private static final Logger LOG = LoggerFactory.getLogger(MultiGCSWriter.class);
   private static final Gson GSON = new Gson();
+  private static final String ENGLISH_CHARACTERS_REGEX = "[\\w]+";
   private final Storage storage;
   private final String bucket;
   private final String baseObjectName;
@@ -116,6 +118,12 @@ private boolean isJsonFormat(StructuredRecord record) {
       if (logicalType != null && logicalType.equals(Schema.LogicalType.DATETIME)) {
         return true;
       }
+      // If the field name contains non-English characters, use the JSON format,
+      // since the Avro load job in BQ does not support non-English characters in field names for now
+      String fieldName = field.getName();
+      if (!Pattern.matches(ENGLISH_CHARACTERS_REGEX, fieldName)) {
+        return true;
+      }
     }
     return false;
   }
diff --git a/src/main/java/io/cdap/delta/bigquery/Schemas.java b/src/main/java/io/cdap/delta/bigquery/Schemas.java
index a0d7d1b..c3f1b92 100644
--- a/src/main/java/io/cdap/delta/bigquery/Schemas.java
+++ b/src/main/java/io/cdap/delta/bigquery/Schemas.java
@@ -45,14 +45,14 @@
   private Schemas() {
   }
 
-  public static com.google.cloud.bigquery.Schema convert(Schema schema) {
-    return com.google.cloud.bigquery.Schema.of(convertFields(schema.getFields()));
+  public static com.google.cloud.bigquery.Schema convert(Schema schema, boolean allowFlexibleColumnNaming) {
+    return com.google.cloud.bigquery.Schema.of(convertFields(schema.getFields(), allowFlexibleColumnNaming));
   }
 
-  private static List<Field> convertFields(List<Schema.Field> fields) {
+  private static List<Field> convertFields(List<Schema.Field> fields, boolean allowFlexibleColumnNaming) {
     List<Field> output = new ArrayList<>();
     for (Schema.Field field : fields) {
-      output.add(convertToBigQueryField(field));
+      output.add(convertToBigQueryField(field, allowFlexibleColumnNaming));
     }
     return output;
   }
@@ -109,13 +109,13 @@ private static StandardSQLTypeName convertLogicalType(Schema fieldSchema) {
    * Check if the BigQuery data type associated with the {@link Schema.Field} can be added
    * as a clustering column while creating BigQuery table.
    */
-  public static boolean isClusteringSupported(Schema.Field field) {
-    Field bigQueryField = convertToBigQueryField(field);
+  public static boolean isClusteringSupported(Schema.Field field, boolean allowFlexibleColumnNaming) {
+    Field bigQueryField = convertToBigQueryField(field, allowFlexibleColumnNaming);
     return CLUSTERING_SUPPORTED_TYPES.contains(bigQueryField.getType().getStandardType());
   }
 
-  public static Field convertToBigQueryField(Schema.Field field) {
-    String normalizedName = BigQueryUtils.normalizeFieldName(field.getName());
+  public static Field convertToBigQueryField(Schema.Field field, boolean allowFlexibleColumnNaming) {
+    String normalizedName = BigQueryUtils.normalizeFieldName(field.getName(), allowFlexibleColumnNaming);
     boolean isNullable = field.getSchema().isNullable();
     Schema fieldSchema = field.getSchema();
     fieldSchema = isNullable ?
      fieldSchema.getNonNullable() : fieldSchema;
@@ -145,7 +145,7 @@ public static Field convertToBigQueryField(Schema.Field field) {
       }
       output = Field.newBuilder(normalizedName, bqType).setMode(Field.Mode.REPEATED).build();
     } else if (type == Schema.Type.RECORD) {
-      List<Field> subFields = convertFields(fieldSchema.getFields());
+      List<Field> subFields = convertFields(fieldSchema.getFields(), allowFlexibleColumnNaming);
       output = Field.newBuilder(normalizedName, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build();
     } else {
       StandardSQLTypeName bqType = convertType(type);
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryAssessorTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryAssessorTest.java
index b6c5659..cf6ef02 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryAssessorTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryAssessorTest.java
@@ -30,7 +30,7 @@ public class BigQueryAssessorTest {
 
   @Test
   public void testAssessTable_duplicatedTableName() {
-    BigQueryAssessor assessor = new BigQueryAssessor("staging_prefix", null);
+    BigQueryAssessor assessor = new BigQueryAssessor("staging_prefix", null, false);
     String dbName = "testDB";
     String tableName = "tableName";
     String schemaName1 = "schemaName1";
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerTest.java
index 7a1b1f9..2ce1ba9 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerTest.java
@@ -143,7 +143,7 @@ public void testConsumerMultipleTableInsertEvents() throws Exception {
                                                                      bigQuery, bucket, "project",
                                                                      LOAD_INTERVAL_SECONDS, "_staging", false,
                                                                      null, 2L,
-                                                                     DATASET, false);
+                                                                     DATASET, false, false);
 
     eventConsumer.start();
     generateDDL(eventConsumer, tables);
@@ -190,7 +190,7 @@ public void testConsumerEmptyDataset() throws Exception {
                                                                      bigQuery, bucket, "project",
                                                                      LOAD_INTERVAL_SECONDS, "_staging", false,
                                                                      null, 2L,
-                                                                     EMPTY_DATASET_NAME, false);
+                                                                     EMPTY_DATASET_NAME, false, false);
 
     eventConsumer.start();
     generateDDL(eventConsumer, tables);
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerUnorderedSourceTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerUnorderedSourceTest.java
index a130eef..ff4d86c 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerUnorderedSourceTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryConsumerUnorderedSourceTest.java
@@ -156,7 +156,7 @@ public void testSnapshotAndConcurrentUpdateEvents() throws Exception {
     String dataset = "unordered_scenario_1";
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(context, storage,
                                                                     bigQuery, bucket, project, 1,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    dataset, false);
+                                                                    dataset, false, false);
     List<String> tableNames = Arrays.asList("users1", "users2");
     try {
       long sequenceNum = createDatasetAndTable(eventConsumer, dataset, tableNames);
@@ -283,7 +283,7 @@ public void testSnapshotAndConcurrentUpdateEvents() throws Exception {
   public void testConcurrentUpdateEventsWithoutSnapshot() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(context, storage,
                                                                     bigQuery, bucket, project, 1,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "unordered_scenario_2";
     List<String> tableNames = Arrays.asList("users1", "users2");
     try {
@@ -397,7 +397,7 @@ public void testConcurrentUpdatesWithAlterEvent() throws Exception {
     String dataset = "unordered_scenario_3";
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(context, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    dataset, false);
+                                                                    dataset, false, false);
     List<String> tableNames = Arrays.asList("users1", "users2");
     try {
       long sequenceNum = createDatasetAndTable(eventConsumer, dataset, tableNames);
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
index 343bf4b..a6d9ffa 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java
@@ -148,7 +148,8 @@ public void testCreateTableWithClustering() throws Exception {
     runtimeArguments.put("gcp.bigquery.max.clustering.columns", "4");
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(new MockContext(300, runtimeArguments), storage,
                                                                     bigQuery, bucket, project, 0, STAGING_TABLE_PREFIX,
-                                                                    true, null, 1L, null, false);
+                                                                    true, null, 1L, null, false,
+                                                                    false);
     String dataset = "testTableCreationWithClustering";
     String tableName = "users";
     List<String> primaryKeys = new ArrayList<>();
@@ -193,7 +194,7 @@ public void testHandleCreateTableAlreadyExists() throws Exception {
     runtimeArguments.put("gcp.bigquery.max.clustering.columns", "4");
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(new MockContext(300, runtimeArguments), storage,
                                                                     bigQuery, bucket, project, 0, STAGING_TABLE_PREFIX,
-                                                                    true, null, 1L, null, false);
+                                                                    true, null, 1L, null, false, false);
     String dataset = "testTableCreationWithClustering_" + UUID.randomUUID().toString().replaceAll("-", "_");
     String tableName = "users";
     List<String> primaryKeys = new ArrayList<>();
@@ -211,7 +212,7 @@ public void testHandleCreateTableAlreadyExists() throws Exception {
 
     TableId tableId = TableId.of(dataset, tableName);
     StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
-      .setSchema(Schemas.convert(schema))
+      .setSchema(Schemas.convert(schema, false))
       .build();
     TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
     TableInfo tableInfo = builder.build();
@@ -243,7 +244,8 @@ public void testCreateTableWithInvalidTypesForClustering() throws Exception {
     Bucket bucket = storage.create(BucketInfo.of(bucketName));
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(new MockContext(300, Collections.emptyMap()),
                                                                     storage, bigQuery, bucket, project, 0,
-                                                                    STAGING_TABLE_PREFIX, true, null, 1L, null, false);
+                                                                    STAGING_TABLE_PREFIX, true, null, 1L, null, false,
+                                                                    false);
     String dataset = "testInvalidTypesForClustering";
     String allinvalidsTableName = "allinvalids";
@@ -320,7 +322,8 @@ public void testManualDropRetries() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(new MockContext(300, new HashMap<>()),
                                                                     storage, bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX,
-                                                                    true, null, 1L, null, false);
+                                                                    true, null, 1L, null, false,
+                                                                    false);
     String dataset = "testManualDropRetries";
     String tableName = "users";
@@ -364,7 +367,7 @@ public void testManualDrops() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, true, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "testManualDrops";
     String tableName = "users";
@@ -435,7 +438,7 @@ public void testAlter() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "testAlter";
     String tableName = "users";
@@ -493,7 +496,7 @@ public void testInsertUpdateDelete() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "testInsertUpdateDelete";
 
     try {
@@ -510,7 +513,7 @@ public void testInsertTruncate() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "testInsertTruncate";
 
     try {
@@ -527,7 +530,7 @@ public void testSoftDeletes() throws Exception {
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage,
                                                                     bigQuery, bucket, project, 0,
                                                                     STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, true);
+                                                                    null, true, false);
     String dataset = "testInsertUpdateSoftDelete";
 
     try {
@@ -543,7 +546,7 @@ public void testBigQuerySchemaNormalization() throws Exception {
     Bucket bucket = storage.create(BucketInfo.of(bucketName));
     BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(MockContext.INSTANCE, storage, bigQuery, bucket,
                                                                     project, 0, STAGING_TABLE_PREFIX, false, null, null,
-                                                                    null, false);
+                                                                    null, false, false);
     String dataset = "testSchemaNormalization";
     String tableName = "test_table";
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java
index 2d929ce..0c385ee 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java
@@ -153,23 +153,113 @@ public void testNormalizeTableName() {
 
   @Test
   public void testNormalizeFieldName() {
+    int maxColumnNameLength = 300;
+    String prefix = "_";
     // only contains number and letter
-    assertEquals("a2fs", BigQueryUtils.normalizeFieldName("a2fs"));
+    assertEquals("a2fs", BigQueryUtils.normalizeFieldName("a2fs", false));
     // only contains number and letter start with number
-    assertEquals("_2fas", BigQueryUtils.normalizeFieldName("2fas"));
-    // only contains number and letter and length is 128
-    String name = Strings.repeat("a1", 64);
-    assertEquals(name, BigQueryUtils.normalizeFieldName(name));
-    // only contains number and letter, starts with number and length is 128
-    name = Strings.repeat("1a", 64);
-    assertEquals("_" + name.substring(0, 127), BigQueryUtils.normalizeFieldName(name));
+    assertEquals("_2fas", BigQueryUtils.normalizeFieldName("2fas", false));
+    // only contains number and letter and length is 300
+    String name = Strings.repeat("a1", 150);
+    assertEquals(name, BigQueryUtils.normalizeFieldName(name, false));
+    // only contains number and letter, starts with number and length is 300
+    name = Strings.repeat("1a", 150);
+    assertEquals(prefix + name.substring(0, maxColumnNameLength - prefix.length()),
+                 BigQueryUtils.normalizeFieldName(name, false));
-    // only contains number and letter and length is 130
-    name = Strings.repeat("a1", 65);
-    assertEquals(name.substring(0, 128), BigQueryUtils.normalizeFieldName(name));
+    // only contains number and letter and length is 302
+    name = Strings.repeat("a1", 151);
+    assertEquals(name.substring(0, maxColumnNameLength), BigQueryUtils.normalizeFieldName(name, false));
     // contains invalid character
BigQueryUtils.normalizeFieldName("ab?/c")); + assertEquals("ab_c", BigQueryUtils.normalizeFieldName("ab?/c", false)); // contains space - assertEquals("a2_fs", BigQueryUtils.normalizeFieldName("a2 fs")); + assertEquals("a2_fs", BigQueryUtils.normalizeFieldName("a2 fs", false)); + // contains hyphen + assertEquals("a2-fs", BigQueryUtils.normalizeFieldName("a2-fs", true)); + // contains chinese character + assertEquals("你好世界", BigQueryUtils.normalizeFieldName("你好世界", true)); + // contains japanese character + assertEquals("こんにちは世界", BigQueryUtils.normalizeFieldName("こんにちは世界", true)); + // contains emoji + assertEquals("_", BigQueryUtils.normalizeFieldName("👍", true)); + + // Testing valid characters + + // underscore is a valid character + assertEquals("valid_", BigQueryUtils.normalizeFieldName("valid_", true)); + // space is a valid character + assertEquals("Space is valid", BigQueryUtils.normalizeFieldName("Space is valid", true)); + // ampersand is valid + assertEquals("ampersand&", BigQueryUtils.normalizeFieldName("ampersand&", true)); + // percent is valid and not replaced + assertEquals("percent%", BigQueryUtils.normalizeFieldName("percent%", true)); + // equals is valid and not replaced + assertEquals("equals=", BigQueryUtils.normalizeFieldName("equals=", true)); + // plus is valid and not replaced + assertEquals("plus+", BigQueryUtils.normalizeFieldName("plus+", true)); + // colon is valid and not replaced + assertEquals("colon:", BigQueryUtils.normalizeFieldName("colon:", true)); + // apostrophe is valid and not replaced + assertEquals("apostrophe'", BigQueryUtils.normalizeFieldName("apostrophe'", true)); + // less than is valid and not replaced + assertEquals("less_than<", BigQueryUtils.normalizeFieldName("less_than<", true)); + // greater than is valid and not replaced + assertEquals("greater_than>", BigQueryUtils.normalizeFieldName("greater_than>", true)); + // number sign is valid and not replaced + assertEquals("number_sign#", BigQueryUtils.normalizeFieldName("number_sign#", true)); + // vertical line is valid and not replaced + assertEquals("vertical_line|", BigQueryUtils.normalizeFieldName("vertical_line|", true)); + + // Testing invalid characters + + // test for tab + assertEquals("tab_", BigQueryUtils.normalizeFieldName("tab\t", true)); + // exclamation is replaced with underscore + assertEquals("exclamation_", BigQueryUtils.normalizeFieldName("exclamation!", true)); + // quotation is replaced with underscore + assertEquals("quotation_", BigQueryUtils.normalizeFieldName("quotation\"", true)); + // dollar is replaced with underscore + assertEquals("dollar_", BigQueryUtils.normalizeFieldName("dollar$", true)); + // left parenthesis is replaced with underscore + assertEquals("left_parenthesis_", BigQueryUtils.normalizeFieldName("left_parenthesis(", true)); + // right parenthesis is replaced with underscore + assertEquals("right_parenthesis_", BigQueryUtils.normalizeFieldName("right_parenthesis)", true)); + // asterisk is replaced with underscore + assertEquals("asterisk_", BigQueryUtils.normalizeFieldName("asterisk*", true)); + // comma is replaced with underscore + assertEquals("comma_", BigQueryUtils.normalizeFieldName("comma,", true)); + // period is replaced with underscore + assertEquals("period_", BigQueryUtils.normalizeFieldName("period.", true)); + // slash is replaced with underscore + assertEquals("slash_", BigQueryUtils.normalizeFieldName("slash/", true)); + // semicolon is replaced with underscore + assertEquals("semicolon_", 
BigQueryUtils.normalizeFieldName("semicolon;", true)); + // question mark is replaced with underscore + assertEquals("question_mark_", BigQueryUtils.normalizeFieldName("question_mark?", true)); + // at sign is replaced with underscore + assertEquals("at_sign_", BigQueryUtils.normalizeFieldName("at_sign@", true)); + // left square bracket is replaced with underscore + assertEquals("left_square_bracket_", BigQueryUtils.normalizeFieldName("left_square_bracket[", true)); + // backslash is replaced with underscore + assertEquals("backslash_", BigQueryUtils.normalizeFieldName("backslash\\", true)); + // right square bracket is replaced with underscore + assertEquals("right_square_bracket_", BigQueryUtils.normalizeFieldName("right_square_bracket]", true)); + // circumflex accent is replaced with underscore + assertEquals("circumflex_accent_", BigQueryUtils.normalizeFieldName("circumflex_accent^", true)); + // grave accent is replaced with underscore + assertEquals("grave_accent_", BigQueryUtils.normalizeFieldName("grave_accent`", true)); + // left curly bracket is replaced with underscore + assertEquals("left_curly_bracket_", BigQueryUtils.normalizeFieldName("left_curly_bracket{", true)); + // right curly bracket is replaced with underscore + assertEquals("right_curly_bracket_", BigQueryUtils.normalizeFieldName("right_curly_bracket}", true)); + // tilde is replaced with underscore + assertEquals("tilde_", BigQueryUtils.normalizeFieldName("tilde~", true)); + + // mixed valid and invalid characters + assertEquals("mixed%valid_invalid_", BigQueryUtils.normalizeFieldName("mixed%valid?invalid@", true)); + + // test for 2 space + assertEquals("a2 fs", BigQueryUtils.normalizeFieldName("a2 fs", true)); + } @Test diff --git a/widgets/bigquery-cdcTarget.json b/widgets/bigquery-cdcTarget.json index 17edba1..f7eb4f9 100644 --- a/widgets/bigquery-cdcTarget.json +++ b/widgets/bigquery-cdcTarget.json @@ -46,6 +46,22 @@ "label": "Dataset Name", "widget-type": "textbox" }, + { + "name": "allowFlexibleColumnNaming", + "label": "Allow Flexible Column Naming", + "widget-type": "toggle", + "widget-attributes": { + "default": "false", + "on": { + "value": "true", + "label": "Yes" + }, + "off": { + "value": "false", + "label": "No" + } + } + }, { "widget-type": "textbox", "label": "Encryption Key Name",