Add network latency (traces-metrics) correlation for kafka #5652

Merged: 9 commits, Aug 27, 2024

shivanshuraj1333
Member

@shivanshuraj1333 commented Aug 7, 2024

@shivanshuraj1333 marked this pull request as ready for review August 7, 2024 08:23
@srikanthccv
Member

@shivanshuraj1333 When PR is ready, please explicitly request reviews (from those most familiar with the changes) here or on Slack, so we know it's time to look.

@shivanshuraj1333
Member Author

  1. Some updates: the attributes are now cached as a triplet (client_id, service_instance_id, service_name).
  2. The trace calculation runs first, followed by the metrics calculation.
  3. The metrics step runs multiple queries in parallel using the native query builder.
  4. At the end, all the metrics queries are merged under a single column, latency; the trace results supply throughput.
  5. Using the triplet above as the key, the response is combined into a single data object.
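
The steps above can be sketched roughly as follows. This is an illustrative Python sketch, not the Go code in this PR: `Triplet`, `correlate`, and the `fetch_latency` callable are hypothetical names, and the thread pool only stands in for "multiple parallel queries".

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, NamedTuple, Optional


class Triplet(NamedTuple):
    """Cache key described in step 1 (field order is an assumption)."""
    client_id: str
    service_instance_id: str
    service_name: str


def correlate(
    throughput_by_triplet: Dict[Triplet, float],
    fetch_latency: Callable[[Triplet], Optional[float]],
) -> List[dict]:
    """Join trace throughput with metric latency, keyed by triplet.

    throughput_by_triplet comes from the trace step, which runs first.
    fetch_latency is a hypothetical stand-in for one metrics query.
    """
    triplets = sorted(throughput_by_triplet)
    # One metrics lookup per triplet, issued in parallel.
    with ThreadPoolExecutor(max_workers=8) as pool:
        latencies = list(pool.map(fetch_latency, triplets))
    # Combine both signals into a single row per triplet.
    return [
        {**t._asdict(), "throughput": throughput_by_triplet[t], "latency": lat}
        for t, lat in zip(triplets, latencies)
    ]
```

A triplet with no matching metric series keeps its throughput and gets `latency` set to `None`, so trace-only consumers still show up in the response.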

Overall, this query may be slower than other APIs because the metrics calculation cascades from the trace calculation. The metrics query is fast; the trace query is somewhat slow for data spanning more than 3 days.

cc: @ankitnayan

@ankitnayan
Collaborator

Please share the scanning speed (rows/s and bytes/s) with the settings max_threads=12 and min_bytes_to_use_direct_io=1. Also use EXPLAIN indexes=1 <query> to show how efficiently the indexes are used.

@shivanshuraj1333
Member Author

EXPLAIN indexes=1
WITH consumer_query AS (
    SELECT
        serviceName,
        quantile(0.99)(durationNano) / 1000000 AS p99,
        COUNT(*) AS total_requests,
        SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count,
        avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
    FROM signoz_traces.distributed_signoz_index_v2
    WHERE
        timestamp >= '1724688229000000000'
        AND timestamp <= '1724688529000000000'
        AND kind = 5
        AND msgSystem = 'kafka'
        AND stringTagMap['messaging.destination.name'] = 'topic1'
        AND stringTagMap['messaging.destination.partition.id'] = '0'
        AND stringTagMap['messaging.kafka.consumer.group'] = 'cg1'
    GROUP BY serviceName
)

SELECT
    serviceName AS service_name,
    p99,
    COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
    COALESCE(total_requests / 1800, 0) AS throughput,  -- requests per second: count / window length in seconds (the filter above spans 300 s)
    COALESCE(avg_msg_size, 0) AS avg_msg_size
FROM
    consumer_query
ORDER BY
    serviceName
SETTINGS max_threads = 12, min_bytes_to_use_direct_io = 1;

---

Query id: bd586b0b-fd76-48a4-8ed4-54058de286dc

┌─explain────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + (Before ORDER BY + (Projection + Before ORDER BY)) [lifted up part]))        │
│   Sorting (Sorting for ORDER BY)                                                                       │
│     Expression ((Before ORDER BY + (Projection + Before ORDER BY)))                                    │
│       Aggregating                                                                                      │
│         Filter                                                                                         │
│           ReadFromMergeTree (timestampSort)                                                            │
│           Indexes:                                                                                     │
│             PrimaryKey                                                                                 │
│               Keys:                                                                                    │
│                 timestamp                                                                              │
│               Condition: and((timestamp in (-Inf, '1724688529']), (timestamp in ['1724688229', +Inf))) │
│               Parts: 3/3                                                                               │
│               Granules: 19/82                                                                          │
└────────────────────────────────────────────────────────────────────────────────────────────────────────┘

13 rows in set. Elapsed: 0.020 sec.
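
To make the index efficiency in the plan above easier to compare across queries, the `Granules: selected/total` lines can be turned into fractions. The helper below is a small sketch (the name `granule_fractions` is made up for illustration). For this plan it gives 19/82, so roughly 23% of the granules survive primary-key pruning on timestamp, and the remaining predicates are applied in the Filter step.

```python
import re
from typing import List, Tuple

# Matches lines such as "Granules: 19/82" in EXPLAIN indexes=1 output.
GRANULES = re.compile(r"Granules:\s*(\d+)/(\d+)")


def granule_fractions(explain_text: str) -> List[Tuple[int, int, float]]:
    """Return (selected, total, fraction) for every Granules line, in order."""
    result = []
    for selected, total in GRANULES.findall(explain_text):
        selected, total = int(selected), int(total)
        # Guard against "0/0", which appears when no parts are selected.
        fraction = selected / total if total else 0.0
        result.append((selected, total, fraction))
    return result
```

For plans with several index stages (MinMax, Partition, PrimaryKey), the list has one entry per stage in the order they are printed.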

---


EXPLAIN indexes=1
WITH producer_query AS (
    SELECT
        serviceName,
        quantile(0.99)(durationNano) / 1000000 AS p99,
        count(*) AS total_count,
        SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count
    FROM signoz_traces.distributed_signoz_index_v2
    WHERE
        timestamp >= '1724688229000000000'
        AND timestamp <= '1724688529000000000'
        AND kind = 4
        AND msgSystem = 'kafka'
        AND stringTagMap['messaging.destination.name'] = 'topic1'
        AND stringTagMap['messaging.destination.partition.id'] = '0'
    GROUP BY serviceName
)

SELECT
    serviceName AS service_name,
    p99,
    COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
    COALESCE(total_count / 1800, 0) AS throughput  -- requests per second: count / window length in seconds (the filter above spans 300 s)
FROM
    producer_query
ORDER BY
    serviceName
SETTINGS max_threads = 12, min_bytes_to_use_direct_io = 1;

----

Query id: 85ba7c52-f578-44e8-b715-b974991dece3

┌─explain────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + (Before ORDER BY + (Projection + Before ORDER BY)) [lifted up part]))        │
│   Sorting (Sorting for ORDER BY)                                                                       │
│     Expression ((Before ORDER BY + (Projection + Before ORDER BY)))                                    │
│       Aggregating                                                                                      │
│         Filter                                                                                         │
│           ReadFromMergeTree (timestampSort)                                                            │
│           Indexes:                                                                                     │
│             PrimaryKey                                                                                 │
│               Keys:                                                                                    │
│                 timestamp                                                                              │
│               Condition: and((timestamp in (-Inf, '1724688529']), (timestamp in ['1724688229', +Inf))) │
│               Parts: 3/3                                                                               │
│               Granules: 19/82                                                                          │
└────────────────────────────────────────────────────────────────────────────────────────────────────────┘

13 rows in set. Elapsed: 0.019 sec.

----


EXPLAIN indexes=1
SELECT
    stringTagMap['messaging.client_id'] AS client_id,
    stringTagMap['service.instance.id'] AS service_instance_id,
    serviceName AS service_name,
    count(*) / 1800 AS throughput  -- requests per second: count / window length in seconds (the filter above spans 300 s)
FROM signoz_traces.distributed_signoz_index_v2
WHERE
    timestamp >= '1724688229000000000'
    AND timestamp <= '1724688529000000000'
    AND kind = 5
    AND msgSystem = 'kafka' 
    AND stringTagMap['messaging.kafka.consumer.group'] = 'cg1'
GROUP BY service_name, client_id, service_instance_id
ORDER BY throughput DESC
SETTINGS max_threads = 12, min_bytes_to_use_direct_io = 1;

---

Query id: 55e56551-0071-441b-adfe-440ea13e3316

┌─explain────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression (Projection)                                                                                │
│   Sorting (Sorting for ORDER BY)                                                                       │
│     Expression (Before ORDER BY)                                                                       │
│       Aggregating                                                                                      │
│         Filter                                                                                         │
│           ReadFromMergeTree (timestampSort)                                                            │
│           Indexes:                                                                                     │
│             PrimaryKey                                                                                 │
│               Keys:                                                                                    │
│                 timestamp                                                                              │
│               Condition: and((timestamp in (-Inf, '1724688529']), (timestamp in ['1724688229', +Inf))) │
│               Parts: 3/3                                                                               │
│               Granules: 19/82                                                                          │
└────────────────────────────────────────────────────────────────────────────────────────────────────────┘

13 rows in set. Elapsed: 0.016 sec.
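
A note on the throughput divisor: the three trace queries above divide the count by a constant 1800, while their timestamp filters span 1724688229000000000 to 1724688529000000000, which is 300 seconds. Assuming throughput is meant as requests per second over the queried window, the divisor can be derived from the bounds instead of hard-coded. A minimal sketch (function names are illustrative only):

```python
NANOS_PER_SECOND = 1_000_000_000


def window_seconds(start_ns: int, end_ns: int) -> float:
    """Length of the query window in seconds, from nanosecond bounds."""
    if end_ns <= start_ns:
        raise ValueError("end_ns must be greater than start_ns")
    return (end_ns - start_ns) / NANOS_PER_SECOND


def throughput(count: int, start_ns: int, end_ns: int) -> float:
    """Requests per second over the window covered by the filter."""
    return count / window_seconds(start_ns, end_ns)
```

With the bounds used in these queries, `window_seconds` gives 300.0, so a count of 600 spans corresponds to 2.0 requests per second.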

---


EXPLAIN indexes=1
SELECT
    service_name,
    client_id,
    service_instance_id,
    ts,
    avg(per_series_value) as value
FROM (
    SELECT
        fingerprint,
        any(service_name) as service_name,
        any(client_id) as client_id,
        any(service_instance_id) as service_instance_id,
        toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts,
        avg(value) as per_series_value
    FROM signoz_metrics.distributed_samples_v4
    INNER JOIN (
        SELECT DISTINCT
            JSONExtractString(labels, 'service_name') as service_name,
            JSONExtractString(labels, 'client_id') as client_id,
            JSONExtractString(labels, 'service_instance_id') as service_instance_id,
            fingerprint
        FROM signoz_metrics.time_series_v4
        WHERE
            metric_name = 'kafka_consumer_fetch_latency_avg'
            AND temporality = 'Unspecified'
            AND unix_milli >= 1724688229000
            AND unix_milli < 1724688529000
            AND JSONExtractString(labels, 'service_name') = 'consumer-svc'
            AND JSONExtractString(labels, 'client_id') = 'consumer-cg1-1'
            AND JSONExtractString(labels, 'service_instance_id') = 'ccf49550-2e8f-4c7b-be29-b9e0891ef93d'
    ) as filtered_time_series
    USING fingerprint
    WHERE
        metric_name = 'kafka_consumer_fetch_latency_avg'
        AND unix_milli >= 1724688229000
        AND unix_milli < 1724688529000
    GROUP BY fingerprint, ts
    ORDER BY fingerprint, ts
)
WHERE isNaN(per_series_value) = 0
GROUP BY service_name, client_id, service_instance_id, ts
ORDER BY service_name ASC, client_id ASC, service_instance_id ASC, ts ASC
SETTINGS max_threads = 12, min_bytes_to_use_direct_io = 1;

---

Query id: e49a6eb1-8384-4b79-b8c9-7a04d01c5526

┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression (Projection)                                                                                                                                                                                                                                    │
│   Sorting (Sorting for ORDER BY)                                                                                                                                                                                                                           │
│     Expression (Before ORDER BY)                                                                                                                                                                                                                           │
│       Aggregating                                                                                                                                                                                                                                          │
│         Expression (Before GROUP BY)                                                                                                                                                                                                                       │
│           Filter ((WHERE + (Projection + Before ORDER BY)))                                                                                                                                                                                                │
│             Filter (HAVING)                                                                                                                                                                                                                                │
│               Aggregating                                                                                                                                                                                                                                  │
│                 Expression ((Before GROUP BY + ))                                                                                                                                                                                                          │
│                   Join (JOIN FillRightFirst)                                                                                                                                                                                                               │
│                     Filter (( + Before JOIN))                                                                                                                                                                                                              │
│                       ReadFromMergeTree (signoz_metrics.samples_v4)                                                                                                                                                                                        │
│                       Indexes:                                                                                                                                                                                                                             │
│                         MinMax                                                                                                                                                                                                                             │
│                           Keys:                                                                                                                                                                                                                            │
│                             unix_milli                                                                                                                                                                                                                     │
│                           Condition: and((unix_milli in (-Inf, 1724688528999]), (unix_milli in [1724688229000, +Inf)))                                                                                                                                     │
│                           Parts: 1/69                                                                                                                                                                                                                      │
│                           Granules: 98/10421                                                                                                                                                                                                               │
│                         Partition                                                                                                                                                                                                                          │
│                           Keys:                                                                                                                                                                                                                            │
│                             toDate(divide(unix_milli, 1000))                                                                                                                                                                                               │
│                           Condition: and((toDate(divide(unix_milli, 1000)) in (-Inf, 19961]), (toDate(divide(unix_milli, 1000)) in [19961, +Inf)))                                                                                                         │
│                           Parts: 1/1                                                                                                                                                                                                                       │
│                           Granules: 98/98                                                                                                                                                                                                                  │
│                         PrimaryKey                                                                                                                                                                                                                         │
│                           Keys:                                                                                                                                                                                                                            │
│                             metric_name                                                                                                                                                                                                                    │
│                             unix_milli                                                                                                                                                                                                                     │
│                           Condition: and((unix_milli in (-Inf, 1724688528999]), and((unix_milli in [1724688229000, +Inf)), (metric_name in ['kafka_consumer_fetch_latency_avg', 'kafka_consumer_fetch_latency_avg'])))                                     │
│                           Parts: 1/1                                                                                                                                                                                                                       │
│                           Granules: 5/98                                                                                                                                                                                                                   │
│                     Expression ((Joined actions + (Rename joined columns + Projection)))                                                                                                                                                                   │
│                       Distinct                                                                                                                                                                                                                             │
│                         Distinct (Preliminary DISTINCT)                                                                                                                                                                                                    │
│                           Expression (Before ORDER BY)                                                                                                                                                                                                     │
│                             ReadFromMergeTree (signoz_metrics.time_series_v4)                                                                                                                                                                              │
│                             Indexes:                                                                                                                                                                                                                       │
│                               MinMax                                                                                                                                                                                                                       │
│                                 Keys:                                                                                                                                                                                                                      │
│                                   unix_milli                                                                                                                                                                                                               │
│                                 Condition: and((unix_milli in (-Inf, 1724688528999]), (unix_milli in [1724688229000, +Inf)))                                                                                                                               │
│                                 Parts: 0/38                                                                                                                                                                                                                │
│                                 Granules: 0/187                                                                                                                                                                                                            │
│                               Partition                                                                                                                                                                                                                    │
│                                 Keys:                                                                                                                                                                                                                      │
│                                   toDate(divide(unix_milli, 1000))                                                                                                                                                                                         │
│                                 Condition: and((toDate(divide(unix_milli, 1000)) in (-Inf, 19961]), (toDate(divide(unix_milli, 1000)) in [19961, +Inf)))                                                                                                   │
│                                 Parts: 0/0                                                                                                                                                                                                                 │
│                                 Granules: 0/0                                                                                                                                                                                                              │
│                               PrimaryKey                                                                                                                                                                                                                   │
│                                 Keys:                                                                                                                                                                                                                      │
│                                   temporality                                                                                                                                                                                                              │
│                                   metric_name                                                                                                                                                                                                              │
│                                   unix_milli                                                                                                                                                                                                               │
│                                 Condition: and((unix_milli in (-Inf, 1724688528999]), and((unix_milli in [1724688229000, +Inf)), and((metric_name in ['kafka_consumer_fetch_latency_avg', 'kafka_consumer_fetch_latency_avg']), (temporality in ['Unspecified', 'Unspecified'])))) │
│                                 Parts: 0/0                                                                                                                                                                                                                 │
│                                 Granules: 0/0                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

58 rows in set. Elapsed: 0.025 sec.
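
The metrics query above aggregates in two stages: it first averages the samples of each series (fingerprint) within a 60-second bucket, then averages those per-series values across series, skipping NaN. The Python sketch below mirrors that logic for a single triplet, as pinned by the label filters. It is an illustration of the SQL, not code from this PR, and the function names are made up.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def bucket_start(unix_milli: int, step_seconds: int = 60) -> int:
    """Start of the bucket (epoch seconds) containing the sample."""
    seconds = unix_milli // 1000
    return seconds - seconds % step_seconds


def two_stage_average(
    samples: Iterable[Tuple[int, int, float]], step_seconds: int = 60
) -> Dict[int, float]:
    """samples are (fingerprint, unix_milli, value); returns {bucket: value}."""
    # Stage 1: collect values per (series, bucket).
    per_series = defaultdict(list)
    for fingerprint, unix_milli, value in samples:
        key = (fingerprint, bucket_start(unix_milli, step_seconds))
        per_series[key].append(value)
    # Stage 2: average each series, drop NaN results, then group by bucket.
    per_bucket = defaultdict(list)
    for (_, ts), values in per_series.items():
        series_avg = sum(values) / len(values)
        if not math.isnan(series_avg):
            per_bucket[ts].append(series_avg)
    return {ts: sum(v) / len(v) for ts, v in sorted(per_bucket.items())}
```

Averaging per series first keeps a series that reports more often from dominating the bucket value.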

@shivanshuraj1333 merged commit 7191168 into SigNoz:develop on Aug 27, 2024
11 of 12 checks passed
@ankitnayan
Collaborator

The query will not work for users sending more than 1K spans/s: no index is used apart from timestamp.

This PR might help with secondary indexes: SigNoz/signoz-otel-collector#366. Otherwise, we will need to use columns. We shall revisit performance later.
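
For a rough sense of scale, here is a back-of-the-envelope sketch. It assumes a steady ingest rate and that only the timestamp filter prunes data, so every span inside the window has to be read and filtered; the function name is illustrative.

```python
def spans_in_window(spans_per_second: float, window_seconds: float) -> int:
    """Approximate number of span rows covered by a time window."""
    return int(spans_per_second * window_seconds)


# At 1K spans/s, a 5-minute window covers 300,000 rows,
# while a 3-day window covers 259,200,000 rows.
five_minutes = spans_in_window(1000, 300)
three_days = spans_in_window(1000, 3 * 24 * 3600)
```

The actual cost also depends on row width and on how many granules the timestamp index prunes, which this estimate does not capture.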
