Skip to content

Commit

Permalink
Merge pull request #8284 from google/benvanik-timed-notification
Browse files Browse the repository at this point in the history
Adding support for timeouts on iree_notification_t + fixes.
  • Loading branch information
benvanik authored Feb 10, 2022
2 parents 3f277c9 + 25ceb0e commit 9fbfe02
Show file tree
Hide file tree
Showing 20 changed files with 256 additions and 113 deletions.
107 changes: 68 additions & 39 deletions iree/base/internal/synchronization.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
// |timeout_ms| can be either IREE_INFINITE_TIMEOUT_MS to wait forever or a
// relative number of milliseconds to wait prior to returning early with
// IREE_STATUS_DEADLINE_EXCEEDED.
static inline iree_status_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms);
static inline iree_status_code_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms);

// Wakes at most |count| threads waiting for the |address| to change.
// Use IREE_ALL_WAITERS to wake all waiters. Which waiters are woken is
Expand All @@ -72,17 +72,17 @@ static inline void iree_futex_wake(void* address, int32_t count);

#if defined(IREE_PLATFORM_EMSCRIPTEN)

static inline iree_status_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
static inline iree_status_code_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
int rc = emscripten_futex_wait(address, expected_value, (double)timeout_ms);
switch (rc) {
default:
return iree_ok_status();
return IREE_STATUS_OK;
case -ETIMEDOUT:
return iree_status_from_code(IREE_STATUS_DEADLINE_EXCEEDED);
return IREE_STATUS_DEADLINE_EXCEEDED;
case -EWOULDBLOCK:
return iree_status_from_code(IREE_STATUS_UNAVAILABLE);
return IREE_STATUS_UNAVAILABLE;
}
}

Expand All @@ -94,17 +94,17 @@ static inline void iree_futex_wake(void* address, int32_t count) {

#pragma comment(lib, "Synchronization.lib")

static inline iree_status_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
static inline iree_status_code_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
if (IREE_LIKELY(WaitOnAddress(address, &expected_value,
sizeof(expected_value), timeout_ms) == TRUE)) {
return iree_ok_status();
return IREE_STATUS_OK;
}
if (GetLastError() == ERROR_TIMEOUT) {
return iree_status_from_code(IREE_STATUS_DEADLINE_EXCEEDED);
return IREE_STATUS_DEADLINE_EXCEEDED;
}
return iree_status_from_code(IREE_STATUS_UNAVAILABLE);
return IREE_STATUS_UNAVAILABLE;
}

