Skip to content

Commit

Permalink
deploy: b0c0e43
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 18, 2024
1 parent 59d6df6 commit 79b2c78
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 149 deletions.
2 changes: 1 addition & 1 deletion relay_monitors/fn.process_check_in.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta name="generator" content="rustdoc"><meta name="description" content="Normalizes a monitor check-in payload."><title>process_check_in in relay_monitors - Rust</title><script>if(window.location.protocol!=="file:")document.head.insertAdjacentHTML("beforeend","SourceSerif4-Regular-46f98efaafac5295.ttf.woff2,FiraSans-Regular-018c141bf0843ffd.woff2,FiraSans-Medium-8f9a781e4970d388.woff2,SourceCodePro-Regular-562dcc5011b6de7d.ttf.woff2,SourceCodePro-Semibold-d899c5a5c4aeb14a.ttf.woff2".split(",").map(f=>`<link rel="preload" as="font" type="font/woff2" crossorigin href="../static.files/${f}">`).join(""))</script><link rel="stylesheet" href="../static.files/normalize-76eba96aa4d2e634.css"><link rel="stylesheet" href="../static.files/rustdoc-c5d6553a23f1e5a6.css"><meta name="rustdoc-vars" data-root-path="../" data-static-root-path="../static.files/" data-current-crate="relay_monitors" data-themes="" data-resource-suffix="" data-rustdoc-version="1.81.0 (eeb90cda1 2024-09-04)" data-channel="1.81.0" data-search-js="search-d234aafac6c221dd.js" data-settings-js="settings-4313503d2e1961c2.js" ><script src="../static.files/storage-118b08c4c78b968e.js"></script><script defer src="sidebar-items.js"></script><script defer src="../static.files/main-d2fab2bf619172d3.js"></script><noscript><link rel="stylesheet" href="../static.files/noscript-df360f571f6edeae.css"></noscript><link rel="icon" href="https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png"></head><body class="rustdoc fn"><!--[if lte IE 11]><div class="warning">This old browser is unsupported and will most likely display funky things.</div><![endif]--><nav class="mobile-topbar"><button class="sidebar-menu-toggle" title="show sidebar"></button><a class="logo-container" href="../relay_monitors/index.html"><img src="https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" alt=""></a></nav><nav class="sidebar"><div class="sidebar-crate"><a class="logo-container" href="../relay_monitors/index.html"><img src="https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" alt="logo"></a><h2><a href="../relay_monitors/index.html">relay_monitors</a><span class="version">24.9.0</span></h2></div><div class="sidebar-elems"></div></nav><div class="sidebar-resizer"></div><main><div class="width-limiter"><rustdoc-search></rustdoc-search><section id="main-content" class="content"><div class="main-heading"><h1>Function <a href="index.html">relay_monitors</a>::<wbr><a class="fn" href="#">process_check_in</a><button id="copy-path" title="Copy item path to clipboard">Copy item path</button></h1><span class="out-of-band"><a class="src" href="../src/relay_monitors/lib.rs.html#187-228">source</a> · <button id="toggle-all-docs" title="collapse all docs">[<span>&#x2212;</span>]</button></span></div><pre class="rust item-decl"><code>pub fn process_check_in(
payload: &amp;[<a class="primitive" href="https://doc.rust-lang.org/1.81.0/std/primitive.u8.html">u8</a>],
project_id: ProjectId,
project_id: <a class="struct" href="../relay_base_schema/project/struct.ProjectId.html" title="struct relay_base_schema::project::ProjectId">ProjectId</a>,
) -&gt; <a class="enum" href="https://doc.rust-lang.org/1.81.0/core/result/enum.Result.html" title="enum core::result::Result">Result</a>&lt;<a class="struct" href="struct.ProcessedCheckInResult.html" title="struct relay_monitors::ProcessedCheckInResult">ProcessedCheckInResult</a>, <a class="enum" href="enum.ProcessCheckInError.html" title="enum relay_monitors::ProcessCheckInError">ProcessCheckInError</a>&gt;</code></pre><details class="toggle top-doc" open><summary class="hideme"><span>Expand description</span></summary><div class="docblock"><p>Normalizes a monitor check-in payload.</p>
</div></details></section></div></main></body></html>
12 changes: 0 additions & 12 deletions src/relay_server/service.rs.html
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,8 @@
<a href="#439" id="439">439</a>
<a href="#440" id="440">440</a>
<a href="#441" id="441">441</a>
<a href="#442" id="442">442</a>
<a href="#443" id="443">443</a>
<a href="#444" id="444">444</a>
<a href="#445" id="445">445</a>
<a href="#446" id="446">446</a>
<a href="#447" id="447">447</a>
</pre></div><pre class="rust"><code><span class="kw">use </span>std::convert::Infallible;
<span class="kw">use </span>std::fmt;
<span class="kw">use </span>std::sync::atomic::AtomicBool;
<span class="kw">use </span>std::sync::Arc;
<span class="kw">use </span>std::time::Duration;

