Skip to content

Commit

Permalink
feat: write to old table as well
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 13, 2024
1 parent 991959a commit ebd3dc2
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 126 deletions.
250 changes: 127 additions & 123 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
return nil, err
}

// insertLogsSQL := renderInsertLogsSQL(cfg)
insertLogsSQL := renderInsertLogsSQL(cfg)
insertLogsSQLV2 := renderInsertLogsSQLV2(cfg)
id := uuid.New()
collector := usage.NewUsageCollector(
Expand All @@ -99,9 +99,9 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
}

return &clickhouseLogsExporter{
id: id,
db: client,
// insertLogsSQL: insertLogsSQL,
id: id,
db: client,
insertLogsSQL: insertLogsSQL,
insertLogsSQLV2: insertLogsSQLV2,
logger: logger,
cfg: cfg,
Expand Down Expand Up @@ -214,19 +214,18 @@ func tsBucket(ts int64, bucketSize int64) int64 {
}

func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.Logs) error {
// JSON (sorted) for {bucket_start: {attribs: fingerprint}}
resourcesSeen := map[int64]map[string]string{}

var insertLogsStmtV2 driver.Batch
var insertResourcesStmtV2 driver.Batch
// var statement driver.Batch
var statement driver.Batch
var tagStatement driver.Batch
var err error

defer func() {
// if statement != nil {
// _ = statement.Abort()
// }
if statement != nil {
_ = statement.Abort()
}
if tagStatement != nil {
_ = tagStatement.Abort()
}
Expand All @@ -243,10 +242,10 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
return errors.New("shutdown has been called")
default:
start := time.Now()
// statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
// if err != nil {
// return fmt.Errorf("PrepareBatch:%w", err)
// }
statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("PrepareBatch:%w", err)
}

tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection())
if err != nil {
Expand Down Expand Up @@ -281,7 +280,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}

// remove after sometime
// resources = addTemporaryUnderscoreSupport(resources)
resources = addTemporaryUnderscoreSupport(resources)

for j := 0; j < logs.ScopeLogs().Len(); j++ {
scope := logs.ScopeLogs().At(j).Scope()
Expand Down Expand Up @@ -334,7 +333,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}

// remove after sometime
// attributes = addTemporaryUnderscoreSupport(attributes)
attributes = addTemporaryUnderscoreSupport(attributes)

err = insertLogsStmtV2.Append(
uint64(lBucketStart),
Expand All @@ -358,54 +357,72 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
scopeAttributes.StringValues,
)
if err != nil {
return fmt.Errorf("LOGSv2: StatementAppend:%w", err)
return fmt.Errorf("StatementAppendLogsV2:%w", err)
}

// old table
// err = statement.Append(
// ts,
// ots,
// e.ksuid.String(),
// utils.TraceIDToHexOrEmptyString(r.TraceID()),
// utils.SpanIDToHexOrEmptyString(r.SpanID()),
// uint32(r.Flags()),
// r.SeverityText(),
// uint8(r.SeverityNumber()),
// getStringifiedBody(r.Body()),
// resources.StringKeys,
// resources.StringValues,
// attributes.StringKeys,
// attributes.StringValues,
// attributes.IntKeys,
// attributes.IntValues,
// attributes.FloatKeys,
// attributes.FloatValues,
// attributes.BoolKeys,
// attributes.BoolValues,
// scopeName,
// scopeVersion,
// scopeAttributes.StringKeys,
// scopeAttributes.StringValues,
// )
// if err != nil {
// return fmt.Errorf("StatementAppend:%w", err)
// }
err = statement.Append(
ts,
ots,
e.ksuid.String(),
utils.TraceIDToHexOrEmptyString(r.TraceID()),
utils.SpanIDToHexOrEmptyString(r.SpanID()),
uint32(r.Flags()),
r.SeverityText(),
uint8(r.SeverityNumber()),
getStringifiedBody(r.Body()),
resources.StringKeys,
resources.StringValues,
attributes.StringKeys,
attributes.StringValues,
attributes.IntKeys,
attributes.IntValues,
attributes.FloatKeys,
attributes.FloatValues,
attributes.BoolKeys,
attributes.BoolValues,
scopeName,
scopeVersion,
scopeAttributes.StringKeys,
scopeAttributes.StringValues,
)
if err != nil {
return fmt.Errorf("StatementAppend:%w", err)
}
e.ksuid = e.ksuid.Next()
}
}
}

var errResource error
dbWriteStart := time.Now()
// err = statement.Send()
// insert to the new table
err = insertLogsStmtV2.Send()
// insert into the resource bucket table
insertResourcesStmtV2, errResource = e.db.PrepareBatch(
insertResourcesStmtV2, err = e.db.PrepareBatch(
ctx,
fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_BUCKET_V2),
driver.WithReleaseConnection(),
)
if err != nil {
return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints :%w", err)
}

for bucketTs, resources := range resourcesSeen {
for resourceLabels, fingerprint := range resources {
insertResourcesStmtV2.Append(
resourceLabels,
fingerprint,
bucketTs,
)
}
}

