Skip to content

Commit

Permalink
[add] new sink bigquery
Browse files Browse the repository at this point in the history
  • Loading branch information
tuan-nguyen3-ts committed Dec 5, 2023
1 parent 2718cfc commit f907d0d
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 10 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
name: Publish package to GitHub Packages
on:
release:
types: [created]
push:
branches:
- 'main'
jobs:
publish:
runs-on: ubuntu-latest
Expand All @@ -15,6 +16,4 @@ jobs:
java-version: '11'
distribution: 'temurin'
- name: Publish package
run: mvn --batch-mode deploy
env:
GITHUB_TOKEN: ${{ secrets.REGISTRY_GITHUB_TOKEN }}
run: mvn clean package -DskipTests
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
debezium.sink.type=bigquerystream
debezium.sink.bigquerystream.project=${GCP_PROJECT_ID}
debezium.sink.bigquerystream.dataset=tuan_test
debezium.sink.bigquerystream.create-if-needed=true
debezium.sink.bigquerystream.partition-field=ts_ms
debezium.source.offset.storage.bigquery.table-name=debezium_offset_storage_custom_table

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=src/main/resources/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=changelog

quarkus.log.console.json=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=false
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.add.fields=table,lsn



Binary file added cdc-debezium-server/src/main/resources/offsets.dat
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void initizalize() throws InterruptedException {
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
LOGGER.trace("Received {} events", records.size());

LOGGER.info("records: {}", records);
Instant start = Instant.now();
Map<String, List<DebeziumBigqueryEvent>> events = records.stream()
.map((ChangeEvent<Object, Object> e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public long uploadDestination(String destination, List<DebeziumBigqueryEvent> da
Instant start = Instant.now();
final long numRecords;
TableId tableId = getTableId(destination);

DebeziumBigqueryEvent sampleEvent = data.get(0);
Schema schema = sampleEvent.getBigQuerySchema(false, false);
if (schema == null) {
Expand All @@ -115,7 +114,7 @@ public long uploadDestination(String destination, List<DebeziumBigqueryEvent> da
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(writeDisposition))
.setClustering(clustering)
.setSchema(schema)
.setTimePartitioning(timePartitioning)
// .setTimePartitioning(timePartitioning)
.setSchemaUpdateOptions(schemaUpdateOptions)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(createDisposition))
.setMaxBadRecords(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ private Table createTable(TableId tableId, Schema schema, Clustering clustering,
StandardTableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
.setSchema(schema)
.setTimePartitioning(timePartitioning)
.setClustering(clustering)
// .setTimePartitioning(timePartitioning)
// .setClustering(clustering)
.setTableConstraints(tableConstraints)
.build();
TableInfo tableInfo =
Expand Down

0 comments on commit f907d0d

Please sign in to comment.