blob: 78b52126d27f2baa2415cbdd6b1555b1fc2fb971 [file] [log] [blame] [edit]
//===- Session.cpp --------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// Contains the implementation of the Session class and related APIs.
//
//===----------------------------------------------------------------------===//
#include "orc-rt/Session.h"
namespace orc_rt {
class Session::NotificationService : public Service {
public:
void addOnDetach(Session::OnDetachFn OnDetach) {
ToNotifyOnDetach.push_back(std::move(OnDetach));
}
void addOnShutdown(Session::OnShutdownFn OnShutdown) {
ToNotifyOnShutdown.push_back(std::move(OnShutdown));
}
void onDetach(OnCompleteFn OnComplete, bool ShutdownRequested) override {
while (!ToNotifyOnDetach.empty()) {
auto ToNotify = std::move(ToNotifyOnDetach.back());
ToNotifyOnDetach.pop_back();
ToNotify();
}
OnComplete();
}
void onShutdown(OnCompleteFn OnComplete) override {
while (!ToNotifyOnShutdown.empty()) {
auto ToNotify = std::move(ToNotifyOnShutdown.back());
ToNotifyOnShutdown.pop_back();
ToNotify();
}
OnComplete();
}
private:
std::vector<Session::OnDetachFn> ToNotifyOnDetach;
std::vector<Session::OnShutdownFn> ToNotifyOnShutdown;
};
Session::ControllerAccess::~ControllerAccess() = default;
Session::Session(ExecutorProcessInfo EPI,
std::unique_ptr<TaskDispatcher> Dispatcher,
ErrorReporterFn ReportError)
: EPI(std::move(EPI)), Dispatcher(std::move(Dispatcher)),
ReportError(std::move(ReportError)), Notifiers(addNotificationService()) {
}
Session::~Session() { waitForShutdown(); }
void Session::attach(std::shared_ptr<ControllerAccess> CA, BootstrapInfo BI) {
assert(CA && "attach called with null CA object");
{
std::scoped_lock<std::mutex> Lock(M);
// Controller can only be attached from the start state if no
// other operation has been requested.
if (CurrentState != State::Start || TargetState != State::None)
return;
assert(std::atomic_load(&this->CA) == nullptr &&
"ControllerAccess object already attached?");
std::atomic_store(&this->CA, CA);
TargetState = State::Attached;
}
CA->connect(std::move(BI));
{
std::scoped_lock<std::mutex> Lock(M);
assert(TargetState >= State::Attached);
// There are three possibilities that we have to deal with here:
// 1. Connection succeeded and we're done.
//
// We just need to move to the Attached state, reset TargetState, and
// we're done.
//
// 2. Connect failed.
//
// In this case connect must have called handleDisconnect, which should
// have initiated the detach. We just need to bail out.
//
// 3. Connection succeeded but a detach or shutdown was requested
// concurrently. In this case we need to start the detach process.
//
// To distinguish between these we first look at the target state. If it's
// Attached then it's option (1) and we're done:
if (TargetState == State::Attached) {
CurrentState = State::Attached;
TargetState = State::None;
return;
}
// The target state is Detached or higher. Check the current state. If it's
// also Detached or higher then handleDisconnect must already have been
// called (in turn calling proceedToDetach, which updated the current
// state). In this case we're in option (2) and we just need to bail out.
if (CurrentState >= State::Detached)
return;
// The target state is Detached or higher, but the current state is still
// Start. Someone must have called detach / shutdown concurrently. This is
// option (3) and we just need to update the current state and run
// disconnect.
CurrentState = State::Attached;
}
CA->disconnect();
}
void Session::detach(OnDetachFn OnDetach) {
addOnDetach(std::move(OnDetach));
std::shared_ptr<ControllerAccess> TmpCA;
{
std::unique_lock<std::mutex> Lock(M);
// Check if someone's already managing transitions.
if (TargetState != State::None) {
TargetState = std::max(TargetState, State::Detached);
return;
}
// Nobody's managing transitions, but this request is redundant.
if (CurrentState >= State::Detached)
return;
// We've actually got work to do.
TargetState = State::Detached;
assert((CurrentState == State::Start || CurrentState == State::Attached) &&
"Unexpected current state");
if (CurrentState == State::Attached) {
assert(CA && "Attached, but not CA?");
TmpCA = std::atomic_load(&this->CA);
} else {
assert(CurrentState == State::Start);
proceedToDetach(Lock, std::atomic_exchange(&this->CA, {}));
return;
}
}
TmpCA->disconnect();
}
void Session::shutdown(OnShutdownFn OnShutdown) {
addOnShutdown(std::move(OnShutdown));
std::shared_ptr<ControllerAccess> TmpCA;
{
std::unique_lock<std::mutex> Lock(M);
// Check if someone's already managing transitions.
if (TargetState != State::None) {
TargetState = std::max(TargetState, State::Shutdown);
return;
}
// Nobody's managing transition, but this request is redundant.
if (CurrentState == State::Shutdown)
return;
TargetState = State::Shutdown;
assert((CurrentState == State::Start || CurrentState == State::Attached ||
CurrentState == State::Detached) &&
"Unexpected current state");
switch (CurrentState) {
case State::Start:
proceedToDetach(Lock, nullptr);
return;
case State::Attached:
TmpCA = std::atomic_load(&this->CA);
break;
case State::Detached:
proceedToShutdown(Lock);
return;
default:
assert(false && "Illegal state");
abort();
}
}
TmpCA->disconnect();
}
void Session::waitForShutdown() {
std::promise<void> P;
auto F = P.get_future();
addOnShutdown([P = std::move(P)]() mutable { P.set_value(); });
shutdown();
F.get();
}
void Session::addOnDetach(OnDetachFn OnDetach) {
if (!OnDetach)
return;
{
std::scoped_lock<std::mutex> Lock(M);
if (CurrentState < State::Detached) {
Notifiers.addOnDetach(std::move(OnDetach));
return;
}
}
// We've already detached. Run in-place.
OnDetach();
}
void Session::addOnShutdown(OnShutdownFn OnShutdown) {
if (!OnShutdown)
return;
{
std::scoped_lock<std::mutex> Lock(M);
if (CurrentState < State::Shutdown) {
Notifiers.addOnShutdown(std::move(OnShutdown));
return;
}
}
// We've already shutdown. Run in-place.
OnShutdown();
}
Session::NotificationService &Session::addNotificationService() {
auto NS = std::make_unique<NotificationService>();
auto &TmpNS = *NS;
Services.push_back(std::move(NS));
return TmpNS;
}
void Session::appendService(std::unique_ptr<Service> Srv) {
bool ShuttingDown = false;
{
std::scoped_lock<std::mutex> Lock(M);
if (CurrentState < State::Detached) {
Services.push_back(std::move(Srv));
return;
}
ShuttingDown = TargetState == State::Shutdown;
}
// Already detached. Call onDetach on the service.
assert(Srv && "Should be non-null here");
Srv->onDetach([]() {}, ShuttingDown);
// Try to append again.
{
std::scoped_lock<std::mutex> Lock(M);
if (CurrentState < State::Shutdown) {
Services.push_back(std::move(Srv));
return;
}
}
// Already shutdown. Call onShutdown on the service.
assert(Srv && "Should be non-null here");
Srv->onShutdown([]() {});
// At this point the service has already been shut down, but we need to keep
// the object alive until the Session is destroyed, so append it anyway.
{
std::scoped_lock<std::mutex> Lock(M);
Services.push_back(std::move(Srv));
}
}
void Session::handleDisconnect() {
// If we get here we _don't_ need to call disconnect.
std::unique_lock<std::mutex> Lock(M);
assert(CurrentState <= State::Attached);
TargetState = std::max(TargetState, State::Detached);
proceedToDetach(Lock, std::atomic_exchange(&this->CA, {}));
}
void Session::proceedToDetach(std::unique_lock<std::mutex> &Lock,
std::shared_ptr<ControllerAccess> TmpCA) {
std::vector<Service *> ToNotify;
ToNotify.reserve(Services.size());
for (auto &Srv : Services)
ToNotify.push_back(Srv.get());
bool ShutdownRequested = TargetState == State::Shutdown;
CurrentState = State::Detached;
Lock.unlock();
// Throw away controller if present.
TmpCA.reset();
// Notify services.
detachServices(std::move(ToNotify), ShutdownRequested);
}
void Session::detachServices(std::vector<Service *> ToNotify,
bool ShutdownRequested) {
if (ToNotify.empty())
return completeDetach();
auto *Srv = ToNotify.back();
ToNotify.pop_back();
Srv->onDetach(
[this, ToNotify = std::move(ToNotify), ShutdownRequested]() {
detachServices(std::move(ToNotify), ShutdownRequested);
},
ShutdownRequested);
}
void Session::completeDetach() {
std::unique_lock<std::mutex> Lock(M);
assert(CurrentState == State::Detached);
if (TargetState == State::Detached) {
TargetState = State::None;
return;
}
// Someone must have requested shutdown.
assert(TargetState == State::Shutdown);
proceedToShutdown(Lock);
}
void Session::proceedToShutdown(std::unique_lock<std::mutex> &Lock) {
std::vector<Service *> ToNotify;
ToNotify.reserve(Services.size());
for (auto &Srv : Services)
ToNotify.push_back(Srv.get());
CurrentState = State::Shutdown;
Lock.unlock();
// Notify services.
shutdownServices(std::move(ToNotify));
}
void Session::shutdownServices(std::vector<Service *> ToNotify) {
if (ToNotify.empty())
return completeShutdown();
auto *Srv = ToNotify.back();
ToNotify.pop_back();
Srv->onShutdown([this, ToNotify = std::move(ToNotify)]() {
shutdownServices(std::move(ToNotify));
});
}
void Session::completeShutdown() {
Dispatcher->shutdown();
std::unique_lock<std::mutex> Lock(M);
assert(CurrentState == State::Shutdown);
assert(TargetState == State::Shutdown);
TargetState = State::None;
}
void Session::wrapperReturn(orc_rt_SessionRef S, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes) {
unwrap(S)->sendWrapperResult(CallId, WrapperFunctionBuffer(ResultBytes));
}
} // namespace orc_rt