From e1a97292bd1dd9f7e86d91d319d1d2202ab9467c Mon Sep 17 00:00:00 2001 From: Krithika Sundararajan Date: Thu, 29 Sep 2022 08:21:18 +0800 Subject: [PATCH] Add treatment metadata to BQ / Kafka logs (#41) * Add treatment metadata to BQ / Kafka logs * Fix unit test failure Co-authored-by: Krithika Sundararajan --- api/proto/logs.proto | 18 +++++++--- treatment-service/controller/treatment.go | 9 ++++- .../monitoring/bqtreatmentlogger.go | 13 ++++++-- .../monitoring/kafkatreatmentlogger.go | 8 +++++ treatment-service/monitoring/logs.pb.go | 33 ++++++++++++++----- .../monitoring/treatmentlogger.go | 21 ++++++++---- .../monitoring/treatmentlogger_test.go | 23 ++++++++----- 7 files changed, 94 insertions(+), 31 deletions(-) diff --git a/api/proto/logs.proto b/api/proto/logs.proto index f7e4d361..e2978de5 100644 --- a/api/proto/logs.proto +++ b/api/proto/logs.proto @@ -6,7 +6,8 @@ option go_package = "/monitoring"; import "google/protobuf/timestamp.proto"; message Request { - // The header from the incoming request to XP services. The map value is a comma-delimited string. + // The header from the incoming request to XP services. The map value is a + // comma-delimited string. string header = 1; // The JSON body of the request to XP services, UTF-8-encoded @@ -15,7 +16,8 @@ message Request { // key message TreatmentServiceResultLogKey { - // The unique request id generated by Treatment Service, for every incoming request + // The unique request id generated by Treatment Service, for every incoming + // request string request_id = 1; // The time at which the final response from Treatment Service is generated @@ -24,7 +26,8 @@ message TreatmentServiceResultLogKey { // message message TreatmentServiceResultLogMessage { - // The unique request id generated by Treatment Service, for every incoming request + // The unique request id generated by Treatment Service, for every incoming + // request string request_id = 1; // The time at which the final response from Treatment Service is generated @@ -33,7 +36,8 @@ message TreatmentServiceResultLogMessage { // The original request to Treatment Service Request request = 3; - // The parsed segment based on registered experiment variables via Project Settings + // The parsed segment based on registered experiment variables via Project + // Settings string segment = 4; // The project's id of the request @@ -48,9 +52,13 @@ message TreatmentServiceResultLogMessage { // The assigned treatment's name based on matched experiment string treatment_name = 8; - // The assigned treatment's config (JSON string) in based on matched experiment + // The assigned treatment's config (JSON string) in based on matched + // experiment string treatment_config = 9; // The error (if any) of the request string error = 10; + + // The assigned treatment metadata, if any + string treatment_metadata = 11; } diff --git a/treatment-service/controller/treatment.go b/treatment-service/controller/treatment.go index 1ad5ac7f..df6fe5c0 100644 --- a/treatment-service/controller/treatment.go +++ b/treatment-service/controller/treatment.go @@ -63,6 +63,7 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque var selectedTreatment *pubsub.ExperimentTreatment var lookupRequestFilters []models.SegmentFilter var errorLog *monitoring.ErrorResponseLog + var switchbackWindowId *int64 if t.AppContext.AssignedTreatmentLogger != nil { defer func() { // Capture potential errors from other calls to service layer and prevent it from @@ -97,6 +98,13 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque Request: requestJson, Segmenters: requestFilters, } + if filteredExperiment != nil { + assignedTreatmentLog.TreatmentMetadata = &monitoring.TreatmentMetadata{ + ExperimentVersion: filteredExperiment.Version, + ExperimentType: string(models.ProtobufExperimentTypeToOpenAPI(filteredExperiment.Type)), + SwitchbackWindowId: switchbackWindowId, + } + } if errorLog != nil { assignedTreatmentLog.Error = errorLog @@ -165,7 +173,6 @@ func (t TreatmentController) FetchTreatment(w http.ResponseWriter, r *http.Reque return } - var switchbackWindowId *int64 selectedTreatment, switchbackWindowId, err = t.TreatmentService.GetTreatment(filteredExperiment, randomizationKeyValue) if err != nil { statusCode = http.StatusInternalServerError diff --git a/treatment-service/monitoring/bqtreatmentlogger.go b/treatment-service/monitoring/bqtreatmentlogger.go index ab8aba76..f9dc26dd 100644 --- a/treatment-service/monitoring/bqtreatmentlogger.go +++ b/treatment-service/monitoring/bqtreatmentlogger.go @@ -31,8 +31,9 @@ type BQLogRow struct { ExperimentId int64 `bigquery:"experiment_id"` ExperimentName string `bigquery:"experiment_name"` - TreatmentName string `bigquery:"treatment_name"` - TreatmentConfig string `bigquery:"treatment_config"` + TreatmentName string `bigquery:"treatment_name"` + TreatmentConfig string `bigquery:"treatment_config"` + TreatmentMetadata string `bigquery:"treatment_metadata"` Error string `bigquery:"error"` } @@ -79,6 +80,14 @@ func (p *BQLogPublisher) Publish(logs []*AssignedTreatmentLog) error { bqlogRow.TreatmentConfig = treatmentConfig } + if l.TreatmentMetadata != nil { + treatmentMetadata, err := json.Marshal(l.TreatmentMetadata) + if err != nil { + return err + } + bqlogRow.TreatmentMetadata = string(treatmentMetadata) + } + if l.Error != nil { errorJson, err := json.Marshal(l.Error) if err != nil { diff --git a/treatment-service/monitoring/kafkatreatmentlogger.go b/treatment-service/monitoring/kafkatreatmentlogger.go index 679e0709..2aa01653 100644 --- a/treatment-service/monitoring/kafkatreatmentlogger.go +++ b/treatment-service/monitoring/kafkatreatmentlogger.go @@ -153,6 +153,14 @@ func newProtobufKafkaLogEntry( message.TreatmentConfig = treatmentConfig } + if log.TreatmentMetadata != nil { + treatmentMetadata, err := json.Marshal(log.TreatmentMetadata) + if err != nil { + return nil, nil, err + } + message.TreatmentMetadata = string(treatmentMetadata) + } + if log.Error != nil { errorJson, err := json.Marshal(log.Error) if err != nil { diff --git a/treatment-service/monitoring/logs.pb.go b/treatment-service/monitoring/logs.pb.go index da23aea3..fae80e18 100644 --- a/treatment-service/monitoring/logs.pb.go +++ b/treatment-service/monitoring/logs.pb.go @@ -26,7 +26,8 @@ type Request struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // The header from the incoming request to XP services. The map value is a comma-delimited string. + // The header from the incoming request to XP services. The map value is a + // comma-delimited string. Header string `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` // The JSON body of the request to XP services, UTF-8-encoded Body string `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` @@ -84,7 +85,8 @@ type TreatmentServiceResultLogKey struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // The unique request id generated by Treatment Service, for every incoming request + // The unique request id generated by Treatment Service, for every incoming + // request RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` // The time at which the final response from Treatment Service is generated EventTimestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=event_timestamp,json=eventTimestamp,proto3" json:"event_timestamp,omitempty"` @@ -142,13 +144,15 @@ type TreatmentServiceResultLogMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // The unique request id generated by Treatment Service, for every incoming request + // The unique request id generated by Treatment Service, for every incoming + // request RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` // The time at which the final response from Treatment Service is generated EventTimestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=event_timestamp,json=eventTimestamp,proto3" json:"event_timestamp,omitempty"` // The original request to Treatment Service Request *Request `protobuf:"bytes,3,opt,name=request,proto3" json:"request,omitempty"` - // The parsed segment based on registered experiment variables via Project Settings + // The parsed segment based on registered experiment variables via Project + // Settings Segment string `protobuf:"bytes,4,opt,name=segment,proto3" json:"segment,omitempty"` // The project's id of the request ProjectId uint32 `protobuf:"varint,5,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` @@ -158,10 +162,13 @@ type TreatmentServiceResultLogMessage struct { ExperimentName string `protobuf:"bytes,7,opt,name=experiment_name,json=experimentName,proto3" json:"experiment_name,omitempty"` // The assigned treatment's name based on matched experiment TreatmentName string `protobuf:"bytes,8,opt,name=treatment_name,json=treatmentName,proto3" json:"treatment_name,omitempty"` - // The assigned treatment's config (JSON string) in based on matched experiment + // The assigned treatment's config (JSON string) in based on matched + // experiment TreatmentConfig string `protobuf:"bytes,9,opt,name=treatment_config,json=treatmentConfig,proto3" json:"treatment_config,omitempty"` // The error (if any) of the request Error string `protobuf:"bytes,10,opt,name=error,proto3" json:"error,omitempty"` + // The assigned treatment metadata, if any + TreatmentMetadata string `protobuf:"bytes,11,opt,name=treatment_metadata,json=treatmentMetadata,proto3" json:"treatment_metadata,omitempty"` } func (x *TreatmentServiceResultLogMessage) Reset() { @@ -266,6 +273,13 @@ func (x *TreatmentServiceResultLogMessage) GetError() string { return "" } +func (x *TreatmentServiceResultLogMessage) GetTreatmentMetadata() string { + if x != nil { + return x.TreatmentMetadata + } + return "" +} + var File_api_proto_logs_proto protoreflect.FileDescriptor var file_api_proto_logs_proto_rawDesc = []byte{ @@ -285,7 +299,7 @@ var file_api_proto_logs_proto_rawDesc = []byte{ 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, - 0xa4, 0x03, 0x0a, 0x20, 0x54, 0x72, 0x65, 0x61, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, + 0xd3, 0x03, 0x0a, 0x20, 0x54, 0x72, 0x65, 0x61, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x4c, 0x6f, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, @@ -311,8 +325,11 @@ var file_api_proto_logs_proto_rawDesc = []byte{ 0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x72, 0x65, 0x61, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x0d, 0x5a, 0x0b, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, - 0x6f, 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2d, 0x0a, 0x12, 0x74, 0x72, 0x65, 0x61, 0x74, 0x6d, + 0x65, 0x6e, 0x74, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0b, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x11, 0x74, 0x72, 0x65, 0x61, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x0d, 0x5a, 0x0b, 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, + 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/treatment-service/monitoring/treatmentlogger.go b/treatment-service/monitoring/treatmentlogger.go index edad8b35..fc51103a 100644 --- a/treatment-service/monitoring/treatmentlogger.go +++ b/treatment-service/monitoring/treatmentlogger.go @@ -15,14 +15,21 @@ type ErrorResponseLog struct { Error string } +type TreatmentMetadata struct { + ExperimentType string `json:"experiment_type"` + ExperimentVersion int64 `json:"experiment_version"` + SwitchbackWindowId *int64 `json:"switchback_window_id"` +} + type AssignedTreatmentLog struct { - ProjectID models.ProjectId - RequestID string - Experiment *_pubsub.Experiment - Treatment *_pubsub.ExperimentTreatment - Request *Request - Segmenters []models.SegmentFilter - Error *ErrorResponseLog + ProjectID models.ProjectId + RequestID string + Experiment *_pubsub.Experiment + Treatment *_pubsub.ExperimentTreatment + TreatmentMetadata *TreatmentMetadata + Request *Request + Segmenters []models.SegmentFilter + Error *ErrorResponseLog } type AssignedTreatmentPublisher interface { diff --git a/treatment-service/monitoring/treatmentlogger_test.go b/treatment-service/monitoring/treatmentlogger_test.go index cc7ecb38..55620d01 100644 --- a/treatment-service/monitoring/treatmentlogger_test.go +++ b/treatment-service/monitoring/treatmentlogger_test.go @@ -35,6 +35,7 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) { treatmentCfg, _ := structpb.NewStruct(map[string]interface{}{ "treatment-key": "treatment-value", }) + var windowId int64 = 3 assignedTreatmentLog := &AssignedTreatmentLog{ ProjectID: 0, RequestID: "1", @@ -46,6 +47,11 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) { Name: "test-treatment", Config: treatmentCfg, }, + TreatmentMetadata: &TreatmentMetadata{ + ExperimentType: "Switchback", + ExperimentVersion: 2, + SwitchbackWindowId: &windowId, + }, Request: &Request{}, Segmenters: []models.SegmentFilter{ {Key: "key", Value: []*_segmenters.SegmenterValue{{Value: &_segmenters.SegmenterValue_String_{String_: "value"}}}}, @@ -82,14 +88,15 @@ func TestNewProtobufKafkaLogEntry(t *testing.T) { expectedKeyJSON, err := json.Marshal(assignedTreatmentLogKeyJSON) assert.NoError(t, err) assignedTreatmentLogValueJSON := map[string]interface{}{ - "eventTimestamp": decodedResultLogMessage.EventTimestamp.AsTime(), - "experimentId": "1", - "experimentName": "test-exp", - "request": map[string]interface{}{}, - "requestId": "1", - "segment": "{\"key\":[\"value\"]}", - "treatmentConfig": "{\"treatment-key\":\"treatment-value\"}", - "treatmentName": "test-treatment", + "eventTimestamp": decodedResultLogMessage.EventTimestamp.AsTime(), + "experimentId": "1", + "experimentName": "test-exp", + "request": map[string]interface{}{}, + "requestId": "1", + "segment": "{\"key\":[\"value\"]}", + "treatmentConfig": "{\"treatment-key\":\"treatment-value\"}", + "treatmentName": "test-treatment", + "treatmentMetadata": "{\"experiment_type\":\"Switchback\",\"experiment_version\":2,\"switchback_window_id\":3}", } expectedValueJSON, err := json.Marshal(assignedTreatmentLogValueJSON) assert.NoError(t, err)