Skip to content

Commit

Permalink
Increase the accuracy of time measurements by replacing
Browse files Browse the repository at this point in the history
most uses of `quanta::Instant` with `std::time::Instant`
  • Loading branch information
tatsuya6502 committed Jan 5, 2025
1 parent cdfad26 commit e681c1f
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 673 deletions.
7 changes: 0 additions & 7 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ pub(crate) mod entry_info;
#[cfg(feature = "sync")]
pub(crate) mod housekeeper;

#[cfg_attr(feature = "quanta", path = "concurrent/atomic_time/atomic_time.rs")]
#[cfg_attr(
not(feature = "quanta"),
path = "concurrent/atomic_time/atomic_time_compat.rs"
)]
pub(crate) mod atomic_time;

#[cfg(feature = "unstable-debug-counters")]
pub(crate) mod debug_counters;

Expand Down
61 changes: 0 additions & 61 deletions src/common/concurrent/atomic_time/atomic_time.rs

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/concurrent/atomic_time/atomic_time_compat.rs

This file was deleted.

15 changes: 5 additions & 10 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::atomic::{self, AtomicBool, AtomicU16, AtomicU32, Ordering};

use super::{AccessTime, KeyHash};
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};
use crate::common::time::{AtomicInstant, Instant};

#[derive(Debug)]
pub(crate) struct EntryInfo<K> {
Expand Down Expand Up @@ -191,7 +191,6 @@ mod test {
// e.g. "1.64"
let ver =
option_env!("RUSTC_SEMVER").expect("RUSTC_SEMVER env var was not set at compile time");
let is_quanta_enabled = cfg!(feature = "quanta");
let arch = if cfg!(target_os = "linux") {
if cfg!(target_pointer_width = "64") {
Linux64
Expand All @@ -214,14 +213,10 @@ mod test {
panic!("Unsupported target architecture");
};

let expected_sizes = match (arch, is_quanta_enabled) {
(Linux64 | Linux32Arm | Linux32Mips, true) => vec![("1.51", 56)],
(Linux32X86, true) => vec![("1.51", 48)],
(MacOS64, true) => vec![("1.62", 56)],
(Linux64, false) => vec![("1.66", 104), ("1.60", 128)],
(Linux32X86, false) => unimplemented!(),
(Linux32Arm | Linux32Mips, false) => vec![("1.66", 104), ("1.62", 128), ("1.60", 80)],
(MacOS64, false) => vec![("1.62", 104)],
let expected_sizes = match arch {
Linux64 | Linux32Arm | Linux32Mips => vec![("1.51", 56)],
Linux32X86 => vec![("1.51", 48)],
MacOS64 => vec![("1.62", 56)],
};

let mut expected = None;
Expand Down
24 changes: 9 additions & 15 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use super::constants::LOG_SYNC_INTERVAL_MILLIS;

use super::{
atomic_time::AtomicInstant,
constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT},
};
use crate::common::time::{CheckedTimeOps, Instant};
use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};
use crate::common::time::{AtomicInstant, Instant};
use crate::common::HousekeeperConfig;

use parking_lot::{Mutex, MutexGuard};
Expand Down Expand Up @@ -52,7 +49,11 @@ pub(crate) struct Housekeeper {
}

impl Housekeeper {
pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self {
pub(crate) fn new(
is_eviction_listener_enabled: bool,
config: HousekeeperConfig,
now: Instant,
) -> Self {
let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled {
(
Some(AtomicBool::new(false)),
Expand All @@ -64,7 +65,7 @@ impl Housekeeper {

Self {
run_lock: Mutex::default(),
run_after: AtomicInstant::new(Self::sync_after(Instant::now())),
run_after: AtomicInstant::new(Self::sync_after(now)),
more_entries_to_evict,
maintenance_task_timeout,
max_log_sync_repeats: config.max_log_sync_repeats,
Expand Down Expand Up @@ -127,10 +128,7 @@ impl Housekeeper {

fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is current wall clock time, this should never fail at
// least next millions of years.
ts.expect("Timestamp overflow")
now.saturating_add(dur)
}
}

Expand All @@ -139,8 +137,4 @@ impl Housekeeper {
pub(crate) fn disable_auto_run(&self) {
self.auto_run_enabled.store(false, Ordering::Relaxed);
}

pub(crate) fn reset_run_after(&self, now: Instant) {
self.run_after.set_instant(Self::sync_after(now));
}
}
53 changes: 5 additions & 48 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,10 @@
use std::time::Duration;

#[cfg_attr(feature = "quanta", path = "time/clock_quanta.rs")]
#[cfg_attr(not(feature = "quanta"), path = "time/clock_compat.rs")]
pub(crate) mod clock;
mod atomic_time;
mod clock;
mod instant;

pub(crate) use atomic_time::AtomicInstant;
pub(crate) use clock::Clock;
pub(crate) use instant::Instant;

#[cfg(test)]
pub(crate) use clock::Mock;

/// a wrapper type over Instant to force checked additions and prevent
/// unintentional overflow. The type preserve the Copy semantics for the wrapped
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub(crate) struct Instant(clock::Instant);

pub(crate) trait CheckedTimeOps {
fn checked_add(&self, duration: Duration) -> Option<Self>
where
Self: Sized;

fn checked_duration_since(&self, earlier: Self) -> Option<Duration>
where
Self: Sized;
}

impl Instant {
pub(crate) fn new(instant: clock::Instant) -> Instant {
Instant(instant)
}

pub(crate) fn now() -> Instant {
Instant(clock::Instant::now())
}

#[cfg(feature = "quanta")]
pub(crate) fn inner_clock(self) -> clock::Instant {
self.0
}
}

impl CheckedTimeOps for Instant {
fn checked_add(&self, duration: Duration) -> Option<Instant> {
self.0.checked_add(duration).map(Instant)
}

fn checked_duration_since(&self, earlier: Self) -> Option<Duration>
where
Self: Sized,
{
self.0.checked_duration_since(earlier.0)
}
}
62 changes: 62 additions & 0 deletions src/common/time/atomic_time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::common::time::Instant;

use portable_atomic::AtomicU64;
use std::sync::atomic::Ordering;

/// `AtomicInstant` is a wrapper around `AtomicU64` that provides thread-safe access
/// to an `Instant`.
///
/// `u64::MAX` is used to represent an unset `Instant`.
#[derive(Debug)]
pub(crate) struct AtomicInstant {
instant: AtomicU64,
}

impl Default for AtomicInstant {
/// Creates a new `AtomicInstant` with an unset `Instant`.
fn default() -> Self {
Self {
instant: AtomicU64::new(u64::MAX),
}
}
}

impl AtomicInstant {
/// Creates a new `AtomicInstant` with the given `Instant`.
pub(crate) fn new(instant: Instant) -> Self {
// Ensure the `Instant` is not `u64::MAX`, which means unset.
debug_assert!(instant.as_nanos() != u64::MAX);

Self {
instant: AtomicU64::new(instant.as_nanos()),
}
}

/// Clears the `Instant`.
pub(crate) fn clear(&self) {
self.instant.store(u64::MAX, Ordering::Release);
}

/// Returns `true` if the `Instant` is set.
pub(crate) fn is_set(&self) -> bool {
self.instant.load(Ordering::Acquire) != u64::MAX
}

/// Returns the `Instant` if it is set, otherwise `None`.
pub(crate) fn instant(&self) -> Option<Instant> {
let ts = self.instant.load(Ordering::Acquire);
if ts == u64::MAX {
None
} else {
Some(Instant::from_nanos(ts))
}
}

/// Sets the `Instant`.
pub(crate) fn set_instant(&self, instant: Instant) {
// Ensure the `Instant` is not `u64::MAX`, which means unset.
debug_assert!(instant.as_nanos() != u64::MAX);

self.instant.store(instant.as_nanos(), Ordering::Release);
}
}
Loading

0 comments on commit e681c1f

Please sign in to comment.