| //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// |
| // |
| // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| // See https://llvm.org/LICENSE.txt for license information. |
| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| // |
| //===----------------------------------------------------------------------===// |
| |
| #include "llvm/Support/Parallel.h" |
| #include "llvm/ADT/ScopeExit.h" |
| #include "llvm/Config/llvm-config.h" |
| #include "llvm/Support/ExponentialBackoff.h" |
| #include "llvm/Support/Jobserver.h" |
| #include "llvm/Support/ManagedStatic.h" |
| #include "llvm/Support/Threading.h" |
| |
| #include <atomic> |
| #include <future> |
| #include <memory> |
| #include <mutex> |
| #include <thread> |
| #include <vector> |
| |
// Process-wide strategy for the parallel algorithms in this file. The default
// executor is constructed from it (its thread count comes from
// compute_thread_count()), and TaskGroup / parallelFor treat
// ThreadsRequested == 1 as "run sequentially".
llvm::ThreadPoolStrategy llvm::parallel::strategy;
| |
| namespace llvm { |
| namespace parallel { |
| #if LLVM_ENABLE_THREADS |
| |
// Index of the calling thread inside the default executor's pool. work()
// stores the worker's index (0 .. ThreadCount-1) here; every thread not owned
// by the pool keeps the initial UINT_MAX. TaskGroup relies on that sentinel to
// tell outside callers apart from pool workers.
#ifdef _WIN32
// On Windows the variable stays internal to this file and is read through
// getThreadIndex(), whose body is the GET_THREAD_INDEX_IMPL macro.
// NOTE(review): the macro is presumably provided by Parallel.h; confirm.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// NOTE(review): presumably declared extern in Parallel.h so that callers can
// read it directly on these platforms; confirm.
thread_local unsigned threadIndex = UINT_MAX;
#endif
| |
| namespace detail { |
| |
| namespace { |
| |
/// Abstract sink for closures that are run asynchronously.
class Executor {
public:
  /// Returns the process-wide executor, creating it on first use.
  static Executor *getDefaultExecutor();

  virtual ~Executor() = default;

  /// Queues \p Work to be run asynchronously.
  virtual void add(std::function<void()> Work) = 0;

  /// Reports how many worker threads back this executor.
  virtual size_t getThreadCount() const = 0;
};
| |
| /// An implementation of an Executor that runs closures on a thread pool |
| /// in filo order. |
| class ThreadPoolExecutor : public Executor { |
| public: |
| explicit ThreadPoolExecutor(ThreadPoolStrategy S) { |
| if (S.UseJobserver) |
| TheJobserver = JobserverClient::getInstance(); |
| |
| ThreadCount = S.compute_thread_count(); |
| // Spawn all but one of the threads in another thread as spawning threads |
| // can take a while. |
| Threads.reserve(ThreadCount); |
| Threads.resize(1); |
| std::lock_guard<std::mutex> Lock(Mutex); |
| // Use operator[] before creating the thread to avoid data race in .size() |
| // in 'safe libc++' mode. |
| auto &Thread0 = Threads[0]; |
| Thread0 = std::thread([this, S] { |
| for (unsigned I = 1; I < ThreadCount; ++I) { |
| Threads.emplace_back([this, S, I] { work(S, I); }); |
| if (Stop) |
| break; |
| } |
| ThreadsCreated.set_value(); |
| work(S, 0); |
| }); |
| } |
| |
| // To make sure the thread pool executor can only be created with a parallel |
| // strategy. |
| ThreadPoolExecutor() = delete; |
| |
| void stop() { |
| { |
| std::lock_guard<std::mutex> Lock(Mutex); |
| if (Stop) |
| return; |
| Stop = true; |
| } |
| Cond.notify_all(); |
| ThreadsCreated.get_future().wait(); |
| } |
| |
| ~ThreadPoolExecutor() override { |
| stop(); |
| std::thread::id CurrentThreadId = std::this_thread::get_id(); |
| for (std::thread &T : Threads) |
| if (T.get_id() == CurrentThreadId) |
| T.detach(); |
| else |
| T.join(); |
| } |
| |
| struct Creator { |
| static void *call() { return new ThreadPoolExecutor(strategy); } |
| }; |
| struct Deleter { |
| static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } |
| }; |
| |
| void add(std::function<void()> F) override { |
| { |
| std::lock_guard<std::mutex> Lock(Mutex); |
| WorkStack.push_back(std::move(F)); |
| } |
| Cond.notify_one(); |
| } |
| |
| size_t getThreadCount() const override { return ThreadCount; } |
| |
| private: |
| void work(ThreadPoolStrategy S, unsigned ThreadID) { |
| threadIndex = ThreadID; |
| S.apply_thread_strategy(ThreadID); |
| // Note on jobserver deadlock avoidance: |
| // GNU Make grants each invoked process one implicit job slot. Our |
| // JobserverClient models this by returning an implicit JobSlot on the |
| // first successful tryAcquire() in a process. This guarantees forward |
| // progress without requiring a dedicated "always-on" thread here. |
| |
| static thread_local std::unique_ptr<ExponentialBackoff> Backoff; |
| |
| while (true) { |
| if (TheJobserver) { |
| // Jobserver-mode scheduling: |
| // - Acquire one job slot (with exponential backoff to avoid busy-wait). |
| // - While holding the slot, drain and run tasks from the local queue. |
| // - Release the slot when the queue is empty or when shutting down. |
| // Rationale: Holding a slot amortizes acquire/release overhead over |
| // multiple tasks and avoids requeue/yield churn, while still enforcing |
| // the jobserver’s global concurrency limit. With K available slots, |
| // up to K workers run tasks in parallel; within each worker tasks run |
| // sequentially until the local queue is empty. |
| ExponentialBackoff Backoff(std::chrono::hours(24)); |
| JobSlot Slot; |
| do { |
| if (Stop) |
| return; |
| Slot = TheJobserver->tryAcquire(); |
| if (Slot.isValid()) |
| break; |
| } while (Backoff.waitForNextAttempt()); |
| |
| auto SlotReleaser = llvm::make_scope_exit( |
| [&] { TheJobserver->release(std::move(Slot)); }); |
| |
| while (true) { |
| std::function<void()> Task; |
| { |
| std::unique_lock<std::mutex> Lock(Mutex); |
| Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); |
| if (Stop && WorkStack.empty()) |
| return; |
| if (WorkStack.empty()) |
| break; |
| Task = std::move(WorkStack.back()); |
| WorkStack.pop_back(); |
| } |
| Task(); |
| } |
| } else { |
| std::unique_lock<std::mutex> Lock(Mutex); |
| Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); |
| if (Stop) |
| break; |
| auto Task = std::move(WorkStack.back()); |
| WorkStack.pop_back(); |
| Lock.unlock(); |
| Task(); |
| } |
| } |
| } |
| |
| std::atomic<bool> Stop{false}; |
| std::vector<std::function<void()>> WorkStack; |
| std::mutex Mutex; |
| std::condition_variable Cond; |
| std::promise<void> ThreadsCreated; |
| std::vector<std::thread> Threads; |
| unsigned ThreadCount; |
| |
| JobserverClient *TheJobserver = nullptr; |
| }; |
| |
| // A global raw pointer to the executor. Lifetime is managed by the |
| // objects created within createExecutor(). |
| static Executor *TheExec = nullptr; |
| static std::once_flag Flag; |
| |
// Creates the default executor and publishes it in TheExec. It is run exactly
// once, through std::call_once(Flag, ...) in getDefaultExecutor(). Because
// std::call_once discards the callable's return value, the executor is handed
// back through the TheExec global instead. Ownership differs per platform, as
// described below.
void createExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown(), which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  // Dereferencing ManagedExec constructs the executor (Creator). Its Deleter
  // only calls stop(), so the unique_ptr below is the object's real owner.
  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  TheExec = Exec.get();
#else
  // ManagedStatic is not desired on other platforms. If `Exec` were a
  // ManagedStatic destroyed by llvm_shutdown(), worker threads would clean up
  // and invoke TLS destructors, which can race with other threads that still
  // access TLS objects that have already been destroyed. A plain function-local
  // static is used instead, so it is only destroyed during normal static
  // destruction.
  static ThreadPoolExecutor Exec(strategy);
  TheExec = &Exec;
#endif
}
| |
| Executor *Executor::getDefaultExecutor() { |
| // Use std::call_once to lazily and safely initialize the executor. |
| std::call_once(Flag, createExecutor); |
| return TheExec; |
| } |
| } // namespace |
| } // namespace detail |
| |
| size_t getThreadCount() { |
| return detail::Executor::getDefaultExecutor()->getThreadCount(); |
| } |
| #endif |
| |
| // Latch::sync() called by the dtor may cause one thread to block. If is a dead |
| // lock if all threads in the default executor are blocked. To prevent the dead |
| // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario |
| // of nested parallel_for_each(), only the outermost one runs parallelly. |
| TaskGroup::TaskGroup() |
| #if LLVM_ENABLE_THREADS |
| : Parallel((parallel::strategy.ThreadsRequested != 1) && |
| (threadIndex == UINT_MAX)) {} |
| #else |
| : Parallel(false) {} |
| #endif |
| TaskGroup::~TaskGroup() { |
| // We must ensure that all the workloads have finished before decrementing the |
| // instances count. |
| L.sync(); |
| } |
| |
| void TaskGroup::spawn(std::function<void()> F) { |
| #if LLVM_ENABLE_THREADS |
| if (Parallel) { |
| L.inc(); |
| detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { |
| F(); |
| L.dec(); |
| }); |
| return; |
| } |
| #endif |
| F(); |
| } |
| |
| } // namespace parallel |
| } // namespace llvm |
| |
| void llvm::parallelFor(size_t Begin, size_t End, |
| llvm::function_ref<void(size_t)> Fn) { |
| #if LLVM_ENABLE_THREADS |
| if (parallel::strategy.ThreadsRequested != 1) { |
| auto NumItems = End - Begin; |
| // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling |
| // overhead on large inputs. |
| auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; |
| if (TaskSize == 0) |
| TaskSize = 1; |
| |
| parallel::TaskGroup TG; |
| for (; Begin + TaskSize < End; Begin += TaskSize) { |
| TG.spawn([=, &Fn] { |
| for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) |
| Fn(I); |
| }); |
| } |
| if (Begin != End) { |
| TG.spawn([=, &Fn] { |
| for (size_t I = Begin; I != End; ++I) |
| Fn(I); |
| }); |
| } |
| return; |
| } |
| #endif |
| |
| for (; Begin != End; ++Begin) |
| Fn(Begin); |
| } |