Skip to content

Commit

Permalink
ref(cogs): No longer send cogs data to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Aug 27, 2024
1 parent 1543031 commit 931fe9b
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 180 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
- Add configuration option to specify the instance type of Relay. ([#3938](https://github.com/getsentry/relay/pull/3938))
- Update definitions for user agent parsing. ([#3951](https://github.com/getsentry/relay/pull/3951))

**Internal**:

- No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953))

## 24.8.0

**Bug Fixes**:
Expand Down
16 changes: 0 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ sentry-core = "0.32.2"
sentry-kafka-schemas = { version = "0.1.86", default-features = false }
sentry-release-parser = { version = "1.3.2", default-features = false }
sentry-types = "0.32.2"
sentry_usage_accountant = { version = "0.1.0", default-features = false }
semver = "1.0.23"
serde = { version = "1.0.159", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
Expand Down
16 changes: 0 additions & 16 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,16 +1495,6 @@ pub struct Cogs {
///
/// Defaults to `false`.
enabled: bool,
/// Granularity of the COGS measurements.
///
/// Measurements are aggregated based on the granularity in seconds.
///
/// Aggregated measurements are always flushed at the end of their
/// aggregation window, which means the granularity also controls the flush
/// interval.
///
/// Defaults to `60` (1 minute).
granularity_secs: u64,
/// Maximium amount of COGS measurements allowed to backlog.
///
/// Any additional COGS measurements recorded will be dropped.
Expand All @@ -1523,7 +1513,6 @@ impl Default for Cogs {
fn default() -> Self {
Self {
enabled: false,
granularity_secs: 60,
max_queue_size: 10_000,
relay_resource_id: "relay_service".to_owned(),
}
Expand Down Expand Up @@ -2496,11 +2485,6 @@ impl Config {
self.values.cogs.enabled
}

/// Granularity for COGS measurements.
pub fn cogs_granularity(&self) -> Duration {
Duration::from_secs(self.values.cogs.granularity_secs)
}

/// Maximum amount of COGS measurements buffered in memory.
pub fn cogs_max_queue_size(&self) -> u64 {
self.values.cogs.max_queue_size
Expand Down
6 changes: 1 addition & 5 deletions relay-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ pub enum KafkaTopic {
Spans,
/// Summary for metrics collected during a span.
MetricsSummaries,
/// COGS measurements topic.
Cogs,
/// Feedback events topic.
Feedback,
}
Expand All @@ -60,7 +58,7 @@ impl KafkaTopic {
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 15] = [
static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
Expand All @@ -74,7 +72,6 @@ impl KafkaTopic {
Monitors,
Spans,
MetricsSummaries,
Cogs,
Feedback,
];
TOPICS.iter()
Expand Down Expand Up @@ -137,7 +134,6 @@ define_topic_assignments! {
monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
metrics_summaries: (KafkaTopic::MetricsSummaries, "snuba-metrics-summaries", "Summary for metrics collected during a span."),
cogs: (KafkaTopic::Cogs, "shared-resources-usage", "COGS measurements."),
feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
}

Expand Down
1 change: 0 additions & 1 deletion relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ reqwest = { workspace = true, features = [
] }
rmp-serde = { workspace = true }
rust-embed = { workspace = true, optional = true }
sentry_usage_accountant = { workspace = true, default-features = false }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand Down
6 changes: 1 addition & 5 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,7 @@ impl ServiceState {
})
.transpose()?;

let cogs = CogsService::new(
&config,
#[cfg(feature = "processing")]
store.clone(),
);
let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
Expand Down
78 changes: 9 additions & 69 deletions relay-server/src/services/cogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, Controller, FromMessage, Interface, Service};
use sentry_usage_accountant::{Producer, UsageAccountant, UsageUnit};
use relay_system::{Addr, FromMessage, Interface, Service};

#[cfg(feature = "processing")]
use crate::services::store::{Store, StoreCogs};
use crate::statsd::RelayCounters;

pub struct CogsReport(CogsMeasurement);
Expand All @@ -21,52 +18,15 @@ impl FromMessage<CogsMeasurement> for CogsReport {
}
}

/// Relay [accountant producer](Producer) which produces to a dynamic
/// upstram.
enum RelayProducer {
/// Produces to Kafka via the store service.
#[cfg(feature = "processing")]
Store(Addr<Store>),
/// Discards all measurements.
Noop,
}

impl Producer for RelayProducer {
type Error = std::convert::Infallible;

fn send(&mut self, _message: Vec<u8>) -> Result<(), Self::Error> {
match self {
#[cfg(feature = "processing")]
Self::Store(addr) => addr.send(StoreCogs(_message)),
Self::Noop => {}
}

Ok(())
}
}

/// Service implementing the [`CogsReport`] interface.
pub struct CogsService {
relay_resource_id: String,
usage_accountant: UsageAccountant<RelayProducer>,
}

impl CogsService {
pub fn new(config: &Config, #[cfg(feature = "processing")] store: Option<Addr<Store>>) -> Self {
#[cfg(feature = "processing")]
let producer = store
.map(RelayProducer::Store)
.filter(|_| config.cogs_enabled())
.unwrap_or(RelayProducer::Noop);

#[cfg(not(feature = "processing"))]
let producer = RelayProducer::Noop;

let granularity = chrono::Duration::from_std(config.cogs_granularity()).ok();

pub fn new(config: &Config) -> Self {
Self {
relay_resource_id: config.cogs_relay_resource_id().to_owned(),
usage_accountant: UsageAccountant::new(producer, granularity),
}
}

Expand All @@ -77,28 +37,17 @@ impl CogsService {
ResourceId::Relay => &self.relay_resource_id,
};

let (amount, unit) = match measurement.value {
relay_cogs::Value::Time(duration) => (
duration.as_micros().try_into().unwrap_or(u64::MAX),
UsageUnit::Milliseconds,
),
let amount = match measurement.value {
relay_cogs::Value::Time(duration) => {
duration.as_micros().try_into().unwrap_or(i64::MAX)
}
};

relay_statsd::metric!(
counter(RelayCounters::CogsUsage) += amount as i64,
counter(RelayCounters::CogsUsage) += amount,
resource_id = resource_id,
app_feature = measurement.feature.as_str()
);

self.usage_accountant
.record(resource_id, measurement.feature.as_str(), amount, unit)
.unwrap_or_else(|err| match err {});
}

fn handle_shutdown(&mut self) {
self.usage_accountant
.flush()
.unwrap_or_else(|err| match err {});
}
}

Expand All @@ -107,17 +56,8 @@ impl Service for CogsService {

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
tokio::select! {
biased;

Some(message) = rx.recv() => self.handle_report(message),
_ = shutdown.notified() => self.handle_shutdown(),

else => break,
}
while let Some(message) = rx.recv().await {
self.handle_report(message);
}
});
}
Expand Down
33 changes: 0 additions & 33 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,11 @@ pub struct StoreMetrics {
pub retention: u16,
}

#[derive(Debug)]
pub struct StoreCogs(pub Vec<u8>);

/// Service interface for the [`StoreEnvelope`] message.
#[derive(Debug)]
pub enum Store {
Envelope(StoreEnvelope),
Metrics(StoreMetrics),
Cogs(StoreCogs),
}

impl Store {
Expand All @@ -107,7 +103,6 @@ impl Store {
match self {
Store::Envelope(_) => "envelope",
Store::Metrics(_) => "metrics",
Store::Cogs(_) => "cogs",
}
}
}
Expand All @@ -130,14 +125,6 @@ impl FromMessage<StoreMetrics> for Store {
}
}

impl FromMessage<StoreCogs> for Store {
type Response = NoResponse;

fn from_message(message: StoreCogs, _: ()) -> Self {
Self::Cogs(message)
}
}

/// Service implementing the [`Store`] interface.
pub struct StoreService {
workers: WorkerGroup,
Expand Down Expand Up @@ -173,7 +160,6 @@ impl StoreService {
match message {
Store::Envelope(message) => self.handle_store_envelope(message),
Store::Metrics(message) => self.handle_store_metrics(message),
Store::Cogs(message) => self.handle_store_cogs(message),
}
})
}
Expand Down Expand Up @@ -444,16 +430,6 @@ impl StoreService {
}
}

fn handle_store_cogs(&self, StoreCogs(payload): StoreCogs) {
let message = KafkaMessage::Cogs(CogsKafkaMessage(payload));
if let Err(error) = self.produce(KafkaTopic::Cogs, message) {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to store cogs measurement"
);
}
}

