Skip to content

Commit

Permalink
ref(metrics): Remodel metric submission flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 26, 2024
1 parent f6e4940 commit 8c70b12
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 557 deletions.
10 changes: 5 additions & 5 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use serde::Deserialize;
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup};
use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope};
use crate::services::processor::{MetricData, ProcessMetricMeta, ProcessingGroup};
use crate::services::project_cache::{CheckEnvelope, ProcessMetrics, ValidateEnvelope};
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError};

Expand Down Expand Up @@ -273,9 +273,9 @@ fn queue_envelope(

if !metric_items.is_empty() {
relay_log::trace!("sending metrics into processing queue");
state.processor().send(ProcessMetrics {
items: metric_items.into_vec(),
start_time: envelope.meta().start_time(),
state.project_cache().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
start_time: envelope.meta().start_time().into(),
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
source: envelope.meta().into(),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ impl ServiceState {
test_store: test_store.clone(),
#[cfg(feature = "processing")]
store_forwarder: store.clone(),
aggregator: aggregator.clone(),
},
metric_outcomes.clone(),
)
Expand All @@ -259,7 +260,6 @@ impl ServiceState {
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
metric_outcomes,
redis_pool.clone(),
)
.spawn_handler(project_cache_rx);
Expand Down
Loading

0 comments on commit 8c70b12

Please sign in to comment.