ref(cogs): No longer send cogs data to Kafka #3953

Merged 1 commit on Aug 28, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@
- Add configuration option to specify the instance type of Relay. ([#3938](https://github.com/getsentry/relay/pull/3938))
- Update definitions for user agent parsing. ([#3951](https://github.com/getsentry/relay/pull/3951))

**Internal**:

- No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953))

## 24.8.0

**Bug Fixes**:
16 changes: 0 additions & 16 deletions Cargo.lock


1 change: 0 additions & 1 deletion Cargo.toml
@@ -148,7 +148,6 @@ sentry-core = "0.32.2"
sentry-kafka-schemas = { version = "0.1.86", default-features = false }
sentry-release-parser = { version = "1.3.2", default-features = false }
sentry-types = "0.32.2"
sentry_usage_accountant = { version = "0.1.0", default-features = false }
semver = "1.0.23"
serde = { version = "1.0.159", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
Expand Down
16 changes: 0 additions & 16 deletions relay-config/src/config.rs
@@ -1495,16 +1495,6 @@ pub struct Cogs {
///
/// Defaults to `false`.
enabled: bool,
/// Granularity of the COGS measurements.
///
/// Measurements are aggregated based on the granularity in seconds.
///
/// Aggregated measurements are always flushed at the end of their
/// aggregation window, which means the granularity also controls the flush
/// interval.
///
/// Defaults to `60` (1 minute).
granularity_secs: u64,
/// Maximum amount of COGS measurements allowed to backlog.
///
/// Any additional COGS measurements recorded will be dropped.
@@ -1523,7 +1513,6 @@ impl Default for Cogs {
fn default() -> Self {
Self {
enabled: false,
granularity_secs: 60,
max_queue_size: 10_000,
relay_resource_id: "relay_service".to_owned(),
}
@@ -2496,11 +2485,6 @@ impl Config {
self.values.cogs.enabled
}

/// Granularity for COGS measurements.
pub fn cogs_granularity(&self) -> Duration {
Duration::from_secs(self.values.cogs.granularity_secs)
}

/// Maximum amount of COGS measurements buffered in memory.
pub fn cogs_max_queue_size(&self) -> u64 {
self.values.cogs.max_queue_size
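
With `granularity_secs` removed, the `Cogs` config struct keeps only `enabled`, `max_queue_size`, and `relay_resource_id`. As a rough sketch of the remaining options, assuming these fields map onto a top-level `cogs` section of Relay's YAML config file:

```yaml
cogs:
  # COGS measurements are disabled by default.
  enabled: false
  # Measurements recorded beyond this backlog are dropped.
  max_queue_size: 10000
  # Resource ID reported for Relay itself.
  relay_resource_id: relay_service
```
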
6 changes: 1 addition & 5 deletions relay-kafka/src/config.rs
@@ -49,8 +49,6 @@ pub enum KafkaTopic {
Spans,
/// Summary for metrics collected during a span.
MetricsSummaries,
/// COGS measurements topic.
Cogs,
/// Feedback events topic.
Feedback,
}
@@ -60,7 +58,7 @@ impl KafkaTopic {
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 15] = [
static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
@@ -74,7 +72,6 @@
Monitors,
Spans,
MetricsSummaries,
Cogs,
Feedback,
];
TOPICS.iter()
@@ -137,7 +134,6 @@ define_topic_assignments! {
monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
metrics_summaries: (KafkaTopic::MetricsSummaries, "snuba-metrics-summaries", "Summary for metrics collected during a span."),
cogs: (KafkaTopic::Cogs, "shared-resources-usage", "COGS measurements."),
feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
}

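
As the comment above notes, the length of the `TOPICS` array is maintained by hand and must match the number of enum variants, now 14 after dropping `Cogs`. A hypothetical regression test, not part of this PR, could pin that invariant down:

```rust
#[test]
fn topics_iter_covers_all_variants() {
    // KafkaTopic::iter() walks a fixed-size array; after removing
    // KafkaTopic::Cogs it must yield exactly 14 variants.
    assert_eq!(KafkaTopic::iter().count(), 14);
}
```
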
1 change: 0 additions & 1 deletion relay-server/Cargo.toml
@@ -100,7 +100,6 @@ reqwest = { workspace = true, features = [
] }
rmp-serde = { workspace = true }
rust-embed = { workspace = true, optional = true }
sentry_usage_accountant = { workspace = true, default-features = false }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
6 changes: 1 addition & 5 deletions relay-server/src/service.rs
@@ -217,11 +217,7 @@ impl ServiceState {
})
.transpose()?;

let cogs = CogsService::new(
&config,
#[cfg(feature = "processing")]
store.clone(),
);
let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
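
The dropped argument relied on Rust's support for `#[cfg]` attributes on function parameters, which compiled the store handle in only for processing builds. A minimal standalone sketch of that pattern, with hypothetical types:

```rust
struct Config;
#[allow(dead_code)] // only referenced when the `processing` feature is on
struct Store;

// With the `processing` feature enabled, `new` takes two arguments;
// without it, the second parameter does not exist at all.
fn new(_config: &Config, #[cfg(feature = "processing")] _store: Option<Store>) {}

fn main() {
    // Call shape in a build without the `processing` feature.
    #[cfg(not(feature = "processing"))]
    new(&Config);
    // Call shape when it is enabled.
    #[cfg(feature = "processing")]
    new(&Config, Some(Store));
}
```
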
78 changes: 9 additions & 69 deletions relay-server/src/services/cogs.rs
@@ -2,11 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, Controller, FromMessage, Interface, Service};
use sentry_usage_accountant::{Producer, UsageAccountant, UsageUnit};
use relay_system::{Addr, FromMessage, Interface, Service};

#[cfg(feature = "processing")]
use crate::services::store::{Store, StoreCogs};
use crate::statsd::RelayCounters;

pub struct CogsReport(CogsMeasurement);
@@ -21,52 +18,15 @@ impl FromMessage<CogsMeasurement> for CogsReport {
}
}

/// Relay [accountant producer](Producer) which produces to a dynamic
/// upstream.
enum RelayProducer {
/// Produces to Kafka via the store service.
#[cfg(feature = "processing")]
Store(Addr<Store>),
/// Discards all measurements.
Noop,
}

impl Producer for RelayProducer {
type Error = std::convert::Infallible;

fn send(&mut self, _message: Vec<u8>) -> Result<(), Self::Error> {
match self {
#[cfg(feature = "processing")]
Self::Store(addr) => addr.send(StoreCogs(_message)),
Self::Noop => {}
}

Ok(())
}
}

/// Service implementing the [`CogsReport`] interface.
pub struct CogsService {
relay_resource_id: String,
usage_accountant: UsageAccountant<RelayProducer>,
}

impl CogsService {
pub fn new(config: &Config, #[cfg(feature = "processing")] store: Option<Addr<Store>>) -> Self {
#[cfg(feature = "processing")]
let producer = store
.map(RelayProducer::Store)
.filter(|_| config.cogs_enabled())
.unwrap_or(RelayProducer::Noop);

#[cfg(not(feature = "processing"))]
let producer = RelayProducer::Noop;

let granularity = chrono::Duration::from_std(config.cogs_granularity()).ok();

pub fn new(config: &Config) -> Self {
Self {
relay_resource_id: config.cogs_relay_resource_id().to_owned(),
usage_accountant: UsageAccountant::new(producer, granularity),
}
}

@@ -77,28 +37,17 @@ impl CogsService {
ResourceId::Relay => &self.relay_resource_id,
};

let (amount, unit) = match measurement.value {
relay_cogs::Value::Time(duration) => (
duration.as_micros().try_into().unwrap_or(u64::MAX),
UsageUnit::Milliseconds,
),
let amount = match measurement.value {
relay_cogs::Value::Time(duration) => {
duration.as_micros().try_into().unwrap_or(i64::MAX)
}
};

relay_statsd::metric!(
counter(RelayCounters::CogsUsage) += amount as i64,
counter(RelayCounters::CogsUsage) += amount,
resource_id = resource_id,
app_feature = measurement.feature.as_str()
);

self.usage_accountant
.record(resource_id, measurement.feature.as_str(), amount, unit)
.unwrap_or_else(|err| match err {});
}

fn handle_shutdown(&mut self) {
self.usage_accountant
.flush()
.unwrap_or_else(|err| match err {});
}
}

@@ -107,17 +56,8 @@

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
tokio::select! {
biased;

Some(message) = rx.recv() => self.handle_report(message),
_ = shutdown.notified() => self.handle_shutdown(),

else => break,
}
while let Some(message) = rx.recv().await {
self.handle_report(message);
}
});
}
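
With the usage accountant gone, the measured duration now feeds only a statsd counter, and the microsecond conversion saturates at `i64::MAX` instead of `u64::MAX`. The same saturating pattern in isolation:

```rust
use std::time::Duration;

// as_micros() returns a u128; clamp to i64::MAX rather than
// panic when the value does not fit into the counter type.
fn to_micros_saturating(duration: Duration) -> i64 {
    duration.as_micros().try_into().unwrap_or(i64::MAX)
}

fn main() {
    assert_eq!(to_micros_saturating(Duration::from_millis(5)), 5_000);
    assert_eq!(to_micros_saturating(Duration::MAX), i64::MAX);
}
```
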
33 changes: 0 additions & 33 deletions relay-server/src/services/store.rs
@@ -90,15 +90,11 @@ pub struct StoreMetrics {
pub retention: u16,
}

#[derive(Debug)]
pub struct StoreCogs(pub Vec<u8>);

/// Service interface for the [`StoreEnvelope`] message.
#[derive(Debug)]
pub enum Store {
Envelope(StoreEnvelope),
Metrics(StoreMetrics),
Cogs(StoreCogs),
}

impl Store {
@@ -107,7 +103,6 @@
match self {
Store::Envelope(_) => "envelope",
Store::Metrics(_) => "metrics",
Store::Cogs(_) => "cogs",
}
}
}
@@ -130,14 +125,6 @@
}
}

impl FromMessage<StoreCogs> for Store {
type Response = NoResponse;

fn from_message(message: StoreCogs, _: ()) -> Self {
Self::Cogs(message)
}
}

/// Service implementing the [`Store`] interface.
pub struct StoreService {
workers: WorkerGroup,
@@ -173,7 +160,6 @@
match message {
Store::Envelope(message) => self.handle_store_envelope(message),
Store::Metrics(message) => self.handle_store_metrics(message),
Store::Cogs(message) => self.handle_store_cogs(message),
}
})
}
@@ -444,16 +430,6 @@
}
}

fn handle_store_cogs(&self, StoreCogs(payload): StoreCogs) {
let message = KafkaMessage::Cogs(CogsKafkaMessage(payload));
if let Err(error) = self.produce(KafkaTopic::Cogs, message) {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to store cogs measurement"
);
}
}

fn create_metric_message<'a>(
&self,
organization_id: u64,
@@ -1449,10 +1425,6 @@ struct ProfileChunkKafkaMessage {
payload: Bytes,
}

#[derive(Debug, Serialize)]
#[serde(transparent)]
struct CogsKafkaMessage(Vec<u8>);

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
@@ -1479,7 +1451,6 @@ enum KafkaMessage<'a> {
message: SpanKafkaMessage<'a>,
},
MetricsSummary(MetricsSummaryKafkaMessage<'a>),
Cogs(CogsKafkaMessage),
ProfileChunk(ProfileChunkKafkaMessage),
}

@@ -1505,7 +1476,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span { .. } => "span",
KafkaMessage::MetricsSummary(_) => "metrics_summary",
KafkaMessage::Cogs(_) => "cogs",
KafkaMessage::ProfileChunk(_) => "profile_chunk",
}
}
@@ -1530,7 +1500,6 @@
| Self::Span { .. }
| Self::ReplayRecordingNotChunked(_)
| Self::MetricsSummary(_)
| Self::Cogs(_)
| Self::ProfileChunk(_) => Uuid::nil(),

// TODO(ja): Determine a partitioning key
@@ -1584,8 +1553,6 @@
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),

KafkaMessage::Cogs(CogsKafkaMessage(payload)) => Ok(payload.into()),

_ => rmp_serde::to_vec_named(&self)
.map(Cow::Owned)
.map_err(ClientError::InvalidMsgPack),
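
The removed `CogsKafkaMessage` was a `#[serde(transparent)]` newtype whose bytes were special-cased in `serialize` to pass through as-is rather than being MessagePack-encoded. For reference, a standalone sketch of how a transparent newtype serializes, using serde_json purely for illustration:

```rust
use serde::Serialize;

// #[serde(transparent)] makes the wrapper serialize exactly like
// its single inner field.
#[derive(Debug, Serialize)]
#[serde(transparent)]
struct RawPayload(Vec<u8>);

fn main() {
    let raw = RawPayload(vec![1, 2, 3]);
    // Prints: [1,2,3] (no enclosing struct or field name).
    println!("{}", serde_json::to_string(&raw).unwrap());
}
```
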
1 change: 0 additions & 1 deletion tests/integration/conftest.py
@@ -36,7 +36,6 @@
spans_consumer,
profiles_consumer,
metrics_summaries_consumer,
cogs_consumer,
feedback_consumer,
)