Expand Down Expand Up @@ -689,9 +682,6 @@
)
.spawn_handler(processor_rx);

<span class="comment">// We initialize a shared boolean that is used to manage backpressure between the
// EnvelopeBufferService and the ProjectCacheService.
</span><span class="kw">let </span>project_cache_ready = Arc::new(AtomicBool::new(<span class="bool-val">true</span>));
<span class="kw">let </span>envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand All @@ -701,7 +691,6 @@
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
project_cache_ready.clone(),
)
.map(|b| b.start_observable());

Expand All @@ -724,7 +713,6 @@
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
project_cache_ready,
)
.spawn_handler(project_cache_rx);

Expand Down
110 changes: 2 additions & 108 deletions src/relay_server/services/buffer/mod.rs.html
Original file line number Diff line number Diff line change
Expand Up @@ -624,59 +624,6 @@
<a href="#624" id="624">624</a>
<a href="#625" id="625">625</a>
<a href="#626" id="626">626</a>
<a href="#627" id="627">627</a>
<a href="#628" id="628">628</a>
<a href="#629" id="629">629</a>
<a href="#630" id="630">630</a>
<a href="#631" id="631">631</a>
<a href="#632" id="632">632</a>
<a href="#633" id="633">633</a>
<a href="#634" id="634">634</a>
<a href="#635" id="635">635</a>
<a href="#636" id="636">636</a>
<a href="#637" id="637">637</a>
<a href="#638" id="638">638</a>
<a href="#639" id="639">639</a>
<a href="#640" id="640">640</a>
<a href="#641" id="641">641</a>
<a href="#642" id="642">642</a>
<a href="#643" id="643">643</a>
<a href="#644" id="644">644</a>
<a href="#645" id="645">645</a>
<a href="#646" id="646">646</a>
<a href="#647" id="647">647</a>
<a href="#648" id="648">648</a>
<a href="#649" id="649">649</a>
<a href="#650" id="650">650</a>
<a href="#651" id="651">651</a>
<a href="#652" id="652">652</a>
<a href="#653" id="653">653</a>
<a href="#654" id="654">654</a>
<a href="#655" id="655">655</a>
<a href="#656" id="656">656</a>
<a href="#657" id="657">657</a>
<a href="#658" id="658">658</a>
<a href="#659" id="659">659</a>
<a href="#660" id="660">660</a>
<a href="#661" id="661">661</a>
<a href="#662" id="662">662</a>
<a href="#663" id="663">663</a>
<a href="#664" id="664">664</a>
<a href="#665" id="665">665</a>
<a href="#666" id="666">666</a>
<a href="#667" id="667">667</a>
<a href="#668" id="668">668</a>
<a href="#669" id="669">669</a>
<a href="#670" id="670">670</a>
<a href="#671" id="671">671</a>
<a href="#672" id="672">672</a>
<a href="#673" id="673">673</a>
<a href="#674" id="674">674</a>
<a href="#675" id="675">675</a>
<a href="#676" id="676">676</a>
<a href="#677" id="677">677</a>
<a href="#678" id="678">678</a>
<a href="#679" id="679">679</a>
</pre></div><pre class="rust"><code><span class="doccomment">//! Types for buffering envelopes.

