blob: f779ecfc7693dc1f97c133d255b6916f233ded59 [file]
//===-- A platform independent abstraction layer for cond vars --*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#ifndef LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
#define LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
#include "hdr/stdint_proxy.h" // uint32_t
#include "src/__support/CPP/limits.h"
#include "src/__support/CPP/mutex.h"
#include "src/__support/CPP/new.h"
#include "src/__support/macros/config.h"
#include "src/__support/threads/futex_utils.h" // Futex
#include "src/__support/threads/mutex.h" // Mutex
#include "src/__support/threads/raw_mutex.h" // RawMutex
#include "src/__support/threads/sleep.h"
#include "src/__support/time/abs_timeout.h"
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
#include "src/__support/time/monotonicity.h"
#endif
namespace LIBC_NAMESPACE_DECL {
enum class CndVarResult {
Success,
MutexError,
Timeout,
};
class CndVar {
public:
using Timeout = internal::AbsTimeout;
private:
// A single-waiter multiple-notifier barrier used to keep
// track of cancellation threads. We use this barrier to
// ensure in-queue threads that have posted their cancellation
// request have finished dequeue themselves.
class CancellationBarrier {
LIBC_INLINE_VAR static constexpr size_t CANCEL_STEP = 2;
LIBC_INLINE_VAR static constexpr size_t SLEEPING_BIT = 1;
// LSB indicates whether the waiter is in sleeping state.
Futex futex;
public:
LIBC_INLINE CancellationBarrier() : futex(0) {}
// Add one more notification request.
LIBC_INLINE void add_one() {
futex.fetch_add(CANCEL_STEP, cpp::MemoryOrder::RELAXED);
}
// Send notification to one waiter.
LIBC_INLINE void notify() {
FutexWordType res = futex.fetch_sub(CANCEL_STEP);
// Only need to goto syscall if waiter is sleep and we are the last one
if (res <= (CANCEL_STEP | SLEEPING_BIT) && (res & SLEEPING_BIT) != 0)
futex.notify_one();
}
LIBC_INLINE void wait() {
size_t spin = 0;
while (auto remaining = futex.load(cpp::MemoryOrder::RELAXED)) {
// Set LSB to 1 to indicate that the waiter is entering sleeping
// state.
FutexWordType new_val = remaining | SLEEPING_BIT;
if (spin > LIBC_COPT_RAW_MUTEX_DEFAULT_SPIN_COUNT &&
futex.compare_exchange_strong(remaining, new_val)) {
futex.wait(new_val, /*timeout=*/cpp::nullopt, /*is_pshared=*/false);
futex.fetch_sub(1);
spin = 0;
}
sleep_briefly();
spin++;
}
}
};
enum WaiterState : uint8_t {
Waiting = 0,
Signalled = 1,
Cancelled = 2,
Requeued = 3,
};
template <typename T> struct QueueNode {
T *prev;
T *next;
LIBC_INLINE T *self() { return static_cast<T *>(this); }
// We use cyclic dummy node to avoid handing corner cases.
LIBC_INLINE void ensure_queue_initialization() {
if (LIBC_UNLIKELY(prev == nullptr))
prev = next = self();
}
// Assume `this` the dummy node of queue. Push back `waiter` to the queue.
LIBC_INLINE void push_back(T *waiter) {
ensure_queue_initialization();
waiter->next = self();
waiter->prev = prev;
waiter->next->prev = waiter;
waiter->prev->next = waiter;
}
// Remove `waiter` from the queue.
LIBC_INLINE static void remove(T *waiter) {
waiter->next->prev = waiter->prev;
waiter->prev->next = waiter->next;
waiter->prev = waiter->next = waiter;
}
LIBC_INLINE bool is_empty() {
ensure_queue_initialization();
return self() == next;
}
// Assume `this` is the dummy node of the queue. Separate nodes before
// cursor into a separate queue.
LIBC_INLINE void separate(T *cursor) {
T *removed_head = this->next;
T *removed_tail = cursor->prev;
this->next = cursor;
cursor->prev = self();
removed_tail->next = removed_head;
removed_head->prev = removed_tail;
}
};
// This node will be on the per-thread stack.
struct CndWaiter : QueueNode<CndWaiter> {
cpp::Atomic<CancellationBarrier *> cancellation_barrier;
RawMutex barrier;
cpp::Atomic<uint8_t> state;
LIBC_INLINE CndWaiter()
: QueueNode{}, cancellation_barrier(nullptr), barrier{},
state{Waiting} {
// this lock should always success as no contention is possible
[[maybe_unused]] bool locked = barrier.try_lock();
LIBC_ASSERT(locked);
}
LIBC_INLINE void confirm_cancellation() {
if (CancellationBarrier *sender = cancellation_barrier.load())
sender->notify();
}
};
// Group structures with similar alignment together to
// save trailing padding bytes, such that is_shared
// can be introduced without extra space.
union {
QueueNode<CndWaiter> waiter_queue;
cpp::Atomic<size_t> shared_waiters;
};
union {
RawMutex queue_lock;
Futex shared_futex;
};
const bool is_shared;
const bool is_realtime;
LIBC_INLINE void notify(bool is_broadcast) {
if (LIBC_UNLIKELY(is_shared)) {
if (shared_waiters.load() == 0)
return;
// increase the sequence number
shared_futex.fetch_add(1);
if (is_broadcast)
shared_futex.notify_all(/*is_shared=*/true);
else
shared_futex.notify_one(/*is_shared=*/true);
return;
}
size_t limit =
is_broadcast ? cpp::numeric_limits<size_t>::max() : size_t{1};
CancellationBarrier cancellation_barrier{};
CndWaiter *head = nullptr;
CndWaiter *cursor = nullptr;
// Go through the queue, try send signal to waiters.
// 1. if signal is sent, we reduce the number of pending signals
// 2. if waiter cancelled before signal is sent, we add it
// to cancellation barrier and continue
// Notice that cancelled sender will not continue before
// we release the queue lock, because they also need to
// acquire the lock and dequeue themselves.
{
cpp::lock_guard lock(queue_lock);
if (waiter_queue.is_empty())
return;
for (cursor = waiter_queue.next; cursor != waiter_queue.self();
cursor = cursor->next) {
if (limit == 0)
break;
uint8_t expected = Waiting;
if (!cursor->state.compare_exchange_strong(expected, Signalled)) {
cancellation_barrier.add_one();
cursor->cancellation_barrier.store(&cancellation_barrier);
continue;
}
if (!head)
head = cursor;
limit--;
}
// remove everything before cursor
waiter_queue.separate(cursor);
}
// We want to make sure the propagation queue contain only threads
// that have consumed the signal. So we wait until all cancelled
// finishing their dequeue operation.
cancellation_barrier.wait();
// Start propagate notification to the first waiter in the queue.
// Waiters in the queue will acquire the lock in strict FIFO order:
// Only when the predecessor has acquired the lock can the successor
// be waken up to compete for the mutex.
if (head)
head->barrier.unlock();
}
public:
LIBC_INLINE constexpr CndVar(bool is_shared, bool is_realtime = false)
: waiter_queue{}, queue_lock{}, is_shared(is_shared),
is_realtime(is_realtime) {
if (is_shared) {
new (&shared_waiters) cpp::Atomic<size_t>(0);
new (&shared_futex) Futex(0);
}
}
LIBC_INLINE void reset() {
if (is_shared) {
shared_waiters.store(0);
shared_futex.store(0);
return;
}
queue_lock.reset();
waiter_queue.prev = nullptr;
waiter_queue.next = nullptr;
}
// The is_realtime field is just a field we spared for pthread_cond_t
// It is not used in wait directly.
LIBC_INLINE bool default_clock_is_realtime() const { return is_realtime; }
// TODO: register callback for pthread cancellation
LIBC_INLINE CndVarResult wait(Mutex *mutex,
cpp::optional<Timeout> timeout = cpp::nullopt) {
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
if (timeout)
ensure_monotonicity(*timeout);
#endif
if (LIBC_UNLIKELY(is_shared)) {
shared_waiters.fetch_add(1);
FutexWordType old_val = shared_futex.load();
mutex->unlock();
ErrorOr<int> result =
shared_futex.wait(old_val, timeout, /*is_pshared=*/true);
shared_waiters.fetch_sub(1);
MutexError mutex_result = mutex->lock();
if (!result.has_value() && result.error() == ETIMEDOUT)
return CndVarResult::Timeout;
return mutex_result == MutexError::NONE ? CndVarResult::Success
: CndVarResult::MutexError;
}
CndWaiter waiter{};
// Register the waiter to the queue.
{
cpp::lock_guard lock(queue_lock);
waiter_queue.push_back(&waiter);
}
// Unlock the mutex and wait for the signal.
mutex->unlock();
// Notice that lock is already initialized as LOCKED. We abuse the LOCKED
// state to indicate that the waiter is pending.
bool locked = waiter.barrier.lock(timeout, /*is_shared=*/false);
// if we wake up and find that we are still waiting, this means
// timeout has been reached.
uint8_t old_state = Waiting;
if (waiter.state.compare_exchange_strong(old_state, Cancelled,
cpp::MemoryOrder::ACQ_REL)) {
// we haven't consumed the signal before timeout reaches.
{
cpp::lock_guard lock(queue_lock);
CndWaiter::remove(&waiter);
}
waiter.confirm_cancellation();
} else if (!locked) {
// Whenever a signal is already consumed, we compete for the mutex
// in the FIFO order of the queue. We only relock if we previously
// wake up due to timeout. Otherwise, it means that our turn has
// come, so we don't need to relock.
waiter.barrier.lock();
}
// Reacquire the mutex lock. If error ever happens, we still wake up
// our successor so that remaining waiters can continue. However, we treat
// outselves as not owning the mutex and we don't touch the contention
// bit.
MutexError mutex_result = mutex->lock();
// If we are requeued, we need to establish contention after lock, otherwise
// requeued thread may clear the contention bit even though
// there are still waiters behind it.
if (mutex_result == MutexError::NONE &&
waiter.state.load(cpp::MemoryOrder::RELAXED) == Requeued)
mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
// If there is other in the queue after us, we need to wake the next waiter.
// If we cancelled, we should naturally have waiter.next == &waiter
if (waiter.next != &waiter) {
auto *next_waiter = waiter.next;
CndWaiter::remove(&waiter);
auto &next_barrier_futex = next_waiter->barrier.get_raw_futex();
auto &mutex_futex = mutex->get_raw_futex();
// the following is basically an inlined version of mutex::unlock
// but with requeue instead of wake if it is possible.
FutexWordType prev = next_barrier_futex.exchange(
RawMutex::UNLOCKED, cpp::MemoryOrder::RELEASE);
// If next waiter in queue sleeps, it will establish contention its own
// barrier
if (prev == RawMutex::IN_CONTENTION) {
if (mutex_result == MutexError::NONE && mutex->can_be_requeued()) {
ErrorOr<int> res = next_barrier_futex.requeue_to(
mutex_futex, cpp::nullopt, /*wake_limit=*/0,
/*requeue_limit=*/1,
/*is_shared=*/false);
if (!res.has_value()) // cannot requeue on this system
next_waiter->barrier.wake(/*is_shared=*/false);
else if (res.value() > 0) {
next_waiter->state.store(Requeued, cpp::MemoryOrder::RELAXED);
mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
}
} else { // cannot requeue under special lock mode
next_waiter->barrier.wake(/*is_shared=*/false);
}
}
}
if (mutex_result != MutexError::NONE)
return CndVarResult::MutexError;
return old_state == Waiting ? CndVarResult::Timeout : CndVarResult::Success;
}
LIBC_INLINE void notify_one() { notify(/*is_broadcast=*/false); }
LIBC_INLINE void broadcast() { notify(/*is_broadcast=*/true); }
};
} // namespace LIBC_NAMESPACE_DECL
#endif // LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H