diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 299604f5..15122c3f 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -75,7 +75,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro return nil, err } - // insertLogsSQL := renderInsertLogsSQL(cfg) + insertLogsSQL := renderInsertLogsSQL(cfg) insertLogsSQLV2 := renderInsertLogsSQLV2(cfg) id := uuid.New() collector := usage.NewUsageCollector( @@ -99,9 +99,9 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro } return &clickhouseLogsExporter{ - id: id, - db: client, - // insertLogsSQL: insertLogsSQL, + id: id, + db: client, + insertLogsSQL: insertLogsSQL, insertLogsSQLV2: insertLogsSQLV2, logger: logger, cfg: cfg, @@ -214,19 +214,18 @@ func tsBucket(ts int64, bucketSize int64) int64 { } func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.Logs) error { - // JSON (sorted) for {bucket_start: {attribs: fingerprint}} resourcesSeen := map[int64]map[string]string{} var insertLogsStmtV2 driver.Batch var insertResourcesStmtV2 driver.Batch - // var statement driver.Batch + var statement driver.Batch var tagStatement driver.Batch var err error defer func() { - // if statement != nil { - // _ = statement.Abort() - // } + if statement != nil { + _ = statement.Abort() + } if tagStatement != nil { _ = tagStatement.Abort() } @@ -243,10 +242,10 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L return errors.New("shutdown has been called") default: start := time.Now() - // statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) - // if err != nil { - // return fmt.Errorf("PrepareBatch:%w", err) - // } + statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) + if err != nil { + return fmt.Errorf("PrepareBatch:%w", err) + } 
tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection()) if err != nil { @@ -281,7 +280,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // remove after sometime - // resources = addTemporaryUnderscoreSupport(resources) + resources = addTemporaryUnderscoreSupport(resources) for j := 0; j < logs.ScopeLogs().Len(); j++ { scope := logs.ScopeLogs().At(j).Scope() @@ -334,7 +333,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // remove after sometime - // attributes = addTemporaryUnderscoreSupport(attributes) + attributes = addTemporaryUnderscoreSupport(attributes) err = insertLogsStmtV2.Append( uint64(lBucketStart), @@ -358,54 +357,72 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L scopeAttributes.StringValues, ) if err != nil { - return fmt.Errorf("LOGSv2: StatementAppend:%w", err) + return fmt.Errorf("StatementAppendLogsV2:%w", err) } // old table - // err = statement.Append( - // ts, - // ots, - // e.ksuid.String(), - // utils.TraceIDToHexOrEmptyString(r.TraceID()), - // utils.SpanIDToHexOrEmptyString(r.SpanID()), - // uint32(r.Flags()), - // r.SeverityText(), - // uint8(r.SeverityNumber()), - // getStringifiedBody(r.Body()), - // resources.StringKeys, - // resources.StringValues, - // attributes.StringKeys, - // attributes.StringValues, - // attributes.IntKeys, - // attributes.IntValues, - // attributes.FloatKeys, - // attributes.FloatValues, - // attributes.BoolKeys, - // attributes.BoolValues, - // scopeName, - // scopeVersion, - // scopeAttributes.StringKeys, - // scopeAttributes.StringValues, - // ) - // if err != nil { - // return fmt.Errorf("StatementAppend:%w", err) - // } + err = statement.Append( + ts, + ots, + e.ksuid.String(), + utils.TraceIDToHexOrEmptyString(r.TraceID()), + utils.SpanIDToHexOrEmptyString(r.SpanID()), + uint32(r.Flags()), + 
r.SeverityText(),
+ uint8(r.SeverityNumber()),
+ getStringifiedBody(r.Body()),
+ resources.StringKeys,
+ resources.StringValues,
+ attributes.StringKeys,
+ attributes.StringValues,
+ attributes.IntKeys,
+ attributes.IntValues,
+ attributes.FloatKeys,
+ attributes.FloatValues,
+ attributes.BoolKeys,
+ attributes.BoolValues,
+ scopeName,
+ scopeVersion,
+ scopeAttributes.StringKeys,
+ scopeAttributes.StringValues,
+ )
+ if err != nil {
+ return fmt.Errorf("StatementAppend:%w", err)
+ }
 e.ksuid = e.ksuid.Next()
 }
 }
 }
