Skip to content

Commit

Permalink
Classic compat (#22)
Browse files Browse the repository at this point in the history
* ingest: classic compat endpoint
  • Loading branch information
absorbb authored Dec 28, 2024
1 parent c3cb4c8 commit 06d48d2
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 6 deletions.
29 changes: 26 additions & 3 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var eventTypesSet = types.NewSet("page", "identify", "track", "group", "alias",

var messageIdUnsupportedChars = regexp.MustCompile(`[^a-zA-Z0-9._-]`)

type eventPatchFunc func(c *gin.Context, messageId string, event types.Json, tp string, ingestType IngestType, analyticContext types.Json) error

type Router struct {
*appbase.Router
config *Config
Expand Down Expand Up @@ -96,6 +98,14 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
"/api/s/:tp",
"/api/px/:tp",
"/api/s/s2s/:tp",
// classic compat
"/s/lib.js",
"/api/v1/s2s/event",
"/api/v1/s2s/event/",
"/api/v1/s2s/events",
"/api/v1/event",
"/api/v1/events",
"/api.:ignored",
})

httpClient := &http.Client{
Expand Down Expand Up @@ -151,6 +161,15 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
fast.Match([]string{"OPTIONS", "GET"}, "/api/px/:tp", router.PixelHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/s/s2s/:tp", router.IngestHandler)

// classic compat
fast.Match([]string{"GET", "HEAD", "OPTIONS"}, "/s/lib.js", router.ClassicScriptHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/v1/s2s/event", router.ClassicHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/v1/s2s/event/", router.ClassicHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/v1/s2s/events", router.ClassicHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/v1/event", router.ClassicHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/v1/events", router.ClassicHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api.:ignored", router.ClassicHandler)

fast.Match([]string{"GET", "HEAD", "OPTIONS"}, "/p.js", router.ScriptHandler)

engine.GET("/health", func(c *gin.Context) {
Expand Down Expand Up @@ -440,8 +459,8 @@ func (r *Router) processSyncDestination(message *IngestMessage, stream *StreamWi
return &SyncDestinationsResponse{Destinations: data, OK: true}
}

func (r *Router) buildIngestMessage(c *gin.Context, messageId string, event types.Json, analyticContext types.Json, tp string, loc StreamCredentials, stream *StreamWithDestinations) (ingestMessage *IngestMessage, ingestMessageBytes []byte, err error) {
err = patchEvent(c, messageId, event, tp, loc.IngestType, analyticContext)
func (r *Router) buildIngestMessage(c *gin.Context, messageId string, event types.Json, analyticContext types.Json, tp string, loc StreamCredentials, stream *StreamWithDestinations, patchFunc eventPatchFunc) (ingestMessage *IngestMessage, ingestMessageBytes []byte, err error) {
err = patchFunc(c, messageId, event, tp, loc.IngestType, analyticContext)
headers := utils.MapMap(utils.MapFilter(c.Request.Header, func(k string, v []string) bool {
return len(v) > 0 && !isInternalHeader(k)
}), func(k string, v []string) string {
Expand All @@ -450,7 +469,7 @@ func (r *Router) buildIngestMessage(c *gin.Context, messageId string, event type
}
return strings.Join(v, ",")
})
bodyType := event.GetS("type")
bodyType := utils.Ternary(tp != "classic", event.GetS("type"), event.GetS("event_type"))
ingestMessage = &IngestMessage{
IngestType: loc.IngestType,
MessageCreated: time.Now(),
Expand All @@ -466,6 +485,9 @@ func (r *Router) buildIngestMessage(c *gin.Context, messageId string, event type
HttpHeaders: headers,
HttpPayload: event,
}
if tp == "classic" {
ingestMessage.Origin.Classic = true
}
ingestMessageBytes, err1 := jsonorder.Marshal(ingestMessage)
if err1 != nil {
err = utils.Nvl(err, err1)
Expand Down Expand Up @@ -505,6 +527,7 @@ type IngestMessageOrigin struct {
Slug string `json:"slug,omitempty"`
SourceId string `json:"sourceId,omitempty"`
Domain string `json:"domain,omitempty"`
Classic bool `json:"classic,omitempty"`
}

type IngestMessage struct {
Expand Down
2 changes: 1 addition & 1 deletion ingest/router_batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *Router) BatchHandler(c *gin.Context) {
messageId = utils.ShortenString(messageIdUnsupportedChars.ReplaceAllString(messageId, "_"), 64)
}
c.Set(appbase.ContextMessageId, messageId)
_, ingestMessageBytes, err1 := r.buildIngestMessage(c, messageId, event, payload.Context, "event", loc, stream)
_, ingestMessageBytes, err1 := r.buildIngestMessage(c, messageId, event, payload.Context, "event", loc, stream, patchEvent)
var asyncDestinations, tagsDestinations []string
if err1 == nil {
if len(stream.AsynchronousDestinations) == 0 {
Expand Down
Loading

0 comments on commit 06d48d2

Please sign in to comment.