//===- unittests/Support/ThreadPool.cpp - ThreadPool.h tests -------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"
#include "llvm/TargetParser/Host.h"
#include "llvm/TargetParser/Triple.h"

#ifdef _WIN32
#include "llvm/Support/Windows/WindowsSupport.h"
#endif

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

#include "gtest/gtest.h"

using namespace llvm;

// Fixture for the unittests, allowing us to *temporarily* disable the
// unittests on a particular platform.
class ThreadPoolTest : public testing::Test {
  Triple Host;
  SmallVector<Triple::ArchType, 4> UnsupportedArchs;
  SmallVector<Triple::OSType, 4> UnsupportedOSs;
  SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;

protected:
  // This is intended as a temporary, per-platform "XFAIL" mechanism.
  bool isUnsupportedOSOrEnvironment() {
    Triple Host(Triple::normalize(sys::getProcessTriple()));

    if (is_contained(UnsupportedEnvironments, Host.getEnvironment()))
      return true;

    if (is_contained(UnsupportedOSs, Host.getOS()))
      return true;

    if (is_contained(UnsupportedArchs, Host.getArch()))
      return true;

    return false;
  }

  ThreadPoolTest() {
    // Add unsupported configurations here, for example:
    // UnsupportedArchs.push_back(Triple::x86_64);

    // See https://llvm.org/bugs/show_bug.cgi?id=25829
    UnsupportedArchs.push_back(Triple::ppc64le);
    UnsupportedArchs.push_back(Triple::ppc64);
  }

  /// Make sure this thread does not progress faster than the main thread.
  void waitForMainThread() { waitForPhase(1); }

  /// Set the readiness of the main thread.
  void setMainThreadReady() { setPhase(1); }

  /// Wait until the given phase is set using setPhase(); the first "main"
  /// phase is 1. See also PhaseResetHelper below.
  void waitForPhase(int Phase) {
    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
    CurrentPhaseCondition.wait(
        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
  }
  /// If a thread waits on another phase, the test could bail out on a failed
  /// assertion and the ThreadPool destructor would then wait() on all threads,
  /// deadlocking on the task that is still blocked in waitForPhase(). This
  /// helper automatically resets the phase on destruction to unblock such
  /// threads.
  struct PhaseResetHelper {
    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
    ~PhaseResetHelper() { test->setPhase(-1); }
    ThreadPoolTest *test;
  };

  /// Advance to the given phase.
  void setPhase(int Phase) {
    {
      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
      assert(Phase == CurrentPhase + 1 || Phase < 0);
      CurrentPhase = Phase;
    }
    CurrentPhaseCondition.notify_all();
  }
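
  /// A sketch of how the phase protocol above is typically used by the tests
  /// below (the exact lambdas vary per test):
  /// \code
  ///   Pool.async([this] {
  ///     waitForMainThread(); // Blocks until setMainThreadReady(), phase 1.
  ///     setPhase(2);         // Unblocks tasks waiting in waitForPhase(2).
  ///   });
  ///   setMainThreadReady();
  /// \endcode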

  void SetUp() override { CurrentPhase = 0; }

  std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);

  std::condition_variable CurrentPhaseCondition;
  std::mutex CurrentPhaseMutex;
  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
};

#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)

TEST_F(ThreadPoolTest, AsyncBarrier) {
  CHECK_UNSUPPORTED();
  // Test that async & barrier work together properly.

  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async([this, &checked_in] {
      waitForMainThread();
      ++checked_in;
    });
  }
  ASSERT_EQ(0, checked_in);
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(5, checked_in);
}

static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }

TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
  CHECK_UNSUPPORTED();
  // Test that async works with a function requiring multiple parameters.
  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async(TestFunc, std::ref(checked_in), i);
  }
  Pool.wait();
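  // The five tasks add 0 + 1 + 2 + 3 + 4, so the expected total is 10.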
  ASSERT_EQ(10, checked_in);
}

TEST_F(ThreadPoolTest, Async) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  Pool.async([&i] { ++i; });
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFuture) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // Force the future using get()
  Pool.async([&i] { ++i; }).get();
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFutureWithResult) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto F1 = Pool.async([] { return 1; });
  auto F2 = Pool.async([] { return 2; });

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto Fn = [](int x) { return x; };
  auto F1 = Pool.async(Fn, 1);
  auto F2 = Pool.async(Fn, 2);

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}
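
// For reference, async() hands back a future (a std::shared_future of the
// callable's result type in the current implementation), so the tests above
// are instances of this sketch:
//   std::shared_future<int> F = Pool.async([] { return 42; });
//   int Result = F.get(); // Blocks until the task has executed.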

TEST_F(ThreadPoolTest, PoolDestruction) {
  CHECK_UNSUPPORTED();
  // Test that the ThreadPool destructor waits for all pending tasks.
  std::atomic_int checked_in{0};
  {
    ThreadPool Pool;
    for (size_t i = 0; i < 5; ++i) {
      Pool.async([this, &checked_in] {
        waitForMainThread();
        ++checked_in;
      });
    }
    ASSERT_EQ(0, checked_in);
    setMainThreadReady();
  }
  ASSERT_EQ(5, checked_in);
}

// Check running tasks in different groups.
TEST_F(ThreadPoolTest, Groups) {
  CHECK_UNSUPPORTED();
  // Need at least two threads, as the task in Group2
  // might block a thread until all tasks in Group1 finish.
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group1(Pool);
  ThreadPoolTaskGroup Group2(Pool);

  // Check that waiting for an empty group is a no-op.
  Group1.wait();

  std::atomic_int checked_in1{0};
  std::atomic_int checked_in2{0};

  for (size_t i = 0; i < 5; ++i) {
    Group1.async([this, &checked_in1] {
      waitForMainThread();
      ++checked_in1;
    });
  }
  Group2.async([this, &checked_in2] {
    waitForPhase(2);
    ++checked_in2;
  });
  ASSERT_EQ(0, checked_in1);
  ASSERT_EQ(0, checked_in2);
  // Start the first group and wait for it.
  setMainThreadReady();
  Group1.wait();
  ASSERT_EQ(5, checked_in1);
  // The second group has not finished yet; start it and wait for it.
  ASSERT_EQ(0, checked_in2);
  setPhase(2);
  Group2.wait();
  ASSERT_EQ(5, checked_in1);
  ASSERT_EQ(1, checked_in2);
}

// Check recursive tasks.
TEST_F(ThreadPoolTest, RecursiveGroups) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  ThreadPoolTaskGroup Group(Pool);

  std::atomic_int checked_in1{0};

  for (size_t i = 0; i < 5; ++i) {
    Group.async([this, &Pool, &checked_in1] {
      waitForMainThread();

      ThreadPoolTaskGroup LocalGroup(Pool);

      // Check that waiting for an empty group is a no-op.
      LocalGroup.wait();

      std::atomic_int checked_in2{0};
      for (size_t i = 0; i < 5; ++i) {
        LocalGroup.async([&checked_in2] { ++checked_in2; });
      }
      LocalGroup.wait();
      ASSERT_EQ(5, checked_in2);

      ++checked_in1;
    });
  }
  ASSERT_EQ(0, checked_in1);
  setMainThreadReady();
  Group.wait();
  ASSERT_EQ(5, checked_in1);
}

TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group(Pool);

  // Test that a thread that calls wait() on a group, and then waits for more
  // tasks to process while waiting, returns once the last task of that group
  // finishes on a different thread.
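  //
  // Rough timeline (a sketch; the actual thread assignment is up to the pool):
  //   Thread 1: runs task A, goes idle waiting for more work, later runs C.
  //   Thread 2: runs task B, which spawns C in LocalGroup and then calls
  //             LocalGroup.wait(); that wait must return once C finishes on
  //             thread 1 instead of blocking forever.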

  // Task A runs in the first thread. It finishes and leaves
  // the background thread waiting for more tasks.
  Group.async([this] {
    waitForMainThread();
    setPhase(2);
  });
  // Task B runs in a second thread; it launches yet another
  // task C in a different group, which will be handled by the waiting
  // thread started above.
  Group.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup LocalGroup(Pool);
    LocalGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and suspend waiting for a notification. This is indeed
      // racy, but probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // Only now does task B wait for the tasks in the group (i.e. task C)
    // to finish. This test checks that it does not deadlock. If the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // task B would be stuck waiting for tasks to arrive.
    setPhase(3);
    LocalGroup.wait();
  });
  setMainThreadReady();
  Group.wait();
}

#if LLVM_ENABLE_THREADS == 1

// FIXME: Skip some tests below on non-Windows because multi-socket systems
// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
#ifdef _WIN32

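// Queues one task per thread of the given strategy, holds all of them until
// the main thread is ready, and records the affinity mask observed by each
// worker. Returns the set of distinct masks seen, which the callers below
// compare against the number of CPU sockets reported by llvm::get_cpus().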
std::vector<llvm::BitVector>
ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
  llvm::SetVector<llvm::BitVector> ThreadsUsed;
  std::mutex Lock;
  {
    std::condition_variable AllThreads;
    std::mutex AllThreadsLock;
    unsigned Active = 0;

    ThreadPool Pool(S);
    for (size_t I = 0; I < S.compute_thread_count(); ++I) {
      Pool.async([&] {
        {
          std::lock_guard<std::mutex> Guard(AllThreadsLock);
          ++Active;
          AllThreads.notify_one();
        }
        waitForMainThread();
        std::lock_guard<std::mutex> Guard(Lock);
        auto Mask = llvm::get_thread_affinity_mask();
        ThreadsUsed.insert(Mask);
      });
    }
    EXPECT_TRUE(ThreadsUsed.empty());
    {
      std::unique_lock<std::mutex> Guard(AllThreadsLock);
      AllThreads.wait(Guard,
                      [&]() { return Active == S.compute_thread_count(); });
    }
    setMainThreadReady();
  }
  return ThreadsUsed.takeVector();
}

TEST_F(ThreadPoolTest, AllThreads_UseAllResources) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy the threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
  // Expect one distinct affinity mask per CPU socket.
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy the threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  std::vector<llvm::BitVector> ThreadsUsed =
      RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
  // Expect one distinct affinity mask per CPU socket.
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

// From TestMain.cpp.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving the executable's path.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");

#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif

TEST_F(ThreadPoolTest, AffinityMask) {
  CHECK_UNSUPPORTED();

  // Skip this test if fewer than 4 threads are available.
  if (llvm::hardware_concurrency().compute_thread_count() < 4)
    GTEST_SKIP();

  using namespace llvm::sys;
  if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
    std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
    // Ensure the threads only ran on CPUs 0-3: a mask limited to those CPUs
    // has only the low four bits set, so its first word is at most 0b1111.
    // NOTE: Don't use ASSERT* here because this runs in a subprocess,
    // and will show up as un-executed in the parent.
    assert(llvm::all_of(ThreadsUsed,
                        [](auto &T) { return T.getData().front() < 16UL; }) &&
           "Threads ran on more CPUs than expected! The affinity mask does not "
           "seem to work.");
    GTEST_SKIP();
  }
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
  StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};

  // Add an environment variable to the environment of the child process.
  int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
  ASSERT_EQ(Res, 0);

  std::string Error;
  bool ExecutionFailed;
  BitVector Affinity;
  Affinity.resize(4);
  Affinity.set(0, 4); // Use CPUs 0,1,2,3.
  int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
                                &ExecutionFailed, nullptr, &Affinity);
  ASSERT_EQ(0, Ret);
}

#endif // #ifdef _WIN32
#endif // #if LLVM_ENABLE_THREADS == 1