- var errResource error
- dbWriteStart := time.Now()
- // err = statement.Send()
- // insert to the new table
- err = insertLogsStmtV2.Send()
- // insert into the resource bucket table
- insertResourcesStmtV2, errResource = e.db.PrepareBatch(
+ insertResourcesStmtV2, err = e.db.PrepareBatch(
 ctx,
 fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2),
 driver.WithReleaseConnection(),
 )
+ if err != nil {
+ return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints :%w", err)
+ }
+
+ for bucketTs, resources := range resourcesSeen {
+ for resourceLabels, fingerprint := range resources {
+ if err := insertResourcesStmtV2.Append(
+ resourceLabels, fingerprint, bucketTs,
+ ); err != nil {
+ return fmt.Errorf("ResourceStatementAppendV2:%w", err)
+ }
+ }
+ }
+
+ var errResource error
+ dbWriteStart := time.Now()
+ err = statement.Send()
+ // insert to the new table
+ errV2 := insertLogsStmtV2.Send()
+ // insert into the resource bucket table
+ errResource = insertResourcesStmtV2.Send()
+ if errResource != nil {
+ return fmt.Errorf("couldn't send batch insert resources statement:%w", errResource)
+ }
 stats.RecordWithTags(ctx, []tag.Mutator{
@@ -417,8 +434,11 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 if err != nil {
 return fmt.Errorf("StatementSend:%w", err)
 }
+ if errV2 != nil {
+ return fmt.Errorf("StatementSendV2:%w", errV2)
+ }
 if errResource != nil {
- return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints :%w", err)
+ return 
fmt.Errorf("ResourceStatementSendV2:%w", errResource)
 }
 duration := time.Since(start)
@@ -429,22 +449,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, e.id.String())}, ExporterSigNozSentLogRecords.M(int64(v.Count)), ExporterSigNozSentLogRecordsBytes.M(int64(v.Size)))
 }
- resourceCount := 0
- for bucketTs, resources := range resourcesSeen {
- for resourceLabels, fingerprint := range resources {
- insertResourcesStmtV2.Append(
- resourceLabels,
- fingerprint,
- bucketTs,
- )
- resourceCount += 1
- }
- }
-
- err = insertResourcesStmtV2.Send()
- if err != nil {
- return fmt.Errorf("couldn't send batch insert resources statement:%w", err)
- }
 // push tag attributes
 tagWriteStart := time.Now()
 err = tagStatement.Send()
@@ -642,58 +646,58 @@ func formatKey(k string) string {
 return strings.ReplaceAll(k, ".", "_")
 }
