
Commit

Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Jun 27, 2023
2 parents ad47a32 + c93e274 commit 7ff2cb5
Showing 62 changed files with 4,374 additions and 461 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/dev-debian.yml
@@ -30,17 +30,15 @@ jobs:
- name: install cargo binaries
run: |
cargo binstall --no-confirm --no-symlinks cargo-deb cargo-zigbuild
cargo binstall --no-confirm --no-symlinks cargo-deb
- uses: dtolnay/rust-toolchain@stable
with:
targets: x86_64-unknown-linux-musl

- uses: goto-bus-stop/setup-zig@v2

- name: build project release
working-directory: ./nexus
run: cargo zigbuild --release --target=x86_64-unknown-linux-musl
run: cargo build --release --target=x86_64-unknown-linux-musl

- name: create peerdb-server deb package
working-directory: ./nexus/
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
@@ -8,7 +8,7 @@ on:

jobs:
flow_test:
runs-on: ubuntu-latest
runs-on: ubuntu-latest-16-cores
timeout-minutes: 30
services:
pg_cdc:
@@ -57,7 +57,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 3 ./...
gotestsum --format testname -- -p 1 ./...
working-directory: ./flow
env:
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
6 changes: 2 additions & 4 deletions .github/workflows/stable-debian.yml
@@ -31,14 +31,12 @@ jobs:
- name: install cargo binaries
run: |
cargo binstall --no-confirm --no-symlinks cargo-deb cargo-zigbuild
cargo binstall --no-confirm --no-symlinks cargo-deb
- uses: dtolnay/rust-toolchain@stable
with:
targets: x86_64-unknown-linux-musl

- uses: goto-bus-stop/setup-zig@v2

- name: Set Cargo version as Git tag
working-directory: ./nexus/server
run: |
@@ -47,7 +45,7 @@
- name: build project release
working-directory: ./nexus
run: cargo zigbuild --release --target=x86_64-unknown-linux-musl
run: cargo build --release --target=x86_64-unknown-linux-musl

- name: create peerdb-server deb package
working-directory: ./nexus/
8 changes: 4 additions & 4 deletions README.md
@@ -71,8 +71,8 @@ Query supported data-stores with a Postgres-compatible SQL interface
| Data-store | Support | Status |
| --- | --- | --- |
| BigQuery | SELECT commands | STABLE |
| Snowflake | SELECT commands | Under development |
| PostgreSQL | DML + SELECT commands | Under development |
| Snowflake | SELECT commands | Beta |
| PostgreSQL | DML + SELECT commands | Beta |

### PeerDB MIRROR

@@ -84,8 +84,8 @@ Real-time syncing of data from source to target based on change-feed or CDC (log

| Feature | Source | Target | Status |
| --- | --- | --- | --- |
| CDC | PostgreSQL | BigQuery | Under development |
| CDC | PostgreSQL | Snowflake | Under development |
| CDC | PostgreSQL | BigQuery | Beta |
| CDC | PostgreSQL | Snowflake | Beta |
| Initial Load | PostgreSQL | BigQuery | Coming Soon! |
| Initial Load | PostgreSQL | Snowflake | Coming Soon! |

72 changes: 58 additions & 14 deletions docker-compose.yml
@@ -2,30 +2,53 @@ version: '3.9'

services:
temporalite:
container_name: temporalite
image: slamdev/temporalite:0.3.0
entrypoint: |
temporalite start -n default --ephemeral --ip 0.0.0.0 --log-level warn
volumes:
- temporalitedata:/data
ports:
- 7233:7233
- 8233:8233

temporal-admin-tools:
container_name: temporal-admin-tools
depends_on:
- temporalite
environment:
- TEMPORAL_CLI_ADDRESS=temporalite:7233
image: temporalio/admin-tools:1.17.5
stdin_open: true
tty: true
healthcheck:
test:
[
"CMD",
"tctl",
"--address",
"temporalite:7233",
"workflow",
"list"
]
interval: 1s
timeout: 5s
retries: 30

catalog:
container_name: catalog
image: debezium/postgres:14-alpine
ports:
# mapping is from host to container
- 5432:5432
environment:
POSTGRES_USER: postgres
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
extra_hosts:
- "host.docker.internal:host-gateway"
command:
[
"postgres",
"-c",
"log_statement=all",
"-c",
"log_destination=stderr"
]
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test:
[
@@ -42,6 +65,7 @@ services:
start_period: 60s

flow_api:
container_name: flow_api
build:
context: .
dockerfile: stacks/flow-api.Dockerfile
@@ -50,18 +74,32 @@
environment:
CATALOG_DSN: postgres://postgres:postgres@catalog:5432/postgres?sslmode=disable
TEMPORAL_HOST_PORT: temporalite:7233
GIN_MODE: release
depends_on:
- catalog
- temporalite
catalog:
condition: service_healthy
temporal-admin-tools:
condition: service_healthy
healthcheck:
test: "curl --fail http://flow_api:8112/health || exit 1"
interval: 5s
timeout: 5s
retries: 5

flow_worker:
container_name: flow_worker
build:
context: .
dockerfile: stacks/flow-worker.Dockerfile
environment:
ENABLE_PROFILING: true
PROFILING_SERVER: 0.0.0.0:6060
TEMPORAL_HOST_PORT: temporalite:7233
ports:
- 6060:6060
depends_on:
- temporalite
temporal-admin-tools:
condition: service_healthy

peerdb:
build:
@@ -79,5 +117,11 @@ services:
ports:
- 9900:9900
depends_on:
- catalog
- flow_api
catalog:
condition: service_healthy
flow_api:
condition: service_healthy

volumes:
pgdata:
temporalitedata:
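
The compose changes above replace plain `depends_on` ordering with health-gated startup: `temporal-admin-tools` and `catalog` gain healthchecks, `flow_api` exposes a `/health` endpoint probed with curl, and `peerdb` now waits for both `catalog` and `flow_api` to report healthy. A minimal Go sketch of the same readiness probe run from the host side follows; the `localhost:8112` URL assumes the API port is published to the host, which this excerpt does not show.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitForFlowAPI mirrors the compose healthcheck ("curl --fail
// http://flow_api:8112/health") from the host side. The URL passed in main
// assumes port 8112 is published to localhost, which this excerpt does not show.
func waitForFlowAPI(url string, attempts int) error {
	client := &http.Client{Timeout: 5 * time.Second} // same timeout as the healthcheck
	for i := 0; i < attempts; i++ {
		resp, err := client.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(5 * time.Second) // same interval as the healthcheck
	}
	return fmt.Errorf("flow_api not healthy after %d attempts", attempts)
}

func main() {
	if err := waitForFlowAPI("http://localhost:8112/health", 5); err != nil {
		panic(err)
	}
	fmt.Println("flow_api is healthy")
}
```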
6 changes: 3 additions & 3 deletions flow/activities/fetch_config.go
@@ -32,12 +32,12 @@ func (a *FetchConfigActivity) FetchConfig(
return nil, fmt.Errorf("failed to create connection pool: %w", err)
}

sourceConnectionConfig, err := fetchPeerConfig(ctx, pool, input.PeerFlowName, "source_peer")
sourceConnectionConfig, err := FetchPeerConfig(ctx, pool, input.PeerFlowName, "source_peer")
if err != nil {
return nil, fmt.Errorf("failed to unmarshal source connection config: %w", err)
}

destinationConnectionConfig, err := fetchPeerConfig(ctx, pool, input.PeerFlowName, "destination_peer")
destinationConnectionConfig, err := FetchPeerConfig(ctx, pool, input.PeerFlowName, "destination_peer")
if err != nil {
return nil, fmt.Errorf("failed to unmarshal destination connection config: %w", err)
}
@@ -73,7 +73,7 @@ func (a *FetchConfigActivity) FetchConfig(
}

// fetchPeerConfig retrieves the config for a given peer by join label.
func fetchPeerConfig(ctx context.Context, pool *pgxpool.Pool, flowName string, label string) (*protos.Peer, error) {
func FetchPeerConfig(ctx context.Context, pool *pgxpool.Pool, flowName string, label string) (*protos.Peer, error) {
var name string
var dbtype int32
var opts []byte
12 changes: 12 additions & 0 deletions flow/activities/flowable.go
@@ -55,6 +55,9 @@ type IFlowable interface {
// ReplicateQRepPartition replicates a QRepPartition from the source to the destination.
ReplicateQRepPartition(ctx context.Context, partition *protos.QRepPartition) error

// ConsolidateQRepPartitions consolidates the QRepPartitions into the destination.
ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error

DropFlow(ctx context.Context, config *protos.DropFlowInput) error
}

@@ -346,6 +349,15 @@ func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
return nil
}

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error {
dst, err := connectors.GetConnector(ctx, config.DestinationPeer)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}

return dst.ConsolidateQRepPartitions(config)
}

func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.FlowConnectionConfigs) error {
src, err := connectors.GetConnector(ctx, config.Source)
defer connectors.CloseConnector(src)
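
The new `ConsolidateQRepPartitions` activity mirrors `ReplicateQRepPartition`: it resolves the destination connector and delegates consolidation to it. A minimal sketch of how a workflow could schedule the activity with the Temporal Go SDK follows; the package name, helper name, and 15-minute timeout are assumptions for illustration, not the repository's actual workflow code.

```go
// Sketch only: "peerflow" stands in for the repository's workflow package and
// this helper is not part of the commit.
package peerflow

import (
	"time"

	"go.temporal.io/sdk/workflow"

	"github.com/PeerDB-io/peer-flow/activities"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// consolidatePartitions shows how a workflow could schedule the new
// ConsolidateQRepPartitions activity after all partitions have been replicated.
// The 15-minute timeout is an assumed value, not taken from the commit.
func consolidatePartitions(ctx workflow.Context, config *protos.QRepConfig) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 15 * time.Minute,
	})

	// Passing a method value lets the SDK resolve the registered activity by name.
	var a *activities.FlowableActivity
	return workflow.ExecuteActivity(ctx, a.ConsolidateQRepPartitions, config).Get(ctx, nil)
}
```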
79 changes: 79 additions & 0 deletions flow/cmd/api.go
@@ -7,6 +7,7 @@ import (
"net/http"
"time"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
@@ -89,18 +90,96 @@ func (a *APIServer) StartPeerFlow(reqCtx context.Context, input *peerflow.PeerFl
return workflowID, nil
}

func genConfigForQRepFlow(config *protos.QRepConfig, flowOptions map[string]interface{},
queryString string, destinationTableIdentifier string) error {
config.InitialCopyOnly = false
config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64))
config.DestinationTableIdentifier = destinationTableIdentifier
config.Query = queryString
config.WatermarkColumn = flowOptions["watermark_column"].(string)
config.WatermarkTable = flowOptions["watermark_table_name"].(string)
config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64))
config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64))
config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64))
if flowOptions["sync_data_format"].(string) == "avro" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO
} else if flowOptions["sync_data_format"].(string) == "default" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT
} else {
return fmt.Errorf("unsupported sync data format: %s", flowOptions["sync_data_format"].(string))
}
if flowOptions["mode"].(string) == "append" {
tempWriteMode := &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}
config.WriteMode = tempWriteMode
} else if flowOptions["mode"].(string) == "upsert" {
upsertKeyColumns := make([]string, 0)
for _, column := range flowOptions["unique_key_columns"].([]interface{}) {
upsertKeyColumns = append(upsertKeyColumns, column.(string))
}

tempWriteMode := &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
UpsertKeyColumns: upsertKeyColumns,
}
config.WriteMode = tempWriteMode
} else {
return fmt.Errorf("unsupported write mode: %s", flowOptions["mode"].(string))
}
return nil
}

func (a *APIServer) StartQRepFlow(reqCtx context.Context, config *protos.QRepConfig) (string, error) {
workflowID := fmt.Sprintf("%s-qrepflow-%s", config.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
}

if config.SourcePeer == nil || config.DestinationPeer == nil {
sourcePeer, err := activities.FetchPeerConfig(reqCtx, a.pool, config.FlowJobName, "source_peer")
if err != nil {
return "", fmt.Errorf("unable to fetch source peer config: %w", err)
}
config.SourcePeer = sourcePeer

destinationPeer, err := activities.FetchPeerConfig(reqCtx, a.pool, config.FlowJobName, "destination_peer")
if err != nil {
return "", fmt.Errorf("unable to fetch destination peer config: %w", err)
}
config.DestinationPeer = destinationPeer

var destinationTableIdentifier string
var queryString string
var flowOptions map[string]interface{}
row := a.pool.QueryRow(reqCtx,
"SELECT DESTINATION_TABLE_IDENTIFIER, QUERY_STRING, FLOW_METADATA FROM FLOWS WHERE NAME = $1",
config.FlowJobName)
err = row.Scan(&destinationTableIdentifier, &queryString, &flowOptions)
if err != nil {
return "", fmt.Errorf("unable to fetch flow metadata: %w", err)
}

err = genConfigForQRepFlow(config, flowOptions, queryString, destinationTableIdentifier)
if err != nil {
return "", fmt.Errorf("unable to generate config for QRepFlow: %w", err)
}
}

lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}
numPartitionsProcessed := 0

_, err := a.temporalClient.ExecuteWorkflow(
reqCtx, // context
workflowOptions, // workflow start options
peerflow.QRepFlowWorkflow, // workflow function
config, // workflow input
lastPartition, // last partition
numPartitionsProcessed, // number of partitions processed
)
if err != nil {
return "", fmt.Errorf("unable to start QRepFlow workflow: %w", err)
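
`StartQRepFlow` now backfills missing peer configs from the catalog: it loads both peers with the newly exported `activities.FetchPeerConfig`, reads `DESTINATION_TABLE_IDENTIFIER`, `QUERY_STRING`, and `FLOW_METADATA` from the `FLOWS` table, and passes the metadata to `genConfigForQRepFlow`. As a rough guide to the shape that metadata must take, the sketch below builds an options map that satisfies the function's type assertions; every concrete value is illustrative, and the numbers are `float64` because the metadata arrives as decoded JSON.

```go
// Standalone sketch of the FLOW_METADATA options read by genConfigForQRepFlow.
// The keys mirror the function above; every value here is purely illustrative.
package main

import "fmt"

func exampleFlowOptions() map[string]interface{} {
	return map[string]interface{}{
		"parallelism":              float64(4),      // -> MaxParallelWorkers
		"watermark_column":         "updated_at",    // hypothetical column name
		"watermark_table_name":     "public.events", // hypothetical table name
		"batch_size_int":           float64(10000),  // -> BatchSizeInt
		"batch_duration_timestamp": float64(3600),   // -> BatchDurationSeconds
		"refresh_interval":         float64(30),     // -> WaitBetweenBatchesSeconds
		"sync_data_format":         "avro",          // or "default"
		"mode":                     "upsert",        // or "append"
		"unique_key_columns":       []interface{}{"id"},
	}
}

func main() {
	fmt.Println(exampleFlowOptions())
}
```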
18 changes: 18 additions & 0 deletions flow/cmd/main.go
@@ -29,6 +29,20 @@ func main() {
EnvVars: []string{"TEMPORAL_HOST_PORT"},
}

profilingFlag := &cli.BoolFlag{
Name: "enable-profiling",
Value: false, // Default is off
Usage: "Enable profiling for the application",
EnvVars: []string{"ENABLE_PROFILING"},
}

profilingServerFlag := &cli.StringFlag{
Name: "profiling-server",
Value: "localhost:6060", // Default is localhost:6060
Usage: "HTTP server address for profiling",
EnvVars: []string{"PROFILING_SERVER"},
}

app := &cli.App{
Name: "PeerDB Flows CLI",
Commands: []*cli.Command{
@@ -38,10 +52,14 @@
temporalHostPort := ctx.String("temporal-host-port")
return WorkerMain(&WorkerOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
ProfilingServer: ctx.String("profiling-server"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
profilingServerFlag,
},
},
{
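
The new `--enable-profiling` and `--profiling-server` flags (wired to the `ENABLE_PROFILING` and `PROFILING_SERVER` variables added in docker-compose above) are only threaded into `WorkerOptions` in this excerpt; the server itself is presumably started elsewhere in the worker. A typical shape for such a server, using Go's standard `net/http/pprof`, is sketched below as an assumption rather than the repository's actual implementation.

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on DefaultServeMux
)

// startProfilingServer is a hypothetical helper showing how the new flags might
// be consumed; the commit itself only passes them into WorkerOptions.
func startProfilingServer(addr string) {
	go func() {
		// A nil handler serves DefaultServeMux, which now carries the pprof routes.
		if err := http.ListenAndServe(addr, nil); err != nil {
			log.Printf("profiling server stopped: %v", err)
		}
	}()
}

func main() {
	startProfilingServer("localhost:6060") // the flag's default address
	select {}                              // keep the example process alive
}
```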
(Diffs for the remaining changed files are not shown here.)
