Skip to content

Commit

Permalink
source-postgres-batch: Emit partial-progress checkpoints
Browse files Browse the repository at this point in the history
It's somewhat debatable how useful partial-progress checkpoints
are nowadays since there are no hard limits on Flow transaction
sizing or anything, but every other Batch SQL capture has them,
so it's probably good for `source-postgres-batch` to have them
as well for consistency if nothing else.
  • Loading branch information
willdonnelly committed Aug 15, 2024
1 parent b8164e7 commit 882151d
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions source-postgres-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import (
"golang.org/x/sync/errgroup"
)

const (
// When processing large tables we like to emit checkpoints every so often. But
// emitting a checkpoint after every row could be a significant drag on overall
// throughput, and isn't really needed anyway. So instead we emit one for every
// N rows, plus another when the query results are fully processed.
documentsPerCheckpoint = 1000
)

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
// by a config schema, connect function, and value translation logic.
type BatchSQLDriver struct {
Expand Down Expand Up @@ -712,6 +720,11 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template
state.CursorValues = cursorValues

count++
if count%documentsPerCheckpoint == 0 {
if err := c.streamStateCheckpoint(stateKey, state); err != nil {
return err
}
}
}

log.WithFields(log.Fields{
Expand Down

0 comments on commit 882151d

Please sign in to comment.