-// const (
-// // language=ClickHouse SQL
-// insertLogsSQLTemplate = `INSERT INTO %s.%s (
-// timestamp,
-// observed_timestamp,
-// id,
-// trace_id,
-// span_id,
-// trace_flags,
-// severity_text,
-// severity_number,
-// body,
-// resources_string_key,
-// resources_string_value,
-// attributes_string_key,
-// attributes_string_value,
-// attributes_int64_key,
-// attributes_int64_value,
-// attributes_float64_key,
-// attributes_float64_value,
-// attributes_bool_key,
-// attributes_bool_value,
-// scope_name,
-// scope_version,
-// scope_string_key,
-// scope_string_value
-// ) VALUES (
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// ?,
-// )`
-// )
+const (
+ // language=ClickHouse SQL
+ insertLogsSQLTemplate = `INSERT INTO %s.%s (
+ timestamp,
+ observed_timestamp,
+ id,
+ trace_id,
+ span_id,
+ trace_flags,
+ severity_text,
+ severity_number,
+ body,
+ resources_string_key,
+ resources_string_value,
+ 
attributes_string_key, + attributes_string_value, + attributes_int64_key, + attributes_int64_value, + attributes_float64_key, + attributes_float64_value, + attributes_bool_key, + attributes_bool_value, + scope_name, + scope_version, + scope_string_key, + scope_string_value + ) VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + )` +) const ( // language=ClickHouse SQL @@ -767,9 +771,9 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro return db, nil } -// func renderInsertLogsSQL(cfg *Config) string { -// return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE) -// } +func renderInsertLogsSQL(cfg *Config) string { + return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE) +} func renderInsertLogsSQLV2(cfg *Config) string { return fmt.Sprintf(insertLogsSQLTemplateV2, databaseName, DISTRIBUTED_LOGS_TABLE_V2) diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint.go b/exporter/clickhouselogsexporter/logsv2/fingerprint.go index a1434e17..79c9a4de 100644 --- a/exporter/clickhouselogsexporter/logsv2/fingerprint.go +++ b/exporter/clickhouselogsexporter/logsv2/fingerprint.go @@ -46,7 +46,7 @@ func (node *DimensionHierarchyNode) Identifier(attributes map[string]any) []IdLa return result } -// TODO(Raj): Consider parsing this stuff out from json +// TODO(Raj/Nitya): Consider parsing this stuff out from json func ResourceHierarchy() *DimensionHierarchyNode { return &DimensionHierarchyNode{ labels: []string{ diff --git a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.down.sql b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.down.sql index 9a7888ef..ffa0601e 100644 --- a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.down.sql +++ b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.down.sql @@ -1,2 +1,19 @@ 
-- Please run the below commands if you are trying to fix schema migration issue https://signoz.io/docs/userguide/logs_troubleshooting/#schema-migrator-dirty-database-version -SELECT 1 \ No newline at end of file +ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource') CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource') CODEC(ZSTD(1)); + + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP INDEX IF EXISTS scope_name_idx; + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_name; +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_name; + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_version; +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_version; + + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_key; +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_key; + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_value; +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_value; \ No newline at end of file diff --git a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql index 95ff8302..3f65f18b 100644 --- a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql +++ b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql @@ -11,5 +11,22 @@ -- ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS 
instrumentation_scope_attributes_string_value Array(String) CODEC(ZSTD(1)) -- ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD INDEX IF NOT EXISTS instrumentation_scope_idx (instrumentation_scope) TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4 +-- Don't run the commands below +ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1)); -SELECT 1 \ No newline at end of file + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_name String CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_name String CODEC(ZSTD(1)); + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_version String CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_version String CODEC(ZSTD(1)); + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_string_key Array(String) CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_string_key Array(String) CODEC(ZSTD(1)); + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_string_value Array(String) CODEC(ZSTD(1)); +ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS scope_string_value Array(String) CODEC(ZSTD(1)); + + +ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD INDEX IF NOT EXISTS scope_name_idx (scope_name) TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4; \ No newline at end of file diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql 
b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql index a904c49b..7686148b 100644 --- a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql +++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql @@ -3,6 +3,35 @@ DROP TABLE IF EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{. DROP TABLE IF EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}; DROP TABLE IF EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_string_key) as name, 'String' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_int64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_int64_key) as name, 'Int64' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_float64_key) as name, 'Float64' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_bool_key) as name, 'Bool' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_resource_keys AS
+SELECT
+distinct arrayJoin(resources_string_key) as name, 'String' datatype
+FROM signoz_logs.logs
+ORDER BY name;
 DROP TABLE IF EXISTS signoz_logs.logs_v2_resource_bucket ON CLUSTER {{.SIGNOZ_CLUSTER}}; DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2_resource_bucket ON 
CLUSTER {{.SIGNOZ_CLUSTER}};