//===---- SimpleRemoteEPCServer.cpp - EPC over simple abstract channel ----===//
| // |
| // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| // See https://llvm.org/LICENSE.txt for license information. |
| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| // |
| //===----------------------------------------------------------------------===// |
| |
| #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h" |
| |
| #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h" |
| #include "llvm/Support/FormatVariadic.h" |
| #include "llvm/Support/Host.h" |
| #include "llvm/Support/Process.h" |
| |
| #include "OrcRTBootstrap.h" |
| |
| #define DEBUG_TYPE "orc" |
| |
| using namespace llvm::orc::shared; |
| |
| namespace llvm { |
| namespace orc { |
| |
| ExecutorBootstrapService::~ExecutorBootstrapService() {} |
| |
| SimpleRemoteEPCServer::Dispatcher::~Dispatcher() {} |
| |
| #if LLVM_ENABLE_THREADS |
| void SimpleRemoteEPCServer::ThreadDispatcher::dispatch( |
| unique_function<void()> Work) { |
| { |
| std::lock_guard<std::mutex> Lock(DispatchMutex); |
| if (!Running) |
| return; |
| ++Outstanding; |
| } |
| |
| std::thread([this, Work = std::move(Work)]() mutable { |
| Work(); |
| std::lock_guard<std::mutex> Lock(DispatchMutex); |
| --Outstanding; |
| OutstandingCV.notify_all(); |
| }).detach(); |
| } |
| |
| void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() { |
| std::unique_lock<std::mutex> Lock(DispatchMutex); |
| Running = false; |
| OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; }); |
| } |
| #endif |
| |
| StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() { |
| StringMap<ExecutorAddr> DBS; |
| rt_bootstrap::addTo(DBS); |
| return DBS; |
| } |
| |
| Expected<SimpleRemoteEPCTransportClient::HandleMessageAction> |
| SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo, |
| ExecutorAddr TagAddr, |
| SimpleRemoteEPCArgBytesVector ArgBytes) { |
| |
| LLVM_DEBUG({ |
| dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = "; |
| switch (OpC) { |
| case SimpleRemoteEPCOpcode::Setup: |
| dbgs() << "Setup"; |
| assert(SeqNo == 0 && "Non-zero SeqNo for Setup?"); |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?"); |
| break; |
| case SimpleRemoteEPCOpcode::Hangup: |
| dbgs() << "Hangup"; |
| assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?"); |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?"); |
| break; |
| case SimpleRemoteEPCOpcode::Result: |
| dbgs() << "Result"; |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?"); |
| break; |
| case SimpleRemoteEPCOpcode::CallWrapper: |
| dbgs() << "CallWrapper"; |
| break; |
| } |
| dbgs() << ", seqno = " << SeqNo |
| << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue()) |
| << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size()) |
| << " bytes\n"; |
| }); |
| |
| using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>; |
| if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC)) |
| return make_error<StringError>("Unexpected opcode", |
| inconvertibleErrorCode()); |
| |
| // TODO: Clean detach message? |
| switch (OpC) { |
| case SimpleRemoteEPCOpcode::Setup: |
| return make_error<StringError>("Unexpected Setup opcode", |
| inconvertibleErrorCode()); |
| case SimpleRemoteEPCOpcode::Hangup: |
| return SimpleRemoteEPCTransportClient::EndSession; |
| case SimpleRemoteEPCOpcode::Result: |
| if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes))) |
| return std::move(Err); |
| break; |
| case SimpleRemoteEPCOpcode::CallWrapper: |
| handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes)); |
| break; |
| } |
| return ContinueSession; |
| } |
| |
| Error SimpleRemoteEPCServer::waitForDisconnect() { |
| std::unique_lock<std::mutex> Lock(ServerStateMutex); |
| ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; }); |
| return std::move(ShutdownErr); |
| } |
| |
| void SimpleRemoteEPCServer::handleDisconnect(Error Err) { |
| PendingJITDispatchResultsMap TmpPending; |
| |
| { |
| std::lock_guard<std::mutex> Lock(ServerStateMutex); |
| std::swap(TmpPending, PendingJITDispatchResults); |
| RunState = ServerShuttingDown; |
| } |
| |
| // Send out-of-band errors to any waiting threads. |
| for (auto &KV : TmpPending) |
| KV.second->set_value( |
| shared::WrapperFunctionResult::createOutOfBandError("disconnecting")); |
| |
| // Wait for dispatcher to clear. |
| D->shutdown(); |
| |
| // Shut down services. |
| while (!Services.empty()) { |
| ShutdownErr = |
| joinErrors(std::move(ShutdownErr), Services.back()->shutdown()); |
| Services.pop_back(); |
| } |
| |
| std::lock_guard<std::mutex> Lock(ServerStateMutex); |
| ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err)); |
| RunState = ServerShutDown; |
| ShutdownCV.notify_all(); |
| } |
| |
| Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC, |
| uint64_t SeqNo, ExecutorAddr TagAddr, |
| ArrayRef<char> ArgBytes) { |
| |
| LLVM_DEBUG({ |
| dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = "; |
| switch (OpC) { |
| case SimpleRemoteEPCOpcode::Setup: |
| dbgs() << "Setup"; |
| assert(SeqNo == 0 && "Non-zero SeqNo for Setup?"); |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?"); |
| break; |
| case SimpleRemoteEPCOpcode::Hangup: |
| dbgs() << "Hangup"; |
| assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?"); |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?"); |
| break; |
| case SimpleRemoteEPCOpcode::Result: |
| dbgs() << "Result"; |
| assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?"); |
| break; |
| case SimpleRemoteEPCOpcode::CallWrapper: |
| dbgs() << "CallWrapper"; |
| break; |
| } |
| dbgs() << ", seqno = " << SeqNo |
| << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue()) |
| << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size()) |
| << " bytes\n"; |
| }); |
| auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes); |
| LLVM_DEBUG({ |
| if (Err) |
| dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n"; |
| }); |
| return Err; |
| } |
| |
| Error SimpleRemoteEPCServer::sendSetupMessage( |
| StringMap<ExecutorAddr> BootstrapSymbols) { |
| |
| using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames; |
| |
| std::vector<char> SetupPacket; |
| SimpleRemoteEPCExecutorInfo EI; |
| EI.TargetTriple = sys::getProcessTriple(); |
| if (auto PageSize = sys::Process::getPageSize()) |
| EI.PageSize = *PageSize; |
| else |
| return PageSize.takeError(); |
| EI.BootstrapSymbols = std::move(BootstrapSymbols); |
| |
| assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) && |
| "Dispatch context name should not be set"); |
| assert(!EI.BootstrapSymbols.count(DispatchFnName) && |
| "Dispatch function name should not be set"); |
| EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this); |
| EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry); |
| |
| using SPSSerialize = |
| shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>; |
| auto SetupPacketBytes = |
| shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI)); |
| shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size()); |
| if (!SPSSerialize::serialize(OB, EI)) |
| return make_error<StringError>("Could not send setup packet", |
| inconvertibleErrorCode()); |
| |
| return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(), |
| {SetupPacketBytes.data(), SetupPacketBytes.size()}); |
| } |
| |
| Error SimpleRemoteEPCServer::handleResult( |
| uint64_t SeqNo, ExecutorAddr TagAddr, |
| SimpleRemoteEPCArgBytesVector ArgBytes) { |
| std::promise<shared::WrapperFunctionResult> *P = nullptr; |
| { |
| std::lock_guard<std::mutex> Lock(ServerStateMutex); |
| auto I = PendingJITDispatchResults.find(SeqNo); |
| if (I == PendingJITDispatchResults.end()) |
| return make_error<StringError>("No call for sequence number " + |
| Twine(SeqNo), |
| inconvertibleErrorCode()); |
| P = I->second; |
| PendingJITDispatchResults.erase(I); |
| releaseSeqNo(SeqNo); |
| } |
| auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size()); |
| memcpy(R.data(), ArgBytes.data(), ArgBytes.size()); |
| P->set_value(std::move(R)); |
| return Error::success(); |
| } |
| |
| void SimpleRemoteEPCServer::handleCallWrapper( |
| uint64_t RemoteSeqNo, ExecutorAddr TagAddr, |
| SimpleRemoteEPCArgBytesVector ArgBytes) { |
| D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() { |
| using WrapperFnTy = |
| shared::CWrapperFunctionResult (*)(const char *, size_t); |
| auto *Fn = TagAddr.toPtr<WrapperFnTy>(); |
| shared::WrapperFunctionResult ResultBytes( |
| Fn(ArgBytes.data(), ArgBytes.size())); |
| if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo, |
| ExecutorAddr(), |
| {ResultBytes.data(), ResultBytes.size()})) |
| ReportError(std::move(Err)); |
| }); |
| } |
| |
| shared::WrapperFunctionResult |
| SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData, |
| size_t ArgSize) { |
| uint64_t SeqNo; |
| std::promise<shared::WrapperFunctionResult> ResultP; |
| auto ResultF = ResultP.get_future(); |
| { |
| std::lock_guard<std::mutex> Lock(ServerStateMutex); |
| if (RunState != ServerRunning) |
| return shared::WrapperFunctionResult::createOutOfBandError( |
| "jit_dispatch not available (EPC server shut down)"); |
| |
| SeqNo = getNextSeqNo(); |
| assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use"); |
| PendingJITDispatchResults[SeqNo] = &ResultP; |
| } |
| |
| if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo, |
| ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize})) |
| ReportError(std::move(Err)); |
| |
| return ResultF.get(); |
| } |
| |
| shared::CWrapperFunctionResult |
| SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag, |
| const char *ArgData, size_t ArgSize) { |
| return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx) |
| ->doJITDispatch(FnTag, ArgData, ArgSize) |
| .release(); |
| } |
| |
| } // end namespace orc |
| } // end namespace llvm |