--rate mode: fix latency stats skew on low rates
Worker threads used sleep() in the bounded rate mode to avoid CPU
hogging when polling the event queue. The problem was that the sleep()
calls increased the time events spent in the queue, which is counted
towards event latency to avoid the coordinated omission problem: the
lower the rate, the bigger the observed impact on latency.

The fix replaces the sleep() calls with a condition variable, which is
now used to put a worker into an idle wait when the queue is empty.
The event generation thread calls pthread_cond_signal() after each
generated event to wake up an idle worker thread. This somewhat
increases sysbench CPU consumption, especially at high rates.
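
The wait/signal pattern the commit adopts can be sketched as a
stand-alone producer/consumer pair. This is a minimal illustration
rather than sysbench code: a plain counter guarded by the mutex stands
in for the lock-free event ring, and the names queue_len, producer and
consumer are invented for the example. Compile with -pthread.

/* Minimal sketch of the idle-wait pattern; not sysbench code. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int queue_len; /* stand-in for the event ring */

static void *consumer(void *arg)
{
  (void) arg;

  for (int consumed = 0; consumed < 5; consumed++)
  {
    pthread_mutex_lock(&queue_mutex);
    /* Idle wait instead of a polling sleep: no sleep time is added to
       the interval between event generation and event pickup. */
    while (queue_len == 0)
      pthread_cond_wait(&queue_cond, &queue_mutex);
    queue_len--;
    pthread_mutex_unlock(&queue_mutex);

    printf("consumed event %d\n", consumed);
  }

  return NULL;
}

static void *producer(void *arg)
{
  (void) arg;

  for (int i = 0; i < 5; i++)
  {
    usleep(100000); /* stand-in for the exponential inter-arrival wait */

    pthread_mutex_lock(&queue_mutex);
    queue_len++;
    pthread_mutex_unlock(&queue_mutex);

    /* Wake one idle worker after each generated event, as the patch
       does with pthread_cond_signal() in eventgen_thread_proc(). */
    pthread_cond_signal(&queue_cond);
  }

  return NULL;
}

int main(void)
{
  pthread_t prod, cons;

  pthread_create(&cons, NULL, consumer, NULL);
  pthread_create(&prod, NULL, producer, NULL);
  pthread_join(prod, NULL);
  pthread_join(cons, NULL);

  return 0;
}

One difference from this textbook sketch: in the actual patch the
predicate is the lock-free dequeue attempt in the outer while loop, so
a worker re-checks the ring itself rather than a mutex-protected
counter after each wakeup.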
akopytov committed Dec 10, 2018
1 parent 49f1757 commit bb8c3b0
Showing 1 changed file with 41 additions and 11 deletions.
src/sysbench.c (52 changes: 41 additions & 11 deletions)
@@ -134,6 +134,8 @@ sb_test_t *current_test;
 static sb_barrier_t thread_start_barrier;
 
 /* structures to handle queue of events, needed for tx_rate mode */
+static pthread_mutex_t queue_mutex;
+static pthread_cond_t queue_cond;
 static uint64_t queue_array[MAX_QUEUE_LEN] CK_CC_CACHELINE;
 static ck_ring_buffer_t queue_ring_buffer[MAX_QUEUE_LEN] CK_CC_CACHELINE;
 static ck_ring_t queue_ring CK_CC_CACHELINE;
@@ -709,7 +711,23 @@ bool sb_more_events(int thread_id)

   while (!ck_ring_dequeue_spmc(&queue_ring, queue_ring_buffer, &ptr) &&
          !ck_pr_load_int(&queue_is_full))
-    usleep(500000.0 * sb_globals.threads / sb_globals.tx_rate);
+  {
+    pthread_mutex_lock(&queue_mutex);
+    pthread_cond_wait(&queue_cond, &queue_mutex);
+    pthread_mutex_unlock(&queue_mutex);
+
+    /* Re-check for global error and time limit after waiting */
+
+    if (sb_globals.error)
+      return false;
+
+    if (sb_globals.max_time_ns > 0 &&
+        SB_UNLIKELY(sb_timer_value(&sb_exec_timer) >= sb_globals.max_time_ns))
+    {
+      log_text(LOG_INFO, "Time limit exceeded, exiting...");
+      return false;
+    }
+  }
 
   if (ck_pr_load_int(&queue_is_full))
   {
@@ -835,13 +853,11 @@ static void *worker_thread(void *arg)

 static inline double sb_rand_exp(double lambda)
 {
-  return -1.0 / lambda * log(1 - sb_rand_uniform_double());
+  return -lambda * log(1 - sb_rand_uniform_double());
 }
 
 static void *eventgen_thread_proc(void *arg)
 {
-  int i;
-
   (void)arg; /* unused */
 
   sb_tls_thread_id = SB_BACKGROUND_THREAD_ID;
@@ -851,7 +867,12 @@ static void *eventgen_thread_proc(void *arg)

   ck_ring_init(&queue_ring, MAX_QUEUE_LEN);
 
-  i = 0;
+  if (pthread_mutex_init(&queue_mutex, NULL) ||
+      pthread_cond_init(&queue_cond, NULL))
+  {
+    sb_barrier_wait(&thread_start_barrier);
+    return NULL;
+  }
 
   log_text(LOG_DEBUG, "Event generating thread started");
 
@@ -865,24 +886,33 @@
     Get exponentially distributed time intervals in nanoseconds with Lambda =
     tx_rate. Alternatively, we can use Lambda = tx_rate / 1e9
   */
-  double lambda = sb_globals.tx_rate / 1e9;
+  const double lambda = 1e9 / sb_globals.tx_rate;
 
   uint64_t curr_ns = sb_timer_value(&sb_exec_timer);
   uint64_t intr_ns = sb_rand_exp(lambda);
-  uint64_t next_ns = curr_ns + intr_ns;;
+  uint64_t next_ns = curr_ns + intr_ns;
 
-  for (;;)
+  for (int i = 0; ; i = (i+1) % MAX_QUEUE_LEN)
   {
     curr_ns = sb_timer_value(&sb_exec_timer);
     intr_ns = sb_rand_exp(lambda);
     next_ns += intr_ns;
 
+    if (sb_globals.max_time_ns > 0 &&
+        SB_UNLIKELY(curr_ns >= sb_globals.max_time_ns))
+    {
+      /* Wake all waiting threads */
+      pthread_cond_broadcast(&queue_cond);
+      return NULL;
+    }
 
     if (next_ns > curr_ns)
       sb_nanosleep(next_ns - curr_ns);
 
+    /* Enqueue a new event */
+    queue_array[i] = sb_timer_value(&sb_exec_timer);
     if (ck_ring_enqueue_spmc(&queue_ring, queue_ring_buffer,
-                             &queue_array[i++]) == false)
+                             &queue_array[i]) == false)
     {
       ck_pr_store_int(&queue_is_full, 1);
       log_text(LOG_FATAL,
@@ -891,8 +921,8 @@ static void *eventgen_thread_proc(void *arg)
       return NULL;
     }
 
-    if (i >= MAX_QUEUE_LEN - 1)
-      i = 0;
+    /* Wake up one waiting thread, if there are any */
+    pthread_cond_signal(&queue_cond);
   }
 
   return NULL;
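A note on the sb_rand_exp()/lambda change above: both versions sample
the same distribution. The old code defined lambda = tx_rate / 1e9 (a
rate per nanosecond) and returned -1/lambda * log(1 - U); the new code
defines lambda = 1e9 / tx_rate (the mean interval in nanoseconds) and
returns -lambda * log(1 - U). Either way the result is an
exponentially distributed inter-arrival time with mean 1e9 / tx_rate
nanoseconds, i.e. Poisson event arrivals at tx_rate events per second.
A stand-alone sketch of this inverse-CDF sampling, with rand_uniform()
as an invented stand-in for sysbench's sb_rand_uniform_double():

#include <math.h>
#include <stdlib.h>

/* Illustrative stand-in for sb_rand_uniform_double():
   uniform in [0, 1). */
static double rand_uniform(void)
{
  return (double) rand() / ((double) RAND_MAX + 1.0);
}

/* Inverse-CDF sampling: if U ~ Uniform[0, 1), then -mean * log(1 - U)
   is exponentially distributed with the given mean, here in
   nanoseconds. */
static double exp_interval_ns(double tx_rate)
{
  const double mean_ns = 1e9 / tx_rate;
  return -mean_ns * log(1 - rand_uniform());
}

For example, at --rate=100 the mean inter-arrival time is
1e9 / 100 = 1e7 ns, i.e. 10 ms between generated events on average.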
