// blob: a0817dbe07375e1f29bf8f19290ae9c03fcac1e1 [file] [log] [blame]
//========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "llvm/Support/ThreadPool.h"
#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"
#include "llvm/TargetParser/Host.h"
#include "llvm/TargetParser/Triple.h"
#ifdef _WIN32
#include "llvm/Support/Windows/WindowsSupport.h"
#endif
#include <chrono>
#include <thread>
#include "gtest/gtest.h"
using namespace llvm;
// Fixture for the unit tests; it allows the tests to be *temporarily* disabled
// on a particular platform.
class ThreadPoolTest : public testing::Test {
Triple Host;
SmallVector<Triple::ArchType, 4> UnsupportedArchs;
SmallVector<Triple::OSType, 4> UnsupportedOSs;
SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
protected:
// This is intended for platform as a temporary "XFAIL"
bool isUnsupportedOSOrEnvironment() {
Triple Host(Triple::normalize(sys::getProcessTriple()));
if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
UnsupportedEnvironments.end())
return true;
if (is_contained(UnsupportedOSs, Host.getOS()))
return true;
if (is_contained(UnsupportedArchs, Host.getArch()))
return true;
return false;
}
ThreadPoolTest() {
// Add unsupported configuration here, example:
// UnsupportedArchs.push_back(Triple::x86_64);
// See https://llvm.org/bugs/show_bug.cgi?id=25829
UnsupportedArchs.push_back(Triple::ppc64le);
UnsupportedArchs.push_back(Triple::ppc64);
}
/// Make sure this thread not progress faster than the main thread.
void waitForMainThread() { waitForPhase(1); }
/// Set the readiness of the main thread.
void setMainThreadReady() { setPhase(1); }
/// Wait until given phase is set using setPhase(); first "main" phase is 1.
/// See also PhaseResetHelper below.
void waitForPhase(int Phase) {
std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
CurrentPhaseCondition.wait(
LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
}
/// If a thread waits on another phase, the test could bail out on a failed
/// assertion and ThreadPool destructor would wait() on all threads, which
/// would deadlock on the task waiting. Create this helper to automatically
/// reset the phase and unblock such threads.
struct PhaseResetHelper {
PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
~PhaseResetHelper() { test->setPhase(-1); }
ThreadPoolTest *test;
};
/// Advance to the given phase.
void setPhase(int Phase) {
{
std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
assert(Phase == CurrentPhase + 1 || Phase < 0);
CurrentPhase = Phase;
}
CurrentPhaseCondition.notify_all();
}
void SetUp() override { CurrentPhase = 0; }
std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
std::condition_variable CurrentPhaseCondition;
std::mutex CurrentPhaseMutex;
int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
};
/// Skips the current test when the fixture reports the host as unsupported.
/// Expands to exactly one statement; the caller supplies the terminating
/// semicolon, so the macro is safe in unbraced if/else bodies.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)
TEST_F(ThreadPoolTest, AsyncBarrier) {
  CHECK_UNSUPPORTED();
  // Test that async & barrier work together properly: no task may check in
  // before the main thread is ready, and all of them must have checked in once
  // Pool.wait() returns.
  std::atomic_int checked_in{0};

  ThreadPool Pool;
  // Declared after the pool so that it is destroyed first: if an ASSERT below
  // bails out early, the tasks blocked in waitForMainThread() are released
  // before the pool destructor waits on them.
  PhaseResetHelper Helper(this);
  for (size_t i = 0; i < 5; ++i) {
    Pool.async([this, &checked_in] {
      waitForMainThread();
      ++checked_in;
    });
  }
  ASSERT_EQ(0, checked_in);
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(5, checked_in);
}
/// Adds \p i to the shared counter \p checked_in. Used as a plain-function
/// task that takes more than one argument.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
  CHECK_UNSUPPORTED();
  // async() must accept a callable together with its arguments: the five
  // tasks add 0, 1, 2, 3 and 4 to the shared counter.
  std::atomic_int Sum{0};
  ThreadPool Workers;
  for (size_t Addend = 0; Addend != 5; ++Addend)
    Workers.async(TestFunc, std::ref(Sum), Addend);
  Workers.wait();
  ASSERT_EQ(10, Sum);
}
TEST_F(ThreadPoolTest, Async) {
  CHECK_UNSUPPORTED();
  // The counter is declared before the pool so that it outlives every task
  // that references it.
  std::atomic_int i{0};

  ThreadPool Pool;
  // Declared after the pool so that it is destroyed first: if an ASSERT below
  // bails out early, the task blocked in waitForMainThread() is released
  // before the pool destructor waits on it.
  PhaseResetHelper Helper(this);

  // The first task cannot finish until the main thread is ready...
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // ...while the second one is free to run at any time.
  Pool.async([&i] { ++i; });
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}
TEST_F(ThreadPoolTest, GetFuture) {
  CHECK_UNSUPPORTED();
  // The counter is declared before the pool so that it outlives every task
  // that references it.
  std::atomic_int i{0};

  ThreadPool Pool(hardware_concurrency(2));
  // Declared after the pool so that it is destroyed first: if an ASSERT below
  // bails out early, the task blocked in waitForMainThread() is released
  // before the pool destructor waits on it.
  PhaseResetHelper Helper(this);

  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // Force the future using get()
  Pool.async([&i] { ++i; }).get();
  // The first task is still blocked, so both increments cannot have happened.
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}
TEST_F(ThreadPoolTest, GetFutureWithResult) {
  CHECK_UNSUPPORTED();
  // The future returned by async() must deliver the value its task returned.
  ThreadPool Workers(hardware_concurrency(2));
  auto First = Workers.async([] { return 1; });
  auto Second = Workers.async([] { return 2; });

  setMainThreadReady();
  Workers.wait();

  ASSERT_EQ(1, First.get());
  ASSERT_EQ(2, Second.get());
}
TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
  CHECK_UNSUPPORTED();
  // Same as GetFutureWithResult, but the value travels through an argument
  // passed to async() alongside the callable.
  ThreadPool Workers(hardware_concurrency(2));
  auto Identity = [](int Value) { return Value; };
  auto First = Workers.async(Identity, 1);
  auto Second = Workers.async(Identity, 2);

  setMainThreadReady();
  Workers.wait();

  ASSERT_EQ(1, First.get());
  ASSERT_EQ(2, Second.get());
}
TEST_F(ThreadPoolTest, PoolDestruction) {
  CHECK_UNSUPPORTED();
  // Test that we are waiting on destruction: the tasks are released but never
  // explicitly waited for, so they can only all have run by the end of the
  // inner scope if the pool destructor waits for them.
  std::atomic_int checked_in{0};
  {
    ThreadPool Pool;
    // Declared after the pool so that it is destroyed first: if the ASSERT
    // below bails out early, the tasks blocked in waitForMainThread() are
    // released before the pool destructor waits on them.
    PhaseResetHelper Helper(this);
    for (size_t i = 0; i < 5; ++i) {
      Pool.async([this, &checked_in] {
        waitForMainThread();
        ++checked_in;
      });
    }
    ASSERT_EQ(0, checked_in);
    setMainThreadReady();
  }
  ASSERT_EQ(5, checked_in);
}
// Check running tasks in different groups.
TEST_F(ThreadPoolTest, Groups) {
  CHECK_UNSUPPORTED();
  // Need at least two threads, as the task in group2
  // might block a thread until all tasks in group1 finish.
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();

  // The counters are declared before the pool so that they outlive every task
  // that references them.
  std::atomic_int checked_in1{0};
  std::atomic_int checked_in2{0};

  ThreadPool Pool(S);
  ThreadPoolTaskGroup Group1(Pool);
  ThreadPoolTaskGroup Group2(Pool);
  // Declared after the pool and both groups so that it is destroyed first: if
  // an ASSERT below bails out early, every task blocked in waitForPhase() is
  // released before the pool or a group can wait on it during destruction.
  // NOTE(review): assumes ~ThreadPoolTaskGroup() may block like ~ThreadPool()
  // does - confirm against ThreadPool.h.
  PhaseResetHelper Helper(this);

  // Check that waiting for an empty group is a no-op.
  Group1.wait();

  for (size_t i = 0; i < 5; ++i) {
    Group1.async([this, &checked_in1] {
      waitForMainThread();
      ++checked_in1;
    });
  }
  Group2.async([this, &checked_in2] {
    waitForPhase(2);
    ++checked_in2;
  });
  ASSERT_EQ(0, checked_in1);
  ASSERT_EQ(0, checked_in2);

  // Start first group and wait for it.
  setMainThreadReady();
  Group1.wait();
  ASSERT_EQ(5, checked_in1);

  // Second group has not yet finished, start it and wait for it.
  ASSERT_EQ(0, checked_in2);
  setPhase(2);
  Group2.wait();
  ASSERT_EQ(5, checked_in1);
  ASSERT_EQ(1, checked_in2);
}
// Check recursive tasks.
TEST_F(ThreadPoolTest, RecursiveGroups) {
  CHECK_UNSUPPORTED();
  // The counter is declared before the pool so that it outlives every task
  // that references it.
  std::atomic_int checked_in1{0};

  ThreadPool Pool;
  ThreadPoolTaskGroup Group(Pool);
  // Declared after the pool and the group so that it is destroyed first: if
  // the ASSERT before setMainThreadReady() bails out early, the tasks blocked
  // in waitForMainThread() are released before anything waits on them during
  // destruction.
  PhaseResetHelper Helper(this);

  for (size_t i = 0; i < 5; ++i) {
    Group.async([this, &Pool, &checked_in1] {
      waitForMainThread();

      // Each outer task spawns its own group of tasks on the same pool and
      // waits for it from inside the pool.
      ThreadPoolTaskGroup LocalGroup(Pool);

      // Check that waiting for an empty group is a no-op.
      LocalGroup.wait();

      std::atomic_int checked_in2{0};
      for (size_t j = 0; j < 5; ++j) {
        LocalGroup.async([&checked_in2] { ++checked_in2; });
      }
      LocalGroup.wait();
      ASSERT_EQ(5, checked_in2);

      ++checked_in1;
    });
  }
  ASSERT_EQ(0, checked_in1);
  setMainThreadReady();
  Group.wait();
  ASSERT_EQ(5, checked_in1);
}
TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy Strategy = hardware_concurrency(2);
  if (Strategy.compute_thread_count() < 2)
    GTEST_SKIP();

  ThreadPool Pool(Strategy);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup OuterGroup(Pool);

  // Scenario under test: a thread is inside wait() for a group and, having
  // found nothing to run, is suspended waiting for more tasks. The last task
  // of that group then finishes on a *different* thread. The waiting thread
  // must notice this and return instead of sleeping forever.

  // Task A runs in the first thread. Once the main thread is ready it moves
  // the test to phase 2 and finishes, leaving its background thread idle and
  // waiting for more tasks.
  OuterGroup.async([this] {
    waitForMainThread();
    setPhase(2);
  });

  // Task B runs in a second thread. It launches yet another task, C, in a
  // separate group; C will be picked up by the idle thread left by task A.
  OuterGroup.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup InnerGroup(Pool);
    InnerGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and suspend waiting for a notification. This is indeed
      // racy, but probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // Only now does task B wait for the tasks in its group (= task C) to
    // finish. This test checks that it does not deadlock. If the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // this task B would be stuck waiting for tasks to arrive.
    setPhase(3);
    InnerGroup.wait();
  });

  setMainThreadReady();
  OuterGroup.wait();
}
#if LLVM_ENABLE_THREADS == 1
// FIXME: Skip some tests below on non-Windows because multi-socket systems
// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
#ifdef _WIN32
/// Runs one task per thread of strategy \p S and records the affinity mask
/// each task observes for its thread.
///
/// All tasks are first held back until every one of them has started, so that
/// no thread can pick up a second task. They are then released together.
///
/// \returns the masks collected in a SetVector (so, presumably, each distinct
/// mask once), in insertion order.
std::vector<llvm::BitVector>
ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
  llvm::SetVector<llvm::BitVector> ThreadsUsed;
  std::mutex Lock; // Guards ThreadsUsed.
  {
    std::condition_variable AllThreads;
    std::mutex AllThreadsLock; // Guards Active.
    unsigned Active = 0;
    // Take a single snapshot of the thread count so that the number of tasks
    // launched and the number of tasks awaited below cannot disagree.
    const auto ThreadCount = S.compute_thread_count();

    ThreadPool Pool(S);
    for (size_t I = 0; I < ThreadCount; ++I) {
      Pool.async([&] {
        {
          // Report that this task is running on some thread.
          std::lock_guard<std::mutex> Guard(AllThreadsLock);
          ++Active;
          AllThreads.notify_one();
        }
        waitForMainThread();
        std::lock_guard<std::mutex> Guard(Lock);
        auto Mask = llvm::get_thread_affinity_mask();
        ThreadsUsed.insert(Mask);
      });
    }
    // Nothing may have been recorded yet: every task is still held back.
    EXPECT_EQ(true, ThreadsUsed.empty());
    {
      // Wait until all tasks have started before releasing them.
      std::unique_lock<std::mutex> Guard(AllThreadsLock);
      AllThreads.wait(Guard, [&]() { return Active == ThreadCount; });
    }
    setMainThreadReady();
    // Leaving this scope destroys the pool, which waits for the tasks.
  }
  return ThreadsUsed.takeVector();
}
TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11 the OS is free to deploy the threads on any CPU
  // socket, so we cannot reliably ensure that every thread affinity mask is
  // covered; do not run the test there.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();

  // Default strategy: expect as many affinity masks as get_cpus() reports.
  const std::vector<llvm::BitVector> Masks = RunOnAllSockets({});
  ASSERT_EQ(llvm::get_cpus(), Masks.size());
}
TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11 the OS is free to deploy the threads on any CPU
  // socket, so we cannot reliably ensure that every thread affinity mask is
  // covered; do not run the test there.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();

  // Heavyweight strategy: expect as many affinity masks as get_cpus() reports.
  const std::vector<llvm::BitVector> Masks =
      RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
  ASSERT_EQ(llvm::get_cpus(), Masks.size());
}
// From TestMain.cpp. Presumably argv[0] of the test binary; it is passed to
// sys::fs::getMainExecutable() by the AffinityMask test below.
extern const char *TestMainArgv0;
// Just a reachable symbol to ease resolving of the executable's path: its
// address is handed to sys::fs::getMainExecutable() together with
// TestMainArgv0.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
// Map the POSIX-style setenv(name, value, overwrite) call used below onto
// _putenv_s(name, value); the third argument is discarded by this shim.
#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif
TEST_F(ThreadPoolTest, AffinityMask) {
  CHECK_UNSUPPORTED();

  // Skip this test if less than 4 threads are available.
  if (llvm::hardware_concurrency().compute_thread_count() < 4)
    GTEST_SKIP();

  using namespace llvm::sys;

  // Child half of the test: the parent (below) re-launches this very test
  // with LLVM_THREADPOOL_AFFINITYMASK set and an affinity mask applied.
  if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
    std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
    // Ensure the threads only ran on CPUs 0-3, i.e. that the first word of
    // every observed mask has no bit set above bit 3.
    // NOTE: Don't use ASSERT* here because this runs in a subprocess,
    // and will show up as un-executed in the parent.
    assert(llvm::all_of(ThreadsUsed,
                        [](auto &T) { return T.getData().front() < 16UL; }) &&
           "Threads ran on more CPUs than expected! The affinity mask does not "
           "seem to work.");
    GTEST_SKIP();
  }

  // Parent half of the test: run this executable again, restricted to this
  // single test.
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
  StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};

  // Add environment variable to the environment of the child process.
  int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
  ASSERT_EQ(Res, 0);

  std::string Error;
  bool ExecutionFailed = false;
  BitVector Affinity;
  Affinity.resize(4);
  Affinity.set(0, 4); // Use CPUs 0,1,2,3.
  int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
                                &ExecutionFailed, nullptr, &Affinity);
  // Surface whatever ExecuteAndWait reported instead of a bare exit code.
  ASSERT_EQ(0, Ret) << Error;
}
#endif // #ifdef _WIN32
#endif // #if LLVM_ENABLE_THREADS == 1