</span><span class="kw">use </span>std::error::Error;
Expand Down Expand Up @@ -789,7 +736,6 @@
services: Services,
has_capacity: Arc&lt;AtomicBool&gt;,
sleep: Duration,
project_cache_ready: Arc&lt;AtomicBool&gt;,
}

<span class="doccomment">/// The maximum amount of time between evaluations of dequeue conditions.
Expand All @@ -808,7 +754,6 @@
memory_checker: MemoryChecker,
global_config_rx: watch::Receiver&lt;global_config::Status&gt;,
services: Services,
project_cache_ready: Arc&lt;AtomicBool&gt;,
) -&gt; <span class="prelude-ty">Option</span>&lt;<span class="self">Self</span>&gt; {
config.spool_v2().then(|| <span class="self">Self </span>{
config,
Expand All @@ -817,7 +762,6 @@
services,
has_capacity: Arc::new(AtomicBool::new(<span class="bool-val">true</span>)),
sleep: Duration::ZERO,
project_cache_ready,
})
}

Expand Down Expand Up @@ -851,12 +795,6 @@
tokio::time::sleep(<span class="self">self</span>.sleep).<span class="kw">await</span>;
}

<span class="comment">// In case the project cache is not ready, we defer popping to first try and handle incoming
// messages and only come back to this in case within the timeout no data was received.
</span><span class="kw">while </span>!<span class="self">self</span>.project_cache_ready.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(<span class="number">10</span>)).<span class="kw">await</span>;
}

