Ingest service progress.
Refactoring
absorbb committed Dec 13, 2023
1 parent 1fb51a0 commit 3e3e569
Showing 21 changed files with 874 additions and 158 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-build-image.yml
@@ -29,7 +29,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
- go-version: 1.21.4
+ go-version: 1.21.5
cache: false

- name: Bulker Test
2 changes: 1 addition & 1 deletion bulker.Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get install -y ca-certificates curl

ENV TZ=UTC

- FROM golang:1.21.4-bullseye as build
+ FROM golang:1.21.5-bullseye as build

ARG VERSION
ENV VERSION $VERSION
4 changes: 3 additions & 1 deletion bulkerapp/app/batch_consumer.go
@@ -39,7 +39,7 @@ func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodS

func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum, batchSize, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error) {
counters.firstOffset = int64(kafka.OffsetBeginning)

+ startTime := time.Now()
var bulkerStream bulker.BulkerStream
ctx := context.WithValue(context.Background(), bulker.BatchNumberCtxKey, batchNum)

@@ -144,6 +144,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum
}
//treat failed message as processed
state.ProcessedRows++
+ state.ProcessingTimeSec = time.Since(startTime).Seconds()
bc.postEventsLog(state, processedObjectSample, err)
return counters, false, bc.NewError("Failed to process event to bulker stream: %v", err)
} else {
@@ -162,6 +163,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum
var state bulker.State
//TODO: do we need to interrupt commit if consumer is retired?
state, err = bulkerStream.Complete(ctx)
+ state.ProcessingTimeSec = time.Since(startTime).Seconds()
bc.postEventsLog(state, processedObjectSample, err)
if err != nil {
failedPosition = &latestMessage.TopicPartition
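A note on the timing change above: both exit paths in processBatchImpl (the early failure return and the successful Complete) now stamp the state with elapsed wall-clock seconds before posting to the events log. A minimal, self-contained sketch of the pattern; the simulated work is illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	startTime := time.Now()

	// Stand-in for the real batch processing work.
	time.Sleep(150 * time.Millisecond)

	// The same expression the diff adds before each postEventsLog call:
	// elapsed time as float64 seconds.
	processingTimeSec := time.Since(startTime).Seconds()
	fmt.Printf("processingTimeSec=%.3f\n", processingTimeSec)
}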
56 changes: 28 additions & 28 deletions bulkerapp/app/router.go
@@ -118,17 +118,17 @@ func (r *Router) EventsHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
- rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "")
+ rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "", true)
return
}
mode = string(destination.Mode())
if tableName == "" {
- rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "", true)
return
}
topicId, err := destination.TopicId(tableName)
if err != nil {
- rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, "")
+ rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, "", true)
return
}
err = r.topicManager.EnsureDestinationTopic(destination, topicId)
@@ -137,20 +137,20 @@ func (r *Router) EventsHandler(c *gin.Context) {
if ok && kafkaErr.Code() == kafka.ErrTopicAlreadyExists {
r.Warnf("Topic %s already exists", topicId)
} else {
- rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), "")
+ rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), "", true)
return
}
}

body, err := io.ReadAll(c.Request.Body)
if err != nil {
- rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
return
}
bytesRead = len(body)
err = r.producer.ProduceAsync(topicId, uuid.New(), body, map[string]string{MetricsMetaHeader: metricsMeta})
if err != nil {
- rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, "")
+ rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, "", true)
return
}
c.JSON(http.StatusOK, gin.H{"message": "ok"})
@@ -179,12 +179,12 @@ func (r *Router) BulkHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
- rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "")
+ rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "", true)
return
}
mode = string(destination.Mode())
if tableName == "" {
- rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "", true)
return
}
var streamOptions []bulker.StreamOption
@@ -195,7 +195,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
destination.InitBulkerInstance()
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
if err != nil {
- rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, "")
+ rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, "", true)
return
}
scanner := bufio.NewScanner(c.Request.Body)
@@ -205,7 +205,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
eventBytes := scanner.Bytes()
if len(eventBytes) >= 5 && string(eventBytes[:5]) == "ABORT" {
_, _ = bulkerStream.Abort(c)
- rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), "", true)
return
}
bytesRead += len(eventBytes)
@@ -214,25 +214,25 @@ func (r *Router) BulkHandler(c *gin.Context) {
dec.UseNumber()
if err = dec.Decode(&obj); err != nil {
_, _ = bulkerStream.Abort(c)
- rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, "", true)
return
}
if _, _, err = bulkerStream.Consume(c, obj); err != nil {
_, _ = bulkerStream.Abort(c)
- rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, "", true)
return
}
consumed++
}
if err = scanner.Err(); err != nil {
_, _ = bulkerStream.Abort(c)
- rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, "", true)
return
}
if consumed > 0 {
state, err := bulkerStream.Complete(c)
if err != nil {
- rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, "", true)
return
}
r.Infof("Bulk stream for %s mode: %s Completed. Processed: %d in %dms.", jobId, mode, state.SuccessfulRows, time.Since(start).Milliseconds())
@@ -267,8 +267,8 @@ func (r *Router) IngestHandler(c *gin.Context) {
}
if rError != nil {
obj := map[string]any{"body": string(body), "error": rError.PublicError.Error(), "status": "FAILED"}
- r.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeIncomingError, eventsLogId, obj})
- r.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeIncomingAll, eventsLogId, obj})
+ r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingError, ActorId: eventsLogId, Event: obj})
+ r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingAll, ActorId: eventsLogId, Event: obj})
metrics.IngestHandlerRequests(domain, "error", rError.ErrorType).Inc()
_ = r.producer.ProduceAsync(r.config.KafkaDestinationsDeadLetterTopicName, uuid.New(), body, map[string]string{"error": rError.Error.Error()})
} else {
@@ -279,29 +279,29 @@ func (r *Router) IngestHandler(c *gin.Context) {
obj["status"] = "SKIPPED"
obj["error"] = "no destinations found for stream"
}
- r.eventsLogService.PostAsync(&eventslog.ActorEvent{eventslog.EventTypeIncomingAll, eventsLogId, obj})
+ r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncomingAll, ActorId: eventsLogId, Event: obj})
metrics.IngestHandlerRequests(domain, "success", "").Inc()
}
}()
body, err := io.ReadAll(c.Request.Body)
if err != nil {
- rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
return
}
ingestMessage := IngestMessage{}
err = jsoniter.Unmarshal(body, &ingestMessage)
if err != nil {
- rError = r.ResponseError(c, http.StatusBadRequest, "error parsing IngestMessage", false, fmt.Errorf("%v: %s", err, string(body)), "")
+ rError = r.ResponseError(c, http.StatusBadRequest, "error parsing IngestMessage", false, fmt.Errorf("%v: %s", err, string(body)), "", true)
return
}
messageId := ingestMessage.MessageId
domain = utils.DefaultString(ingestMessage.Origin.Slug, ingestMessage.Origin.Domain)
r.Debugf("[ingest] Message ID: %s Domain: %s", messageId, domain)
logFormat := "[ingest] Message ID: %s Domain: %s"
logPrefix := fmt.Sprintf("[ingest] Message ID: %s Domain: %s", messageId, domain)
r.Debugf(logPrefix)

stream := r.getStream(ingestMessage)
if stream == nil {
- rError = r.ResponseError(c, http.StatusBadRequest, "stream not found", false, nil, logFormat, messageId, domain)
+ rError = r.ResponseError(c, http.StatusBadRequest, "stream not found", false, nil, logPrefix, true)
return
}
eventsLogId = stream.Stream.Id
@@ -324,13 +324,13 @@ func (r *Router) IngestHandler(c *gin.Context) {
r.Debugf("[ingest] Message ID: %s Producing for: %s topic: %s key: %s", messageId, destination.ConnectionId, topic, messageKey)
if err != nil {
metrics.IngestedMessages(destination.ConnectionId, "error", "message marshal error").Inc()
- rError = r.ResponseError(c, http.StatusBadRequest, "message marshal error", false, err, logFormat, messageId, domain)
+ rError = r.ResponseError(c, http.StatusBadRequest, "message marshal error", false, err, logPrefix, true)
continue
}
err = r.producer.ProduceAsync(topic, messageKey, payload, nil)
if err != nil {
metrics.IngestedMessages(destination.ConnectionId, "error", "producer error").Inc()
- rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, logFormat, messageId, domain)
+ rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, logPrefix, true)
continue
}
metrics.IngestedMessages(destination.ConnectionId, "success", "").Inc()
@@ -369,7 +369,7 @@ func (r *Router) FailedHandler(c *gin.Context) {
err = consumer.Assign([]kafka.TopicPartition{{Topic: &topicId, Partition: 0, Offset: kafka.OffsetBeginning}})
}
if err != nil {
- r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, "")
+ r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, "", true)
return
}
start := time.Now()
@@ -406,14 +406,14 @@ func (r *Router) TestConnectionHandler(c *gin.Context) {
func (r *Router) TestConnectionHandler(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
- _ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "")
+ _ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
return
}
bulkerCfg := bulker.Config{}
destinationConfig := map[string]any{}
err = utils.ParseObject(body, &destinationConfig)
if err != nil {
- _ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, "")
+ _ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, "", true)
return
} else {
r.Debugf("[test] parsed config for destination %s: %+v", utils.MapNVL(destinationConfig, "id", ""), destinationConfig)
Expand All @@ -427,7 +427,7 @@ func (r *Router) TestConnectionHandler(c *gin.Context) {
if b != nil {
_ = b.Close()
}
- _ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, "")
+ _ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, "", true)
return
}
_ = b.Close()
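Two patterns recur in the router.go changes: every ResponseError call gains a trailing boolean (and the ingest handler now passes a pre-formatted logPrefix instead of a format string with varargs), and eventslog.ActorEvent literals switch from positional to keyed fields. The keyed form keeps compiling and stays correct if ActorEvent gains or reorders fields, and go vet flags unkeyed literals of imported struct types. A self-contained sketch; the local ActorEvent mirror uses the field names visible in the diff, but its field types are assumptions:

package main

import "fmt"

// Assumed shape of eventslog.ActorEvent, reconstructed from the field
// names (EventType, ActorId, Event) used at the call sites above.
type ActorEvent struct {
	EventType string
	ActorId   string
	Event     any
}

func main() {
	// Keyed literal, as router.go now uses. The positional form it
	// replaces would silently misassign values after a field reorder.
	evt := &ActorEvent{
		EventType: "incoming.all",
		ActorId:   "stream-1",
		Event:     map[string]any{"status": "SUCCESS"},
	}
	fmt.Printf("%+v\n", *evt)
}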
17 changes: 9 additions & 8 deletions bulkerlib/bulker.go
@@ -133,14 +133,15 @@ const (
// State is used as a Batch storing result
type State struct {
//Representation of message processing. For SQL warehouses it is table schema
- Representation any `json:"representation"`
- Status Status `json:"status"`
- LastError error `json:"-"`
- LastErrorText string `json:"error,omitempty"`
- ProcessedRows int `json:"processedRows"`
- SuccessfulRows int `json:"successfulRows"`
- ErrorRowIndex int `json:"errorRowIndex,omitempty"`
+ Representation    any     `json:"representation"`
+ Status            Status  `json:"status"`
+ LastError         error   `json:"-"`
+ LastErrorText     string  `json:"error,omitempty"`
+ ProcessedRows     int     `json:"processedRows"`
+ SuccessfulRows    int     `json:"successfulRows"`
+ ErrorRowIndex     int     `json:"errorRowIndex,omitempty"`
+ ProcessingTimeSec float64 `json:"processingTimeSec"`
*WarehouseState `json:",inline,omitempty"`
}

type WarehouseState struct {
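ProcessingTimeSec carries no omitempty, so it is always serialized, including as 0 when a stream fails before any timing is recorded. A trimmed, self-contained sketch of the resulting JSON; only the JSON-visible fields from the diff are mirrored, and Status is simplified to a string here:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed mirror of bulker.State with the new field.
type State struct {
	Status            string  `json:"status"`
	ProcessedRows     int     `json:"processedRows"`
	SuccessfulRows    int     `json:"successfulRows"`
	ProcessingTimeSec float64 `json:"processingTimeSec"`
}

func main() {
	s := State{Status: "COMPLETED", ProcessedRows: 120, SuccessfulRows: 120, ProcessingTimeSec: 1.532}
	out, _ := json.Marshal(s)
	fmt.Println(string(out))
	// Output: {"status":"COMPLETED","processedRows":120,"successfulRows":120,"processingTimeSec":1.532}
}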
2 changes: 1 addition & 1 deletion ingest.Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get install -y ca-certificates curl

ENV TZ=UTC

- FROM golang:1.21.4-bullseye as build
+ FROM golang:1.21.5-bullseye as build

ARG VERSION
ENV VERSION $VERSION
17 changes: 2 additions & 15 deletions ingest/backup_logger.go
@@ -1,9 +1,6 @@
package main

import (
"bytes"
"encoding/base64"
"encoding/json"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/safego"
@@ -30,7 +27,7 @@ func NewBackupLogger(config *Config) *BackupLogger {
}
}

- func (bl *BackupLogger) Log(logger string, event any) (err error) {
+ func (bl *BackupLogger) Log(logger string, eventBytes []byte) (err error) {
if bl.config.BackupLogDir == "" {
return
}
@@ -46,17 +43,7 @@ func (bl *BackupLogger) Log(logger string, event any) (err error) {
}
bl.Unlock()
}
- buff := bytes.Buffer{}
- if data, ok := event.([]byte); ok {
- _, err = base64.NewEncoder(base64.RawStdEncoding, &buff).Write(data)
- buff.Write([]byte("\n"))
- } else {
- err = json.NewEncoder(&buff).Encode(event)
- }
- if err != nil {
- return
- }
- _, err = (rwriter.(io.WriteCloser)).Write(buff.Bytes())
+ _, err = (rwriter.(io.WriteCloser)).Write(eventBytes)
return
}

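With the encoding removed, Log writes caller-supplied bytes verbatim, so the base64-plus-newline framing presumably moves to the call sites. A sketch equivalent to the deleted []byte branch; the helper name is illustrative. One detail: the deleted code never closed its base64 encoder, which silently drops the final partial group when the input length is not a multiple of 3, so the sketch adds the missing Close:

package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
)

// encodeForBackup reproduces what BackupLogger.Log previously did inline
// for raw []byte events: raw-std base64, terminated by a newline.
func encodeForBackup(data []byte) ([]byte, error) {
	buff := bytes.Buffer{}
	enc := base64.NewEncoder(base64.RawStdEncoding, &buff)
	if _, err := enc.Write(data); err != nil {
		return nil, err
	}
	if err := enc.Close(); err != nil { // flush the final partial group
		return nil, err
	}
	buff.WriteByte('\n')
	return buff.Bytes(), nil
}

func main() {
	out, _ := encodeForBackup([]byte(`{"event":"page_view"}`))
	fmt.Print(string(out))
}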
5 changes: 5 additions & 0 deletions ingest/config.go
@@ -15,6 +15,8 @@ type Config struct {

DatabaseURL string `mapstructure:"DATABASE_URL"`

+ DataDomain string `mapstructure:"DATA_DOMAIN"`

// For ingest endpoint only
GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"`
// For ingest endpoint only
@@ -33,6 +35,9 @@ type Config struct {

RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"5"`

+ RotorURL string `mapstructure:"ROTOR_URL"`
+ DeviceFunctionsTimeoutMs int `mapstructure:"DEVICE_FUNCTIONS_TIMEOUT_MS" default:"200"`

// # GRACEFUL SHUTDOWN
//Timeout that give running batch tasks time to finish during shutdown.
ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"`
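The new settings follow the file's existing convention: a mapstructure tag naming the environment variable, plus an optional default tag. The real loader lives in jitsubase/appbase; below is a hand-rolled, standard-library sketch of the equivalent behavior for the two rotor settings (the loading code itself is illustrative, not the repo's):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// Stand-in for the mapstructure-based config loader.
type Config struct {
	RotorURL                 string
	DeviceFunctionsTimeoutMs int
}

func load() Config {
	cfg := Config{DeviceFunctionsTimeoutMs: 200} // mirrors default:"200"
	cfg.RotorURL = os.Getenv("ROTOR_URL")
	if v := os.Getenv("DEVICE_FUNCTIONS_TIMEOUT_MS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			cfg.DeviceFunctionsTimeoutMs = n
		}
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", load())
}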
20 changes: 20 additions & 0 deletions ingest/destination_types.go
@@ -0,0 +1,20 @@
+ package main
+
+ var DeviceOptions = map[string]map[string]any{
+ 	"logrocket": {
+ 		"type": "internal-plugin",
+ 		"name": "logrocket",
+ 	},
+ 	"tag": {
+ 		"type": "internal-plugin",
+ 		"name": "tag",
+ 	},
+ 	"ga4-tag": {
+ 		"type": "internal-plugin",
+ 		"name": "ga4-tag",
+ 	},
+ 	"gtm": {
+ 		"type": "internal-plugin",
+ 		"name": "gtm",
+ 	},
+ }
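DeviceOptions appears to map destination-type slugs to device-mode plugin descriptors for the ingest service. A hedged usage sketch; the lookup helper is illustrative, not from the repo:

package main

import "fmt"

// Excerpt of the committed table.
var DeviceOptions = map[string]map[string]any{
	"logrocket": {"type": "internal-plugin", "name": "logrocket"},
	"gtm":       {"type": "internal-plugin", "name": "gtm"},
}

// deviceOptionsFor reports whether a destination type has a device-mode
// plugin and returns its descriptor.
func deviceOptionsFor(destinationType string) (map[string]any, bool) {
	opts, ok := DeviceOptions[destinationType]
	return opts, ok
}

func main() {
	if opts, ok := deviceOptionsFor("gtm"); ok {
		fmt.Printf("device plugin for gtm: %v\n", opts["name"])
	}
}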