fn create_metric_message<'a>(
&self,
organization_id: u64,
Expand Down Expand Up @@ -1449,10 +1425,6 @@ struct ProfileChunkKafkaMessage {
payload: Bytes,
}

#[derive(Debug, Serialize)]
#[serde(transparent)]
struct CogsKafkaMessage(Vec<u8>);

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
Expand All @@ -1479,7 +1451,6 @@ enum KafkaMessage<'a> {
message: SpanKafkaMessage<'a>,
},
MetricsSummary(MetricsSummaryKafkaMessage<'a>),
Cogs(CogsKafkaMessage),
ProfileChunk(ProfileChunkKafkaMessage),
}

Expand All @@ -1505,7 +1476,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span { .. } => "span",
KafkaMessage::MetricsSummary(_) => "metrics_summary",
KafkaMessage::Cogs(_) => "cogs",
KafkaMessage::ProfileChunk(_) => "profile_chunk",
}
}
Expand All @@ -1530,7 +1500,6 @@ impl Message for KafkaMessage<'_> {
| Self::Span { .. }
| Self::ReplayRecordingNotChunked(_)
| Self::MetricsSummary(_)
| Self::Cogs(_)
| Self::ProfileChunk(_) => Uuid::nil(),

// TODO(ja): Determine a partitioning key
Expand Down Expand Up @@ -1584,8 +1553,6 @@ impl Message for KafkaMessage<'_> {
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),

KafkaMessage::Cogs(CogsKafkaMessage(payload)) => Ok(payload.into()),

_ => rmp_serde::to_vec_named(&self)
.map(Cow::Owned)
.map_err(ClientError::InvalidMsgPack),
Expand Down
6 changes: 0 additions & 6 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def inner(options=None):
"spans": get_topic_name("spans"),
"profiles": get_topic_name("profiles"),
"metrics_summaries": get_topic_name("metrics_summaries"),
"cogs": get_topic_name("cogs"),
"feedback": get_topic_name("feedback"),
}

Expand Down Expand Up @@ -364,11 +363,6 @@ def metrics_summaries_consumer(consumer_fixture):
yield from consumer_fixture(MetricsSummariesConsumer, "metrics_summaries")


@pytest.fixture
def cogs_consumer(consumer_fixture):
yield from consumer_fixture(CogsConsumer, "cogs")


class MetricsConsumer(ConsumerBase):
def get_metric(self, timeout=None):
message = self.poll(timeout=timeout)
Expand Down
Loading

0 comments on commit 931fe9b

Please sign in to comment.