Skip to content

Commit

Permalink
Add treatment metadata to BQ / Kafka logs (#41)
Browse files Browse the repository at this point in the history
* Add treatment metadata to BQ / Kafka logs

* Fix unit test failure

Co-authored-by: Krithika Sundararajan <krithika.sundararajan@go-jek.com>
  • Loading branch information
krithika369 and Krithika Sundararajan authored Sep 29, 2022
1 parent 30c52a5 commit e1a9729
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 31 deletions.
18 changes: 13 additions & 5 deletions api/proto/logs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ option go_package = "/monitoring";
import "google/protobuf/timestamp.proto";

message Request {
// The header from the incoming request to XP services. The map value is a comma-delimited string.
// The header from the incoming request to XP services. The map value is a
// comma-delimited string.
string header = 1;

// The JSON body of the request to XP services, UTF-8-encoded
Expand All @@ -15,7 +16,8 @@ message Request {

// key
message TreatmentServiceResultLogKey {
// The unique request id generated by Treatment Service, for every incoming request
// The unique request id generated by Treatment Service, for every incoming
// request
string request_id = 1;

// The time at which the final response from Treatment Service is generated
Expand All @@ -24,7 +26,8 @@ message TreatmentServiceResultLogKey {

// message
message TreatmentServiceResultLogMessage {
// The unique request id generated by Treatment Service, for every incoming request
// The unique request id generated by Treatment Service, for every incoming
// request
string request_id = 1;

// The time at which the final response from Treatment Service is generated
Expand All @@ -33,7 +36,8 @@ message TreatmentServiceResultLogMessage {
// The original request to Treatment Service
Request request = 3;

// The parsed segment based on registered experiment variables via Project Settings
// The parsed segment based on registered experiment variables via Project
// Settings
string segment = 4;

// The project's id of the request
Expand All @@ -48,9 +52,13 @@ message TreatmentServiceResultLogMessage {
// The assigned treatment's name based on matched experiment
string treatment_name = 8;

// The assigned treatment's config (JSON string) in based on matched experiment
// The assigned treatment's config (JSON string) in based on matched
// experiment
string treatment_config = 9;

// The error (if any) of the request
string error = 10;

// The assigned treatment metadata, if any
string treatment_metadata = 11;
}
9 changes: 8 additions & 1 deletion treatment-service/controller/treatment.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque
var selectedTreatment *pubsub.ExperimentTreatment
var lookupRequestFilters []models.SegmentFilter
var errorLog *monitoring.ErrorResponseLog
var switchbackWindowId *int64
if t.AppContext.AssignedTreatmentLogger != nil {
defer func() {
// Capture potential errors from other calls to service layer and prevent it from
Expand Down Expand Up @@ -97,6 +98,13 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque
Request: requestJson,
Segmenters: requestFilters,
}
if filteredExperiment != nil {
assignedTreatmentLog.TreatmentMetadata = &monitoring.TreatmentMetadata{
ExperimentVersion: filteredExperiment.Version,
ExperimentType: string(models.ProtobufExperimentTypeToOpenAPI(filteredExperiment.Type)),
SwitchbackWindowId: switchbackWindowId,
}
}

if errorLog != nil {
assignedTreatmentLog.Error = errorLog
Expand Down Expand Up @@ -165,7 +173,6 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque
return
}

var switchbackWindowId *int64
selectedTreatment, switchbackWindowId, err = t.TreatmentService.GetTreatment(filteredExperiment, randomizationKeyValue)
if err != nil {
statusCode = http.StatusInternalServerError
Expand Down
13 changes: 11 additions & 2 deletions treatment-service/monitoring/bqtreatmentlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type BQLogRow struct {
ExperimentId int64 `bigquery:"experiment_id"`
ExperimentName string `bigquery:"experiment_name"`

TreatmentName string `bigquery:"treatment_name"`
TreatmentConfig string `bigquery:"treatment_config"`
TreatmentName string `bigquery:"treatment_name"`
TreatmentConfig string `bigquery:"treatment_config"`
TreatmentMetadata string `bigquery:"treatment_metadata"`

Error string `bigquery:"error"`
}
Expand Down Expand Up @@ -79,6 +80,14 @@ func (p *BQLogPublisher) Publish(logs []*AssignedTreatmentLog) error {
bqlogRow.TreatmentConfig = treatmentConfig
}

if l.TreatmentMetadata != nil {
treatmentMetadata, err := json.Marshal(l.TreatmentMetadata)
if err != nil {
return err
}
bqlogRow.TreatmentMetadata = string(treatmentMetadata)
}

if l.Error != nil {
errorJson, err := json.Marshal(l.Error)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions treatment-service/monitoring/kafkatreatmentlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ func newProtobufKafkaLogEntry(
message.TreatmentConfig = treatmentConfig
}

if log.TreatmentMetadata != nil {
treatmentMetadata, err := json.Marshal(log.TreatmentMetadata)
if err != nil {
return nil, nil, err
}
message.TreatmentMetadata = string(treatmentMetadata)
}

if log.Error != nil {
errorJson, err := json.Marshal(log.Error)
if err != nil {
Expand Down
33 changes: 25 additions & 8 deletions treatment-service/monitoring/logs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions treatment-service/monitoring/treatmentlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@ type ErrorResponseLog struct {
Error string
}

type TreatmentMetadata struct {
ExperimentType string `json:"experiment_type"`
ExperimentVersion int64 `json:"experiment_version"`
SwitchbackWindowId *int64 `json:"switchback_window_id"`
}

type AssignedTreatmentLog struct {
ProjectID models.ProjectId
RequestID string
Experiment *_pubsub.Experiment
Treatment *_pubsub.ExperimentTreatment
Request *Request
Segmenters []models.SegmentFilter
Error *ErrorResponseLog
ProjectID models.ProjectId
RequestID string
Experiment *_pubsub.Experiment
Treatment *_pubsub.ExperimentTreatment
TreatmentMetadata *TreatmentMetadata
Request *Request
Segmenters []models.SegmentFilter
Error *ErrorResponseLog
}

type AssignedTreatmentPublisher interface {
Expand Down
23 changes: 15 additions & 8 deletions treatment-service/monitoring/treatmentlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) {
treatmentCfg, _ := structpb.NewStruct(map[string]interface{}{
"treatment-key": "treatment-value",
})
var windowId int64 = 3
assignedTreatmentLog := &AssignedTreatmentLog{
ProjectID: 0,
RequestID: "1",
Expand All @@ -46,6 +47,11 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) {
Name: "test-treatment",
Config: treatmentCfg,
},
TreatmentMetadata: &TreatmentMetadata{
ExperimentType: "Switchback",
ExperimentVersion: 2,
SwitchbackWindowId: &windowId,
},
Request: &Request{},
Segmenters: []models.SegmentFilter{
{Key: "key", Value: []*_segmenters.SegmenterValue{{Value: &_segmenters.SegmenterValue_String_{String_: "value"}}}},
Expand Down Expand Up @@ -82,14 +88,15 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) {
expectedKeyJSON, err := json.Marshal(assignedTreatmentLogKeyJSON)
assert.NoError(t, err)
assignedTreatmentLogValueJSON := map[string]interface{}{
"eventTimestamp": decodedResultLogMessage.EventTimestamp.AsTime(),
"experimentId": "1",
"experimentName": "test-exp",
"request": map[string]interface{}{},
"requestId": "1",
"segment": "{\"key\":[\"value\"]}",
"treatmentConfig": "{\"treatment-key\":\"treatment-value\"}",
"treatmentName": "test-treatment",
"eventTimestamp": decodedResultLogMessage.EventTimestamp.AsTime(),
"experimentId": "1",
"experimentName": "test-exp",
"request": map[string]interface{}{},
"requestId": "1",
"segment": "{\"key\":[\"value\"]}",
"treatmentConfig": "{\"treatment-key\":\"treatment-value\"}",
"treatmentName": "test-treatment",
"treatmentMetadata": "{\"experiment_type\":\"Switchback\",\"experiment_version\":2,\"switchback_window_id\":3}",
}
expectedValueJSON, err := json.Marshal(assignedTreatmentLogValueJSON)
assert.NoError(t, err)
Expand Down

0 comments on commit e1a9729

Please sign in to comment.