static inline void iree_futex_wake(void* address, int32_t count) {
Expand All @@ -119,21 +119,22 @@ static inline void iree_futex_wake(void* address, int32_t count) {

#elif defined(IREE_PLATFORM_ANDROID) || defined(IREE_PLATFORM_LINUX)

static inline iree_status_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
struct timespec timeout;
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
static inline iree_status_code_t iree_futex_wait(void* address,
uint32_t expected_value,
uint32_t timeout_ms) {
struct timespec timeout = {
.tv_sec = timeout_ms / 1000,
.tv_nsec = (timeout_ms % 1000) * 1000000,
};
int rc = syscall(
SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, expected_value,
timeout_ms == IREE_INFINITE_TIMEOUT_MS ? NULL : &timeout, NULL, 0);
if (IREE_LIKELY(rc == 0)) {
return iree_ok_status();
} else if (rc == ETIMEDOUT) {
return iree_status_from_code(IREE_STATUS_DEADLINE_EXCEEDED);
if (IREE_LIKELY(rc == 0) || errno == EAGAIN) {
return IREE_STATUS_OK;
} else if (errno == ETIMEDOUT) {
return IREE_STATUS_DEADLINE_EXCEEDED;
}
return iree_status_from_code(IREE_STATUS_UNAVAILABLE);
return IREE_STATUS_UNAVAILABLE;
}

static inline void iree_futex_wake(void* address, int32_t count) {
Expand Down Expand Up @@ -451,8 +452,7 @@ void iree_slim_mutex_lock(iree_slim_mutex_t* mutex)
while (iree_slim_mutex_is_locked(value)) {
// NOTE: we don't care about wait failure here as we are going to loop
// and check again anyway.
iree_status_ignore(
iree_futex_wait(&mutex->value, value, IREE_INFINITE_TIMEOUT_MS));
iree_futex_wait(&mutex->value, value, IREE_INFINITE_TIMEOUT_MS);
value = iree_atomic_load_int32(&mutex->value, iree_memory_order_relaxed);
}
}
Expand Down Expand Up @@ -606,24 +606,38 @@ iree_wait_token_t iree_notification_prepare_wait(
return (iree_wait_token_t)(previous_value >> IREE_NOTIFICATION_EPOCH_SHIFT);
}

void iree_notification_commit_wait(iree_notification_t* notification,
iree_wait_token_t wait_token) {
bool iree_notification_commit_wait(iree_notification_t* notification,
iree_wait_token_t wait_token,
iree_time_t deadline_ns) {
bool result = true;

// Spin until notified and the epoch increments from what we captured during
// iree_notification_prepare_wait.
while ((iree_atomic_load_int64(&notification->value,
iree_memory_order_acquire) >>
IREE_NOTIFICATION_EPOCH_SHIFT) == wait_token) {
iree_status_code_t status_code = IREE_STATUS_OK;
#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
// TODO(benvanik): platform sleep? this spins.
#elif defined(IREE_PLATFORM_HAS_FUTEX)
iree_status_ignore(
iree_futex_wait(iree_notification_epoch_address(notification),
wait_token, IREE_INFINITE_TIMEOUT_MS));
uint32_t timeout_ms = iree_absolute_deadline_to_timeout_ms(deadline_ns);
status_code = iree_futex_wait(iree_notification_epoch_address(notification),
wait_token, timeout_ms);
#else
struct timespec abs_ts = {
.tv_sec = (time_t)(deadline_ns / 1000000000ull),
.tv_nsec = (long)(deadline_ns % 1000000000ull),
};
pthread_mutex_lock(&notification->mutex);
pthread_cond_wait(&notification->cond, &notification->mutex);
int ret = pthread_cond_timedwait(&notification->cond, &notification->mutex,
&abs_ts);
pthread_mutex_unlock(&notification->mutex);
status_code = ret == 0 ? IREE_STATUS_OK : IREE_STATUS_DEADLINE_EXCEEDED;
#endif // IREE_PLATFORM_HAS_FUTEX
if (status_code != IREE_STATUS_OK) {
result = false;
break;
}
}

// TODO(benvanik): benchmark under real workloads.
Expand All @@ -633,6 +647,8 @@ void iree_notification_commit_wait(iree_notification_t* notification,
&notification->value, IREE_NOTIFICATION_WAITER_DEC,
iree_memory_order_seq_cst);
SYNC_ASSERT((previous_value & IREE_NOTIFICATION_WAITER_MASK) != 0);

return result;
}

void iree_notification_cancel_wait(iree_notification_t* notification) {
Expand All @@ -645,22 +661,35 @@ void iree_notification_cancel_wait(iree_notification_t* notification) {
SYNC_ASSERT((previous_value & IREE_NOTIFICATION_WAITER_MASK) != 0);
}

void iree_notification_await(iree_notification_t* notification,
bool iree_notification_await(iree_notification_t* notification,
iree_condition_fn_t condition_fn,
void* condition_arg) {
void* condition_arg, iree_timeout_t timeout) {
if (IREE_LIKELY(condition_fn(condition_arg))) {
// Fast-path with condition already met.
return;
return true;
}

// If a (silly) query then bail immediately after our first condition check.
// Otherwise we may have a real deadline and want it in absolute form so that
// we can easily handle spurious wakes.
if (iree_timeout_is_immediate(timeout)) return false;
const iree_time_t deadline_ns = iree_timeout_as_deadline_ns(timeout);

// Slow-path: try-wait until the condition is met.
while (true) {
iree_wait_token_t wait_token = iree_notification_prepare_wait(notification);
if (condition_fn(condition_arg)) {
// Condition is now met; no need to wait on the futex.
iree_notification_cancel_wait(notification);
return;
return true;
} else {
iree_notification_commit_wait(notification, wait_token);
if (!iree_notification_commit_wait(notification, wait_token,
deadline_ns)) {
// Wait hit the deadline before we hit the condition.
return false;
}
}
}

return true;
}
15 changes: 10 additions & 5 deletions iree/base/internal/synchronization.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,18 @@ iree_wait_token_t iree_notification_prepare_wait(
iree_notification_t* notification);

// Commits a pending wait operation when the caller has ensured it must wait.
// Waiting will continue until a notification has been posted.
// Waiting will continue until a notification has been posted or |deadline_ns|
// is reached. Returns false if the deadline is reached before a notification is
// posted.
//
// Acts as (at least) a memory_order_acquire barrier:
// A load operation with this memory order performs the acquire operation on
// the affected memory location: no reads or writes in the current thread can
// be reordered before this load. All writes in other threads that release the
// same atomic variable are visible in the current thread.
void iree_notification_commit_wait(iree_notification_t* notification,
iree_wait_token_t wait_token);
bool iree_notification_commit_wait(iree_notification_t* notification,
iree_wait_token_t wait_token,
iree_time_t deadline_ns);

// Cancels a pending wait operation without blocking.
//
Expand All @@ -355,6 +358,8 @@ typedef bool (*iree_condition_fn_t)(void* arg);

// Blocks and waits until |condition_fn| returns true. Other threads must modify
// state checked by the |condition_fn| and post the notification.
// Returns true if the condition is true before |timeout| is reached. If the
// timeout is infinite then the return will always be true.
//
// Example:
// thread 1:
Expand All @@ -367,9 +372,9 @@ typedef bool (*iree_condition_fn_t)(void* arg);
// thread 2:
// iree_atomic_int32_store(flag, 1, iree_memory_order_release);
// iree_notification_post(&notification, IREE_ALL_WAITERS);
void iree_notification_await(iree_notification_t* notification,
bool iree_notification_await(iree_notification_t* notification,
iree_condition_fn_t condition_fn,
void* condition_arg);
void* condition_arg, iree_timeout_t timeout);

#ifdef __cplusplus
} // extern "C"
Expand Down
40 changes: 40 additions & 0 deletions iree/base/internal/synchronization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,44 @@ TEST(SlimMutexTest, ExclusiveAccessTryLock) {

// Tested implicitly in threading_test.cc.

TEST(NotificationTest, TimeoutImmediate) {
iree_notification_t notification;
iree_notification_initialize(&notification);

iree_time_t start_ns = iree_time_now();

EXPECT_FALSE(iree_notification_await(
&notification,
+[](void* entry_arg) -> bool {
return false; // condition is never true
},
NULL, iree_immediate_timeout()));

iree_duration_t delta_ns = iree_time_now() - start_ns;
iree_duration_t delta_ms = delta_ns / 1000000;
EXPECT_LT(delta_ms, 50); // slop

iree_notification_deinitialize(&notification);
}

TEST(NotificationTest, Timeout) {
iree_notification_t notification;
iree_notification_initialize(&notification);

iree_time_t start_ns = iree_time_now();

EXPECT_FALSE(iree_notification_await(
&notification,
+[](void* entry_arg) -> bool {
return false; // condition is never true
},
NULL, iree_make_timeout(100 * 1000000)));

iree_duration_t delta_ns = iree_time_now() - start_ns;
iree_duration_t delta_ms = delta_ns / 1000000;
EXPECT_GE(delta_ms, 50); // slop

iree_notification_deinitialize(&notification);
}

} // namespace
3 changes: 2 additions & 1 deletion iree/base/internal/threading_pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ static void* iree_thread_start_routine(void* param) {
while (iree_atomic_load_int32(&thread->suspend_count,
iree_memory_order_seq_cst) > 0) {
iree_notification_await(&thread->suspend_barrier,
iree_thread_resumed_predicate, thread);
iree_thread_resumed_predicate, thread,
iree_infinite_timeout());
}

// "Consume" the entry info so that we don't see it again (as we don't own
Expand Down
6 changes: 3 additions & 3 deletions iree/base/internal/threading_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST(ThreadTest, Lifetime) {
return iree_atomic_load_int32(&entry_data->value,
iree_memory_order_relaxed) == (123 + 1);
},
&entry_data);
&entry_data, iree_infinite_timeout());
iree_notification_deinitialize(&entry_data.barrier);
}

Expand Down Expand Up @@ -110,7 +110,7 @@ TEST(ThreadTest, CreateSuspended) {
return iree_atomic_load_int32(&entry_data->value,
iree_memory_order_relaxed) == (123 + 1);
},
&entry_data);
&entry_data, iree_infinite_timeout());
iree_notification_deinitialize(&entry_data.barrier);
iree_thread_release(thread);
}
Expand Down Expand Up @@ -161,7 +161,7 @@ TEST(ThreadTest, PriorityOverride) {
return iree_atomic_load_int32(&entry_data->value,
iree_memory_order_relaxed) == 1;
},
&entry_data);
&entry_data, iree_infinite_timeout());
iree_notification_deinitialize(&entry_data.barrier);

