Skip to content

Commit

Permalink
replace all uses
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 24, 2024
1 parent 0c3b964 commit 8abf61f
Show file tree
Hide file tree
Showing 22 changed files with 43 additions and 41 deletions.
2 changes: 1 addition & 1 deletion relay-server/src/services/cogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl GlobalConfigService {
let upstream_relay = self.upstream.clone();
let internal_tx = self.internal_tx.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
let query = GetGlobalConfig::new();
let res = upstream_relay.send(SendQuery(query)).await;
Expand Down Expand Up @@ -360,7 +360,7 @@ impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut shutdown_handle = Controller::shutdown_handle();

relay_log::info!("global config service starting");
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Service for HealthCheckService {
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
relay_system::spawn(async move {
let shutdown = Controller::shutdown_handle();

while shutdown.get().is_none() {
Expand All @@ -215,7 +215,7 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

tokio::spawn(async move {
relay_system::spawn(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();

Expand Down Expand Up @@ -331,7 +331,7 @@ mod tests {
type Interface = TestInterface;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
relay_log::debug!(?buckets, "received buckets");
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/metrics/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Service for RouterService {
type Interface = Aggregator;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut router = StartedRouter::start(self);
relay_log::info!("metrics router started");

Expand Down Expand Up @@ -117,7 +117,7 @@ impl StartedRouter {
.chain(Some(self.default.send(AcceptsMetrics)))
.collect::<FuturesUnordered<_>>();

tokio::spawn(async move {
relay_system::spawn(async move {
let mut accepts = true;
while let Some(req) = requests.next().await {
accepts &= req.unwrap_or_default();
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ impl HttpOutcomeProducer {

let upstream_relay = self.upstream_relay.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
match upstream_relay.send(SendQuery(request)).await {
Ok(_) => relay_log::trace!("outcome batch sent"),
Err(error) => {
Expand All @@ -683,7 +683,7 @@ impl Service for HttpOutcomeProducer {
type Interface = TrackRawOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
loop {
tokio::select! {
// Prioritize flush over receiving messages to prevent starving.
Expand Down Expand Up @@ -776,7 +776,7 @@ impl Service for ClientReportOutcomeProducer {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
loop {
tokio::select! {
// Prioritize flush over receiving messages to prevent starving.
Expand Down Expand Up @@ -1037,7 +1037,7 @@ impl Service for OutcomeProducerService {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let Self { config, inner } = self;

tokio::spawn(async move {
relay_system::spawn(async move {
let broker = inner.start();

relay_log::info!("OutcomeProducer started.");
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/outcome_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Service for OutcomeAggregator {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut shutdown = Controller::shutdown_handle();
relay_log::info!("outcome aggregator started");

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,7 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
while let Some(message) = rx.recv().await {
let service = self.clone();
self.inner
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ impl ProjectCacheBroker {
let source = self.source.clone();
let sender = self.state_tx.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
// Wait on the new attempt time when set.
if let Some(next_attempt) = next_attempt {
tokio::time::sleep_until(next_attempt).await;
Expand Down Expand Up @@ -1148,7 +1148,7 @@ impl Service for ProjectCacheService {
let outcome_aggregator = services.outcome_aggregator.clone();
let test_store = services.test_store.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
let mut ticker = tokio::time::interval(config.cache_eviction_interval());
relay_log::info!("project cache started");

Expand Down Expand Up @@ -1429,7 +1429,7 @@ mod tests {
}

// Emulate the project cache service loop.
tokio::task::spawn(async move {
relay_system::spawn(async move {
loop {
select! {

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/project_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn spawn_poll_local_states(
poll_local_states(&project_path, &tx).await;

// Start a background loop that polls periodically:
tokio::spawn(async move {
relay_system::spawn(async move {
// To avoid running two load tasks simultaneously at startup, we delay the interval by one period:
let start_at = Instant::now() + period;
let mut ticker = tokio::time::interval_at(start_at, period);
Expand All @@ -166,7 +166,7 @@ impl Service for LocalProjectSourceService {
// collect the result, the producer will block, which is acceptable.
let (state_tx, mut state_rx) = mpsc::channel(1);

tokio::spawn(async move {
relay_system::spawn(async move {
relay_log::info!("project local cache started");

// Start the background task that periodically reloads projects from disk:
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ impl UpstreamProjectSourceService {
let channels = self.prepare_batches();
let upstream_relay = self.upstream_relay.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
let responses = Self::fetch_states(config, upstream_relay, channels).await;
// Send back all resolved responses and also unused channels.
// These responses will be handled by `handle_responses` function.
Expand Down Expand Up @@ -532,7 +532,7 @@ impl Service for UpstreamProjectSourceService {
type Interface = UpstreamProjectSource;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
relay_log::info!("project upstream cache started");
loop {
tokio::select! {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/relays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl RelayCacheService {

let fetch_tx = self.fetch_tx();
let upstream_relay = self.upstream_relay.clone();
tokio::spawn(async move {
relay_system::spawn(async move {
let request = GetRelays {
relay_ids: channels.keys().cloned().collect(),
};
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Service for RelayCacheService {
type Interface = RelayCache;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
relay_log::info!("key cache started");

loop {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ impl Service for HttpServer {
relay_log::info!("spawning http server");
relay_log::info!(" listening on http://{}/", config.listen_addr());
relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
tokio::spawn(server.serve(app));
relay_system::spawn(server.serve(app));
}
Err(err) => {
relay_log::error!("Failed to start the HTTP server: {err}");
std::process::exit(1);
}
}

tokio::spawn(async move {
relay_system::spawn(async move {
let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
relay_log::info!("Shutting down HTTP server");

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ impl BufferService {
BufferState::Disk(ref disk) => {
let db = disk.db.clone();
let project_cache = self.services.project_cache.clone();
tokio::spawn(async move {
relay_system::spawn(async move {
match OnDisk::get_spooled_index(&db).await {
Ok(index) => {
relay_log::trace!(
Expand Down Expand Up @@ -1255,7 +1255,7 @@ impl Service for BufferService {
type Interface = Buffer;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
Expand Down Expand Up @@ -1574,7 +1574,7 @@ mod tests {
type Interface = TestHealth;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
loop {
tokio::select! {
Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl Service for RelayStats {
return;
};

tokio::spawn(async move {
relay_system::spawn(async move {
loop {
let _ = tokio::join!(
self.upstream_status(),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ impl Service for StoreService {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let this = Arc::new(self);

tokio::spawn(async move {
relay_system::spawn(async move {
relay_log::info!("store forwarder started");

while let Some(message) = rx.recv().await {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/test_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl relay_system::Service for TestStoreService {
type Interface = TestStore;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_message(message);
}
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ impl ConnectionMonitor {
// Only take action if we exceeded the grace period.
if first_error + self.client.config.http_outage_grace_period() <= now {
let return_tx = return_tx.clone();
let task = tokio::spawn(Self::connect(self.client.clone(), return_tx));
let task = relay_system::spawn(Self::connect(self.client.clone(), return_tx));
self.state = ConnectionState::Reconnecting(task);
}
}
Expand Down Expand Up @@ -1429,7 +1429,7 @@ impl UpstreamBroker {
let client = self.client.clone();
let action_tx = self.action_tx.clone();

tokio::spawn(async move {
relay_system::spawn(async move {
let send_start = Instant::now();
let result = client.send(entry.request.as_mut()).await;
emit_response_metrics(send_start, &entry, &result);
Expand Down Expand Up @@ -1512,7 +1512,7 @@ impl Service for UpstreamRelayService {
state: AuthState::Unknown,
tx: action_tx.clone(),
};
tokio::spawn(auth.run());
relay_system::spawn(auth.run());

// Main broker that serializes public and internal messages, as well as maintains connection
// and authentication state.
Expand All @@ -1525,7 +1525,7 @@ impl Service for UpstreamRelayService {
action_tx,
};

tokio::spawn(async move {
relay_system::spawn(async move {
loop {
tokio::select! {
biased;
Expand Down
5 changes: 3 additions & 2 deletions relay-system/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl ShutdownHandle {
/// type Interface = ();
///
/// fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
/// tokio::spawn(async move {
/// relay_system::spawn(async move {
/// let mut shutdown = Controller::shutdown_handle();
///
/// loop {
Expand Down Expand Up @@ -166,8 +166,9 @@ pub struct Controller;

impl Controller {
/// Starts a controller that monitors shutdown signals.
#[track_caller]
pub fn start(shutdown_timeout: Duration) {
tokio::spawn(monitor_shutdown(shutdown_timeout));
crate::spawn(monitor_shutdown(shutdown_timeout));
}

/// Manually initiates the shutdown process of the system.
Expand Down
1 change: 1 addition & 0 deletions relay-system/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::statsd::{SystemCounters, SystemTimers};
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
#[track_caller]
#[allow(clippy::disallowed_methods)]
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
/// type Interface = MyMessage;
///
/// fn spawn_handler(self, mut rx: Receiver<Self::Interface>) {
/// tokio::spawn(async move {
/// relay_system::spawn(async move {
/// while let Some(message) = rx.recv().await {
/// // handle the message
/// }
Expand Down Expand Up @@ -1047,7 +1047,7 @@ mod tests {
type Interface = MockMessage;

fn spawn_handler(self, mut rx: Receiver<Self::Interface>) {
tokio::spawn(async move {
crate::spawn(async move {
while rx.recv().await.is_some() {
tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
}
Expand Down
2 changes: 1 addition & 1 deletion relay-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
{
let (addr, mut rx) = channel(name);

let handle = tokio::spawn(async move {
let handle = relay_system::spawn(async move {
while let Some(msg) = rx.recv().await {
f(&mut state, msg);
}
Expand Down

0 comments on commit 8abf61f

Please sign in to comment.