var errResource error
dbWriteStart := time.Now()
err = statement.Send()
// insert to the new table
errV2 := insertLogsStmtV2.Send()
// insert into the resource bucket table
errResource = insertResourcesStmtV2.Send()
if err != nil {
return fmt.Errorf("couldn't send batch insert resources statement:%w", err)
}

stats.RecordWithTags(ctx,
[]tag.Mutator{
Expand All @@ -417,8 +434,11 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
if err != nil {
return fmt.Errorf("StatementSend:%w", err)
}
if errV2 != nil {
return fmt.Errorf("StatementSendV2:%w", err)
}
if errResource != nil {
return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints :%w", err)
return fmt.Errorf("ResourceStatementSendV2:%w", err)
}

duration := time.Since(start)
Expand All @@ -429,22 +449,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, e.id.String())}, ExporterSigNozSentLogRecords.M(int64(v.Count)), ExporterSigNozSentLogRecordsBytes.M(int64(v.Size)))
}

resourceCount := 0
for bucketTs, resources := range resourcesSeen {
for resourceLabels, fingerprint := range resources {
insertResourcesStmtV2.Append(
resourceLabels,
fingerprint,
bucketTs,
)
resourceCount += 1
}
}

err = insertResourcesStmtV2.Send()
if err != nil {
return fmt.Errorf("couldn't send batch insert resources statement:%w", err)
}
// push tag attributes
tagWriteStart := time.Now()
err = tagStatement.Send()
Expand Down Expand Up @@ -642,58 +646,58 @@ func formatKey(k string) string {
return strings.ReplaceAll(k, ".", "_")
}

// const (
// // language=ClickHouse SQL
// insertLogsSQLTemplate = `INSERT INTO %s.%s (
// timestamp,
// observed_timestamp,
// id,
// trace_id,
// span_id,
// trace_flags,
// severity_text,
// severity_number,
// body,
// resources_string_key,
// resources_string_value,
// attributes_string_key,
// attributes_string_value,
// attributes_int64_key,
// attributes_int64_value,
// attributes_float64_key,
// attributes_float64_value,
// attributes_bool_key,
// attributes_bool_value,
// scope_name,
// scope_version,
// scope_string_key,
// scope_string_value
// ) VALUES (
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// ?,
// )`
// )
// insertLogsSQLTemplate is the INSERT statement for the old (v1) logs table.
// The two %s verbs are filled by renderInsertLogsSQL with the database name
// and the distributed logs table name; the 23 placeholders line up one-to-one
// with the column list and with the arguments passed to statement.Append in
// pushToClickhouse. Restored (un-commented) by this commit so the exporter
// keeps writing to the old table alongside the v2 tables.
const (
// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s.%s (
timestamp,
observed_timestamp,
id,
trace_id,
span_id,
trace_flags,
severity_text,
severity_number,
body,
resources_string_key,
resources_string_value,
attributes_string_key,
attributes_string_value,
attributes_int64_key,
attributes_int64_value,
attributes_float64_key,
attributes_float64_value,
attributes_bool_key,
attributes_bool_value,
scope_name,
scope_version,
scope_string_key,
scope_string_value
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
)`
)

const (
// language=ClickHouse SQL
Expand Down Expand Up @@ -767,9 +771,9 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro
return db, nil
}

// func renderInsertLogsSQL(cfg *Config) string {
// return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE)
// }
// renderInsertLogsSQL builds the INSERT statement for the old (v1) logs table
// by filling insertLogsSQLTemplate with the database name and the distributed
// logs table name. The cfg parameter is currently unused; it is kept to match
// the signature of renderInsertLogsSQLV2.
func renderInsertLogsSQL(cfg *Config) string {
return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE)
}

func renderInsertLogsSQLV2(cfg *Config) string {
return fmt.Sprintf(insertLogsSQLTemplateV2, databaseName, DISTRIBUTED_LOGS_TABLE_V2)
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhouselogsexporter/logsv2/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (node *DimensionHierarchyNode) Identifier(attributes map[string]any) []IdLa
return result
}

// TODO(Raj): Consider parsing this stuff out from json
// TODO(Raj/Nitya): Consider parsing this stuff out from json
func ResourceHierarchy() *DimensionHierarchyNode {
return &DimensionHierarchyNode{
labels: []string{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
-- Please run the below commands if you are trying to fix schema migration issue https://signoz.io/docs/userguide/logs_troubleshooting/#schema-migrator-dirty-database-version
SELECT 1
-- Ensure tagType accepts both 'tag' and 'resource' values, on the local and
-- the distributed tag_attributes tables.
ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource') CODEC(ZSTD(1));
ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource') CODEC(ZSTD(1));


-- Drop the index on scope_name first, since the column it covers is dropped below.
ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP INDEX IF EXISTS scope_name_idx;

-- Remove the scope_* columns from both the local and the distributed logs tables.
ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_name;
ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_name;

ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_version;
ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_version;


ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_key;
ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_key;

ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_value;
ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} DROP column IF EXISTS scope_string_value;
Loading

0 comments on commit ebd3dc2

Please sign in to comment.