<span class="macro">relay_statsd::metric!</span>(
counter(RelayCounters::BufferReadyToPop) += <span class="number">1</span>,
status = <span class="string">"slept"
Expand Down Expand Up @@ -922,11 +860,7 @@
.pop()
.<span class="kw">await</span><span class="question-mark">?
</span>.expect(<span class="string">"Element disappeared despite exclusive excess"</span>);
<span class="comment">// We assume that the project cache is now busy to process this envelope, so we flip
// the boolean flag, which will prioritize writes.
</span><span class="self">self</span>.project_cache_ready.store(<span class="bool-val">false</span>, Ordering::SeqCst);
<span class="self">self</span>.services.project_cache.send(DequeuedEnvelope(envelope));

<span class="self">self</span>.sleep = Duration::ZERO; <span class="comment">// try next pop immediately
</span>}
Peek::NotReady(stack_key, envelope) =&gt; {
Expand Down Expand Up @@ -1131,7 +1065,6 @@
watch::Sender&lt;global_config::Status&gt;,
mpsc::UnboundedReceiver&lt;ProjectCache&gt;,
mpsc::UnboundedReceiver&lt;TrackOutcome&gt;,
Arc&lt;AtomicBool&gt;,
) {
<span class="kw">let </span>config = Arc::new(
Config::from_json_value(<span class="macro">serde_json::json!</span>({
Expand All @@ -1147,7 +1080,6 @@
<span class="kw">let </span>(global_tx, global_rx) = watch::channel(global_config::Status::Pending);
<span class="kw">let </span>(project_cache, project_cache_rx) = Addr::custom();
<span class="kw">let </span>(outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
<span class="kw">let </span>project_cache_ready = Arc::new(AtomicBool::new(<span class="bool-val">true</span>));
(
EnvelopeBufferService::new(
config,
Expand All @@ -1158,20 +1090,18 @@
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready.clone(),
)
.unwrap(),
global_tx,
project_cache_rx,
outcome_aggregator_rx,
project_cache_ready,
)
}

<span class="attr">#[tokio::test]
</span><span class="kw">async fn </span>capacity_is_updated() {
tokio::time::pause();
<span class="kw">let </span>(service, _global_rx, _project_cache_tx, <span class="kw">_</span>, <span class="kw">_</span>) = buffer_service();
<span class="kw">let </span>(service, _global_rx, _project_cache_tx, <span class="kw">_</span>) = buffer_service();

<span class="comment">// Set capacity to false:
</span>service.has_capacity.store(<span class="bool-val">false</span>, Ordering::Relaxed);
Expand All @@ -1193,7 +1123,7 @@
<span class="attr">#[tokio::test]
</span><span class="kw">async fn </span>pop_requires_global_config() {
tokio::time::pause();
<span class="kw">let </span>(service, global_tx, project_cache_rx, <span class="kw">_</span>, <span class="kw">_</span>) = buffer_service();
<span class="kw">let </span>(service, global_tx, project_cache_rx, <span class="kw">_</span>) = buffer_service();

<span class="kw">let </span>addr = service.start();

Expand Down Expand Up @@ -1242,7 +1172,6 @@
)));

<span class="kw">let </span>(project_cache, project_cache_rx) = Addr::custom();
<span class="kw">let </span>project_cache_ready = Arc::new(AtomicBool::new(<span class="bool-val">true</span>));
<span class="kw">let </span>service = EnvelopeBufferService::new(
config,
memory_checker,
Expand All @@ -1252,7 +1181,6 @@
outcome_aggregator: Addr::dummy(),
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();
<span class="kw">let </span>addr = service.start();
Expand Down Expand Up @@ -1288,7 +1216,6 @@
<span class="kw">let </span>(global_tx, global_rx) = watch::channel(global_config::Status::Pending);
<span class="kw">let </span>(project_cache, project_cache_rx) = Addr::custom();
<span class="kw">let </span>(outcome_aggregator, <span class="kw-2">mut </span>outcome_aggregator_rx) = Addr::custom();
<span class="kw">let </span>project_cache_ready = Arc::new(AtomicBool::new(<span class="bool-val">true</span>));
<span class="kw">let </span>service = EnvelopeBufferService::new(
config,
memory_checker,
Expand All @@ -1298,7 +1225,6 @@
outcome_aggregator,
test_store: Addr::dummy(),
},
project_cache_ready,
)
.unwrap();

Expand All @@ -1323,37 +1249,5 @@
<span class="macro">assert_eq!</span>(outcome.category, DataCategory::TransactionIndexed);
<span class="macro">assert_eq!</span>(outcome.quantity, <span class="number">1</span>);
}

<span class="attr">#[tokio::test]
</span><span class="kw">async fn </span>output_is_throttled() {
tokio::time::pause();
<span class="kw">let </span>(service, global_tx, <span class="kw-2">mut </span>project_cache_rx, <span class="kw">_</span>, <span class="kw">_</span>) = buffer_service();
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

<span class="kw">let </span>addr = service.start();

<span class="comment">// Send five messages:
</span><span class="kw">let </span>envelope = new_envelope(<span class="bool-val">false</span>, <span class="string">"foo"</span>);
<span class="kw">let </span>project_key = envelope.meta().public_key();
<span class="kw">for _ in </span><span class="number">0</span>..<span class="number">5 </span>{
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
addr.send(EnvelopeBuffer::Ready(project_key));

tokio::time::sleep(Duration::from_millis(<span class="number">100</span>)).<span class="kw">await</span>;

<span class="kw">let </span><span class="kw-2">mut </span>messages = <span class="macro">vec!</span>[];
project_cache_rx.recv_many(<span class="kw-2">&amp;mut </span>messages, <span class="number">100</span>).<span class="kw">await</span>;

<span class="macro">assert_eq!</span>(
messages
.iter()
.filter(|message| <span class="macro">matches!</span>(message, ProjectCache::HandleDequeuedEnvelope(..)))
.count(),
<span class="number">1
</span>);
}
}
</code></pre></div></section></main></body></html>
Loading

0 comments on commit 79b2c78

Please sign in to comment.