
#1040 bulker: flattener: don't omit nil values for sources sync and for file storages

bulker: bigquery: Use Primary Keys
bulker: use daily partitioning
bulker, syncctl, ingest: /health endpoints return JSON
absorbb committed Dec 5, 2023
1 parent 582b538 commit fd65068
Showing 12 changed files with 184 additions and 65 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/tag-release.yml
@@ -0,0 +1,40 @@
+name: CI
+on:
+  push:
+    tags:
+      - 'v[0-9]+.[0-9]+.[0-9]+'
+
+jobs:
+  docker:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+          ref: ${{ github.ref }}
+          # Make sure the value of GITHUB_TOKEN will not be persisted in repo's config
+          persist-credentials: false
+      - name: Set outputs
+        id: vars
+        run: |
+          echo "timestamp=$(date '+%F_%H:%M:%S')" >> $GITHUB_OUTPUT
+      - uses: benjlevesque/short-sha@v2.2
+        id: short-sha
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+      - name: Login to Docker Hub
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.DOCKERHUB_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_PASSWORD }}
+      - name: Build and push
+        uses: docker/build-push-action@v5
+        with:
+          file: "bulker.Dockerfile"
+          push: true
+          build-args: |
+            VERSION=${{ github.ref_name }}
+            BUILD_TIMESTAMP=${{ steps.vars.outputs.timestamp }}
+          tags: jitsucom/bulker:latest,jitsucom/bulker:${{ github.ref_name }}
62 changes: 32 additions & 30 deletions .github/workflows/test-build-image.yml
@@ -4,6 +4,8 @@ on:
   push:
     branches:
       - 'main'
+  tags-ignore:
+    - 'v[0-9]+.[0-9]+.[0-9]+'
   workflow_dispatch:

 env:
@@ -20,7 +22,7 @@ jobs:
       - uses: actions/checkout@v3
         with:
           fetch-depth: 0
-          ref: ${{ github.head_ref }}
+          ref: ${{ github.ref }}
           # Make sure the value of GITHUB_TOKEN will not be persisted in repo's config
           persist-credentials: false

@@ -57,32 +59,32 @@ jobs:
           SLACK_COLOR: ${{ job.status }}
           SLACK_ICON_EMOJI: ":github-mark:"
           SLACK_MESSAGE: "${{ job.status }}: Bulker Test: ${{ github.event.head_commit.message }}"
-
-  docker:
-    needs: bulker-test
-    runs-on: ubuntu-latest
-    steps:
-      - name: Set outputs
-        id: vars
-        run: |
-          echo "timestamp=$(date '+%F_%H:%M:%S')" >> $GITHUB_OUTPUT
-      - uses: benjlevesque/short-sha@v2.2
-        id: short-sha
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v3
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v3
-      - name: Login to Docker Hub
-        uses: docker/login-action@v3
-        with:
-          username: ${{ secrets.DOCKERHUB_USERNAME }}
-          password: ${{ secrets.DOCKERHUB_PASSWORD }}
-      - name: Build and push
-        uses: docker/build-push-action@v5
-        with:
-          file: "bulker.Dockerfile"
-          push: true
-          build-args: |
-            VERSION=${{ steps.short-sha.outputs.sha }}
-            BUILD_TIMESTAMP=${{ steps.vars.outputs.timestamp }}
-          tags: jitsucom/bulker:latest,jitsucom/bulker:${{ steps.short-sha.outputs.sha }}
+#
+#  docker:
+#    needs: bulker-test
+#    runs-on: ubuntu-latest
+#    steps:
+#      - name: Set outputs
+#        id: vars
+#        run: |
+#          echo "timestamp=$(date '+%F_%H:%M:%S')" >> $GITHUB_OUTPUT
+#      - uses: benjlevesque/short-sha@v2.2
+#        id: short-sha
+#      - name: Set up QEMU
+#        uses: docker/setup-qemu-action@v3
+#      - name: Set up Docker Buildx
+#        uses: docker/setup-buildx-action@v3
+#      - name: Login to Docker Hub
+#        uses: docker/login-action@v3
+#        with:
+#          username: ${{ secrets.DOCKERHUB_USERNAME }}
+#          password: ${{ secrets.DOCKERHUB_PASSWORD }}
+#      - name: Build and push
+#        uses: docker/build-push-action@v5
+#        with:
+#          file: "bulker.Dockerfile"
+#          push: true
+#          build-args: |
+#            VERSION=${{ steps.short-sha.outputs.sha }}
+#            BUILD_TIMESTAMP=${{ steps.vars.outputs.timestamp }}
+#          tags: jitsucom/bulker:latest,jitsucom/bulker:${{ steps.short-sha.outputs.sha }}
31 changes: 18 additions & 13 deletions bulkerapp/app/router.go
@@ -11,6 +11,7 @@ import (
 	"github.com/hjson/hjson-go/v4"
 	"github.com/jitsucom/bulker/bulkerapp/metrics"
 	bulker "github.com/jitsucom/bulker/bulkerlib"
+	"github.com/jitsucom/bulker/bulkerlib/implementations/sql"
 	"github.com/jitsucom/bulker/bulkerlib/types"
 	"github.com/jitsucom/bulker/eventslog"
 	"github.com/jitsucom/bulker/jitsubase/appbase"
@@ -47,7 +48,7 @@ type Router struct {
 func NewRouter(appContext *Context) *Router {
 	authTokens := strings.Split(appContext.config.AuthTokens, ",")
 	tokenSecrets := strings.Split(appContext.config.TokenSecrets, ",")
-	base := appbase.NewRouterBase(authTokens, tokenSecrets, []string{"/ready"})
+	base := appbase.NewRouterBase(authTokens, tokenSecrets, []string{"/ready", "/health"})

 	router := &Router{
 		Router: base,
@@ -67,18 +68,8 @@ func NewRouter(appContext *Context) *Router {
 	fast.POST("/ingest", router.IngestHandler)
 	fast.POST("/test", router.TestConnectionHandler)
 	fast.GET("/log/:eventType/:actorId", router.EventsLogHandler)
-	fast.GET("/ready", func(c *gin.Context) {
-		if router.kafkaConfig == nil {
-			c.Status(http.StatusOK)
-			return
-		}
-		if router.topicManager.IsReady() {
-			c.Status(http.StatusOK)
-		} else {
-			logging.Errorf("Health check: FAILED")
-			c.AbortWithStatus(http.StatusServiceUnavailable)
-		}
-	})
+	fast.GET("/ready", router.Health)
+	fast.GET("/health", router.Health)

 	engine.POST("/bulk/:destinationId", router.BulkHandler)
 	engine.GET("/failed/:destinationId", router.FailedHandler)
@@ -97,6 +88,19 @@ func NewRouter(appContext *Context) *Router {
 	return router
 }

+func (r *Router) Health(c *gin.Context) {
+	if r.kafkaConfig == nil {
+		c.JSON(http.StatusOK, gin.H{"status": "pass"})
+		return
+	}
+	if r.topicManager.IsReady() {
+		c.JSON(http.StatusOK, gin.H{"status": "pass"})
+	} else {
+		logging.Errorf("Health check: FAILED")
+		c.JSON(http.StatusServiceUnavailable, gin.H{"status": "fail", "output": "topic manager is not ready"})
+	}
+}
+
 func (r *Router) EventsHandler(c *gin.Context) {
 	destinationId := c.Param("destinationId")
 	tableName := c.Query("tableName")
@@ -188,6 +192,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
 	if len(pkeys) > 0 {
 		streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate())
 	}
+	streamOptions = append(streamOptions, sql.WithoutOmitNils())
 	destination.InitBulkerInstance()
 	bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
 	if err != nil {
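
For reference, a minimal Go probe of the new JSON health contract; the host and port are placeholders for this sketch, not values taken from bulker's config:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Probes the /health endpoint added above. Per the handler, it returns
// {"status":"pass"} with HTTP 200, or {"status":"fail","output":"..."} with 503.
func main() {
	resp, err := http.Get("http://localhost:3042/health") // port is a placeholder
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var body struct {
		Status string `json:"status"`
		Output string `json:"output,omitempty"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}
	fmt.Printf("HTTP %d: status=%s output=%s\n", resp.StatusCode, body.Status, body.Output)
}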
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/abstract.go
@@ -98,7 +98,7 @@ func (ps *AbstractFileStorageStream) init(ctx context.Context) error {

 func (ps *AbstractFileStorageStream) preprocess(object types2.Object) (types2.Object, error) {
 	if ps.flatten {
-		flatObject, err := implementations2.DefaultFlattener.FlattenObject(object, nil)
+		flatObject, err := implementations2.NewFlattener(false).FlattenObject(object, nil)
 		if err != nil {
 			return nil, err
 		} else {
6 changes: 2 additions & 4 deletions bulkerlib/implementations/flattener.go
@@ -7,8 +7,6 @@ import (
 	"reflect"
 )

-var DefaultFlattener = NewFlattener()
-
 type Flattener interface {
 	FlattenObject(object map[string]any, sqlTypeHints types.SQLTypes) (map[string]any, error)
 }
@@ -17,9 +15,9 @@ type FlattenerImpl struct {
 	omitNilValues bool
 }

-func NewFlattener() Flattener {
+func NewFlattener(omitNilValues bool) Flattener {
 	return &FlattenerImpl{
-		omitNilValues: true,
+		omitNilValues: omitNilValues,
 	}
 }
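
To make the constructor change concrete, a minimal sketch of both modes; the exact flattened key names (underscore-joined below) are an assumption for illustration:

package main

import (
	"fmt"

	"github.com/jitsucom/bulker/bulkerlib/implementations"
)

func main() {
	event := map[string]any{
		"id":      42,
		"payload": map[string]any{"name": "test", "deleted_at": nil},
	}

	// omitNilValues=false — the mode BulkHandler and file storages now use:
	// nil leaves survive flattening, so targets receive explicit NULL columns.
	withNils, err := implementations.NewFlattener(false).FlattenObject(event, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(withNils) // e.g. map[id:42 payload_deleted_at:<nil> payload_name:test]

	// omitNilValues=true — the behavior of the removed DefaultFlattener:
	// nil leaves are dropped from the flattened object.
	withoutNils, _ := implementations.NewFlattener(true).FlattenObject(event, nil)
	fmt.Println(withoutNils) // payload_deleted_at is absent
}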
4 changes: 3 additions & 1 deletion bulkerlib/implementations/sql/abstract.go
@@ -23,6 +23,7 @@ type AbstractSQLStream struct {
 	tableName   string
 	merge       bool
 	mergeWindow int
+	omitNils    bool

 	state  bulker.State
 	inited bool
@@ -50,6 +51,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
 	var customFields = ColumnTypesOption.Get(&ps.options)
 	ps.pkColumns = pkColumns.ToSlice()
 	ps.timestampColumn = bulker.TimestampOption.Get(&ps.options)
+	ps.omitNils = OmitNilsOption.Get(&ps.options)

 	//TODO: max column?
 	ps.state = bulker.State{Status: bulker.Active}
@@ -61,7 +63,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
 	if ps.state.Status != bulker.Active {
 		return nil, nil, fmt.Errorf("stream is not active. Status: %s", ps.state.Status)
 	}
-	batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes)
+	batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.omitNils)
 	if err != nil {
 		return nil, nil, err
 	}
53 changes: 40 additions & 13 deletions bulkerlib/implementations/sql/bigquery.go
@@ -43,6 +43,9 @@ const (
 	bigqueryTruncateTemplate = "TRUNCATE TABLE %s"
 	bigquerySelectTemplate   = "SELECT %s FROM %s%s%s"

+	bigqueryPKHashLabel = "jitsu_pk_hash"
+	bigqueryPKNameLabel = "jitsu_pk_name"
+
 	bigqueryRowsLimitPerInsertOperation = 500
 )

@@ -281,14 +284,15 @@ func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Tabl
 		dt, _ := bq.GetDataType(string(field.Type))
 		table.Columns[field.Name] = types2.SQLColumn{Type: string(field.Type), DataType: dt}
 	}
-	for k, v := range meta.Labels {
-		if strings.HasPrefix(k, BulkerManagedPkConstraintPrefix) {
-			pkFields := strings.Split(v, "---")
-			for _, pkField := range pkFields {
-				table.PKFields.Put(pkField)
-			}
-			table.PrimaryKeyName = k
-			break
+	jitsuPKName := meta.Labels[bigqueryPKNameLabel]
+	jitsuPKHash := meta.Labels[bigqueryPKHashLabel]
+
+	tableConstraints := meta.TableConstraints
+	if tableConstraints != nil && tableConstraints.PrimaryKey != nil {
+		table.PKFields.PutAll(tableConstraints.PrimaryKey.Columns)
+		if table.PKFields.Hash() == jitsuPKHash {
+			// PK was not changed since Jitsu managed this table
+			table.PrimaryKeyName = jitsuPKName
 		}
 	}
 	if meta.TimePartitioning != nil {
@@ -336,16 +340,25 @@ func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error) {
 		bigQueryType := bigquery.FieldType(strings.ToUpper(column.GetDDLType()))
 		bqSchema = append(bqSchema, &bigquery.FieldSchema{Name: bq.ColumnName(columnName), Type: bigQueryType})
 	}
+	var tableConstraints *bigquery.TableConstraints
 	var labels map[string]string
 	if len(table.PKFields) > 0 && table.PrimaryKeyName != "" {
-		labels = map[string]string{table.PrimaryKeyName: strings.Join(table.GetPKFields(), "---")}
+		tableConstraints = &bigquery.TableConstraints{
+			PrimaryKey: &bigquery.PrimaryKey{
+				Columns: table.GetPKFields(),
+			},
+		}
+		labels = map[string]string{
+			bigqueryPKHashLabel: table.PKFields.Hash(),
+			bigqueryPKNameLabel: table.PrimaryKeyName,
+		}
 	}
-	tableMetaData := bigquery.TableMetadata{Name: tableName, Schema: bqSchema, Labels: labels}
+	tableMetaData := bigquery.TableMetadata{Name: tableName, Schema: bqSchema, TableConstraints: tableConstraints, Labels: labels}
 	if table.Partition.Field == "" && table.TimestampColumn != "" {
 		// partition by timestamp column
 		table = table.Clone()
 		table.Partition.Field = table.TimestampColumn
-		table.Partition.Granularity = MONTH
+		table.Partition.Granularity = DAY
 	}
 	if table.Partition.Field != "" && table.Partition.Granularity != ALL {
 		var partitioningType bigquery.TimePartitioningType
@@ -418,12 +431,26 @@ func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) er
 	}
 	//patch primary keys - delete old
 	if patchSchema.DeletePkFields {
-		metadata.Labels = map[string]string{}
+		delete(metadata.Labels, bigqueryPKHashLabel)
+		delete(metadata.Labels, bigqueryPKNameLabel)
+		if metadata.TableConstraints != nil {
+			metadata.TableConstraints.PrimaryKey = &bigquery.PrimaryKey{}
+		}
 	}

 	//patch primary keys - create new
 	if len(patchSchema.PKFields) > 0 && patchSchema.PrimaryKeyName != "" {
-		metadata.Labels = map[string]string{patchSchema.PrimaryKeyName: strings.Join(patchSchema.GetPKFields(), ",")}
+		if metadata.Labels == nil {
+			metadata.Labels = map[string]string{}
+		}
+		metadata.Labels[bigqueryPKHashLabel] = patchSchema.PKFields.Hash()
+		metadata.Labels[bigqueryPKNameLabel] = patchSchema.PrimaryKeyName
+		if metadata.TableConstraints == nil {
+			metadata.TableConstraints = &bigquery.TableConstraints{}
+		}
+		metadata.TableConstraints.PrimaryKey = &bigquery.PrimaryKey{
+			Columns: patchSchema.GetPKFields(),
+		}
 	}
 	for _, columnName := range patchSchema.SortedColumnNames() {
 		column := patchSchema.Columns[columnName]
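
In other words, the primary key itself now lives in BigQuery's native table constraints, while the two labels only record the hash and constraint name Jitsu last wrote. A sketch of the resulting drift check, as it would sit inside the same package so the label constants above are in scope; hashOf stands in for table.PKFields.Hash(), which this sketch assumes hashes an ordered column list:

import "cloud.google.com/go/bigquery"

// jitsuManagedPKName reports the Jitsu-assigned constraint name, but only if
// the table's primary key columns still hash to the value Jitsu stored when
// it last managed the table.
func jitsuManagedPKName(meta *bigquery.TableMetadata, hashOf func([]string) string) (string, bool) {
	tc := meta.TableConstraints
	if tc == nil || tc.PrimaryKey == nil {
		return "", false
	}
	if hashOf(tc.PrimaryKey.Columns) == meta.Labels[bigqueryPKHashLabel] {
		// PK unchanged since Jitsu wrote it, so the stored name is trusted.
		return meta.Labels[bigqueryPKNameLabel], true
	}
	// Someone altered the PK out-of-band; treat the constraint as unmanaged.
	return "", false
}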
15 changes: 15 additions & 0 deletions bulkerlib/implementations/sql/options.go
@@ -42,6 +42,12 @@ var (
 		ParseFunc: utils.ParseInt,
 	}

+	OmitNilsOption = bulker.ImplementationOption[bool]{
+		Key:          "omitNils",
+		DefaultValue: true,
+		ParseFunc:    utils.ParseBool,
+	}
+
 	localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}

 	s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
@@ -50,6 +56,7 @@
 func init() {
 	bulker.RegisterOption(&DeduplicateWindow)
 	bulker.RegisterOption(&ColumnTypesOption)
+	bulker.RegisterOption(&OmitNilsOption)
 }

 type S3OptionConfig struct {
@@ -60,6 +67,14 @@ type S3OptionConfig struct {
 	Folder string `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"`
 }

+func WithOmitNils() bulker.StreamOption {
+	return bulker.WithOption(&OmitNilsOption, true)
+}
+
+func WithoutOmitNils() bulker.StreamOption {
+	return bulker.WithOption(&OmitNilsOption, false)
+}
+
 func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption {
 	return bulker.WithOption(&DeduplicateWindow, deduplicateWindow)
 }
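
A hedged sketch of passing the new option when opening a stream, mirroring the BulkHandler change above; destination, jobId, tableName, and bulkMode are placeholders borrowed from that handler, not a fixed API:

streamOptions := []bulker.StreamOption{
	bulker.WithPrimaryKey("id"),
	bulker.WithDeduplicate(),
	sql.WithoutOmitNils(), // keep nil values so the target sees explicit NULLs
}
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
if err != nil {
	// handle error
}
_ = bulkerStream

Note the default (omitNils=true) preserves the old behavior for existing callers; only call sites that opt in via WithoutOmitNils(), such as sources sync and file storages in this commit, keep nils.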
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/processor.go
@@ -13,15 +13,15 @@ const SqlTypePrefix = "__sql_type"
 // ProcessEvents processes events objects without applying mapping rules
 // returns table header, array of processed objects
 // or error if at least one occurred
-func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes) (*TypesHeader, types.Object, error) {
+func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool) (*TypesHeader, types.Object, error) {
 	sqlTypesHints, err := extractSQLTypesHints(event)
 	if err != nil {
 		return nil, nil, err
 	}
 	for k, v := range customTypes {
 		sqlTypesHints[k] = v
 	}
-	flatObject, err := implementations.DefaultFlattener.FlattenObject(event, sqlTypesHints)
+	flatObject, err := implementations.NewFlattener(omitNils).FlattenObject(event, sqlTypesHints)
 	if err != nil {
 		return nil, nil, err
 	}
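
Finally, a short sketch of the updated call shape with the trailing flag; the types.Object map literal and table name are assumptions for illustration:

// omitNils=false keeps the nil "note" field as an explicit column in the header.
header, processed, err := ProcessEvents("events", types.Object{
	"id":   1,
	"note": nil,
}, nil /* no custom type hints */, false /* omitNils */)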