// Pop overrides (in opposite order intentionally).
Expand Down
10 changes: 5 additions & 5 deletions iree/base/time.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ static bool iree_wait_until_impl(iree_time_t deadline_ns) {
// from the API.
static bool iree_wait_until_impl(iree_time_t deadline_ns) {
struct timespec ts = {
.tv_sec = 0,
.tv_nsec = deadline_ns,
.tv_sec = (time_t)(deadline_ns / 1000000000ull),
.tv_nsec = (long)(deadline_ns % 1000000000ull),
};
int ret = clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &ts, NULL);
return ret == 0;
Expand All @@ -143,9 +143,9 @@ static bool iree_wait_until_impl(iree_time_t deadline_ns) {
iree_time_t now_ns = iree_time_now();
while (now_ns < deadline_ns) {
iree_time_t delta_ns = deadline_ns - now_ns;
struct timespec ts = {
.tv_sec = 0,
.tv_nsec = delta_ns,
struct timespec abs_ts = {
.tv_sec = (time_t)(delta_ns / 1000000000ull),
.tv_nsec = (long)(delta_ns % 1000000000ull),
};
int ret = nanosleep(&ts, NULL);
if (ret != 0) return false;
Expand Down
11 changes: 3 additions & 8 deletions iree/hal/local/sync_semaphore.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ static iree_status_t iree_hal_sync_semaphore_wait(
iree_notification_await(
&shared_state->notification,
(iree_condition_fn_t)iree_hal_sync_semaphore_is_signaled,
(void*)&notify_state);
(void*)&notify_state, timeout);

iree_status_t status = iree_ok_status();
iree_slim_mutex_lock(&semaphore->mutex);
Expand Down Expand Up @@ -384,18 +384,13 @@ iree_status_t iree_hal_sync_semaphore_multi_wait(
return status;
}

// TODO(#4680): we should be checking for DEADLINE_EXCEEDED here. This is
// easy when it's iree_timeout_is_infinite (we can just use the notification
// as below) but if it's an actual deadline we'll need to probably switch to
// iree_wait_handle_t.

// Perform wait on the global notification. Will wait forever.
// Perform wait on the global notification.
iree_notification_await(
&shared_state->notification,
wait_mode == IREE_HAL_WAIT_MODE_ALL
? (iree_condition_fn_t)iree_hal_sync_semaphore_all_signaled
: (iree_condition_fn_t)iree_hal_sync_semaphore_any_signaled,
(void*)semaphore_list);
(void*)semaphore_list, iree_infinite_timeout());

// We may have been successful - or may have a partial failure.
iree_status_t status =
Expand Down
Loading

0 comments on commit 9fbfe02

Please sign in to comment.