Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bell/enable bind in auto ctput #1

Open
wants to merge 1 commit into
base: guozhong/auto_ctput_donot_call_multi
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions src/plugins/auto/auto_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
std::end(validDevices),
std::back_inserter(_autoSContext->_devicePriorities));
// Total number of devices in CTPUT
_nCTputDeviceNums = validDevices.size();
auto nCTputDeviceNums = validDevices.size();
// Generate contexts for loading each device
_pCTPUTLoadContext.reset(new AutoLoadContext[_nCTputDeviceNums]);
_pCTPUTLoadContext.reset(new AutoLoadContext[nCTputDeviceNums]);
int idx = 0;
DeviceInformation cpuDeviceInformation;
for (auto& device : validDevices) {
Expand Down Expand Up @@ -388,12 +388,12 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
std::vector<Task> otherDevicesloads;
std::vector<Task> cpuLoads;
if (_pCTPUTLoadContext) {
for (size_t i = 0; i < _nCTputDeviceNums; i++) {
for (size_t i = 0; i < _autoSContext->_devicePriorities.size(); i++) {
auto* contextPtr = &_pCTPUTLoadContext[i];
auto modelPath = _autoSContext->_modelPath;
auto network = _autoSContext->_network;
_pCTPUTLoadContext[i].task = std::bind(loadDeviceTask, contextPtr, modelPath, network, isCumulative);
if (i == _nCTputDeviceNums - 1 &&
if (i == _autoSContext->_devicePriorities.size() - 1 &&
_pCTPUTLoadContext[i].deviceInfo.deviceName.find("CPU") != std::string::npos) {
cpuLoads.push_back(_pCTPUTLoadContext[i].task);
} else {
Expand Down Expand Up @@ -648,7 +648,7 @@ void AutoSchedule::WaitFirstNetworkReady() {
// devices loaded successfully in CTPUT
if (_pCTPUTLoadContext) {
int nLoadSucNums = 0;
for (size_t i = 0; i < _nCTputDeviceNums; i++) {
for (size_t i = 0; i < _autoSContext->_devicePriorities.size(); i++) {
// check if device loaded successfully
if (_pCTPUTLoadContext[i].isAlready) {
nLoadSucNums++;
Expand Down Expand Up @@ -778,24 +778,12 @@ IInferPtr AutoSchedule::CreateInferRequest() {
syncRequestImpl = CreateInferRequestImpl(execNetwork->_networkInputs, execNetwork->_networkOutputs);
syncRequestImpl->setPointerToExecutableNetworkInternal(execNetwork);
if (_passthroughExeNet) {
std::string perfmode;
try {
perfmode = _passthroughExeNet->GetConfig(
CONFIG_KEY(PERFORMANCE_HINT)).as<std::string>();
} catch (const IE::Exception&) {
LOG_INFO("query perf hint from passthrough network failed");
}
if (_autoSContext->_batchingDisabled || perfmode != CONFIG_VALUE(THROUGHPUT)) {
syncRequestImpl->setPointerToSo(_passthroughExeNet._so);
} else {
auto so = _passthroughExeNet._ptr->GetPointerToSo();
// Get the _so from passthrough executable network when batch plugin is disable.
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
}
auto so = _passthroughExeNet._ptr->GetPointerToSo();
// Get the _so from passthrough executable network when batch plugin is disable.
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
} else if (std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest()) {
// cumulative case, load to MULTI:*
auto sharedMultiRequest = std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest();
if (sharedMultiRequest._ptr->getPointerToSo())
syncRequestImpl->setPointerToSo(sharedMultiRequest._ptr->getPointerToSo());
Expand Down
3 changes: 1 addition & 2 deletions src/plugins/auto/auto_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class AutoSchedule : public MultiSchedule {
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyPriorityWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device);
DeviceMap<NotBusyPriorityWorkerRequests> _idleWorkerRequests;
AutoScheduleContext::Ptr _autoSContext;

private:
void WaitFirstNetworkReady();
Expand All @@ -73,8 +74,6 @@ class AutoSchedule : public MultiSchedule {
std::promise<void> _firstLoadPromise;
bool _exitFlag = {false};
size_t _cpuHelpInferCount = 0;
AutoScheduleContext::Ptr _autoSContext;
size_t _nCTputDeviceNums = 0;
};

} // namespace MultiDevicePlugin
152 changes: 26 additions & 126 deletions src/plugins/auto/bind_multi_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,140 +6,40 @@
#include "async_infer_request.hpp"
#include "plugin.hpp"
#include "bind_multi_schedule.hpp"
#include "multi_executable_network.hpp"
// ------------------------------MultiSchedule----------------------------
namespace MultiDevicePlugin {

thread_local IE::IInferRequestInternal* BinderMultiSchedule::_sharedRequest = nullptr;

void BinderMultiSchedule::init(const ScheduleContext::Ptr& sContext) {
MultiSchedule::init(sContext);
AutoSchedule::init(sContext);
LOG_INFO_TAG("enable bind buffer for AUTO");
}

Pipeline BinderMultiSchedule::GetPipeline(const IInferPtr& syncInferRequest, WorkerInferRequest** workerInferRequest) {
Pipeline pipeline = {
// if the request is coming with device-specific remote blobs make sure it is scheduled to the specific device only:
Stage {
/*TaskExecutor*/ std::make_shared<IE::ImmediateExecutor>(), /*task*/ [this, &syncInferRequest, workerInferRequest]() {
// by default, no preferred device:
_thisPreferredDeviceName = "";
auto execNetwork = _multiSContext->_executableNetwork.lock();
// if any input is remote (e.g. was set with SetBlob), let' use the corresponding device
for (const auto& it : execNetwork->GetInputsInfo()) {
auto b = syncInferRequest->GetBlob(it.first);
auto r = b->as<IE::RemoteBlob>();
if (r) {
const auto name = r->getDeviceName();
const auto res = std::find_if(
_multiSContext->_devicePrioritiesInitial.cbegin(),
_multiSContext->_devicePrioritiesInitial.cend(),
[&name](const MultiDevicePlugin::DeviceInformation & d) {
return (d.defaultDeviceID.empty() ? d.deviceName : (d.deviceName + "." +
d.defaultDeviceID)) == name;
});
if (_multiSContext->_devicePrioritiesInitial.cend() == res) {
IE_THROW() <<
"None of the devices (for which current MULTI-device configuration was "
"initialized) supports a remote blob created on the device named " << name;
} else {
// it is ok to take the c_str() here (as pointed in the executable_network.hpp we need to use const char*)
// as the original strings are from the "persistent" vector (with the right lifetime)
_thisPreferredDeviceName = res->deviceName.c_str();
break;
}
}
}
_thisWorkerInferRequest = *workerInferRequest;
_sharedRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest()._ptr.get();
}},
// as the scheduling algo may select any device, this stage accepts the scheduling decision (actual workerRequest)
// then sets the device-agnostic blobs to the actual (device-specific) request
Stage {
/*TaskExecutor*/std::dynamic_pointer_cast<IE::ITaskExecutor>(shared_from_this()), /*task*/ [&syncInferRequest, workerInferRequest]() {
*workerInferRequest = _thisWorkerInferRequest;
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest);
multiSyncInferRequest->SetBlobsToAnotherRequest(_thisWorkerInferRequest->_inferRequest);
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_startTimes.push_back(std::chrono::steady_clock::now());
});
}},
// final task in the pipeline:
Stage {
/*TaskExecutor*/std::make_shared<ThisRequestExecutor>(workerInferRequest), /*task*/ [this, &syncInferRequest, workerInferRequest]() {
if (nullptr != (*workerInferRequest)->_exceptionPtr) {
std::rethrow_exception((*workerInferRequest)->_exceptionPtr);
}
if (_multiSContext->_needPerfCounters) {
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>
(syncInferRequest);
multiSyncInferRequest->_scheduledRequest =
(*workerInferRequest)->_inferRequest;
}
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_endTimes.push_back(std::chrono::steady_clock::now());
Pipeline pipeline;
struct RequestExecutor : ITaskExecutor {
explicit RequestExecutor(InferenceEngine::SoIInferRequestInternal& inferRequest) : _inferRequest(inferRequest) {
_inferRequest->SetCallback([this](std::exception_ptr exceptionPtr) mutable {
_exceptionPtr = exceptionPtr;
auto capturedTask = std::move(_task);
capturedTask();
});
}}
};
return pipeline;
}

bool BinderMultiSchedule::ScheduleToWorkerInferRequest(IE::Task inferPipelineTask, DeviceName preferred_device) {
std::vector<DeviceInformation> devices;
devices = [&] {
std::lock_guard<std::mutex> lock(_multiSContext->_mutex);
return _multiSContext->_devicePriorities;
}();
for (auto&& device : devices) {
if (!preferred_device.empty() && (device.deviceName != preferred_device)) {
continue;
}
if (RunPipelineTask(inferPipelineTask, _idleWorkerRequests[device.deviceName], preferred_device)) {
return true;
}
}
// no vacant requests this time, storing the task to the respective queue
if (!preferred_device.empty()) {
_inferPipelineTasksDeviceSpecific[preferred_device]->push(std::move(inferPipelineTask));
} else {
_inferPipelineTasks.push(std::move(inferPipelineTask));
}
return false;
}

bool BinderMultiSchedule::RunPipelineTask(IE::Task& inferPipelineTask,
NotBusyWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device) {
WorkerInferRequest* workerRequestPtr = nullptr;
WorkerInferRequest* headWorker = nullptr;
bool flag = false;
while (idleWorkerRequests.try_pop(workerRequestPtr)) {
if (flag && workerRequestPtr == headWorker)
break;
if (!flag) {
headWorker = workerRequestPtr;
flag = true;
}
IdleGuard<NotBusyWorkerRequests> idleGuard{workerRequestPtr, idleWorkerRequests};
if (_sharedRequest == workerRequestPtr->_inferRequest._ptr.get()) {
_thisWorkerInferRequest = workerRequestPtr;
{
auto capturedTask = std::move(inferPipelineTask);
capturedTask();
}
idleGuard.Release();
return true;
void run(InferenceEngine::Task task) override {
_task = std::move(task);
_inferRequest->StartAsync();
};
InferenceEngine::SoIInferRequestInternal& _inferRequest;
std::exception_ptr _exceptionPtr;
InferenceEngine::Task _task;
};
auto requestExecutor =
std::make_shared<RequestExecutor>(std::static_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest());
pipeline.emplace_back(requestExecutor, [requestExecutor] {
if (nullptr != requestExecutor->_exceptionPtr) {
std::rethrow_exception(requestExecutor->_exceptionPtr);
}
}
return false;
}

void BinderMultiSchedule::run(IE::Task inferPipelineTask) {
if (_thisWorkerInferRequest) {
auto capturedTask = std::move(inferPipelineTask);
capturedTask();
} else {
ScheduleToWorkerInferRequest(std::move(inferPipelineTask), _thisPreferredDeviceName);
}
});
return pipeline;
}

BinderMultiSchedule::~BinderMultiSchedule() {
Expand All @@ -153,7 +53,7 @@ IInferPtr BinderMultiSchedule::CreateInferRequestImpl(
SoInfer request_to_share_blobs_with;
// borrowing device-specific blobs from the underlying requests for the device-agnostic, user-facing requests
// this allows to potentially save on the data-copy later (if the requests are scheduled in the same order)
for (const auto& device : _multiSContext->_devicePrioritiesInitial) {
for (const auto& device : _autoSContext->_devicePrioritiesInitial) {
auto& dev_requests = _workerRequests[device.deviceName];
if ((num - sum) < dev_requests.size()) {
request_to_share_blobs_with = dev_requests.at(num - sum)._inferRequest;
Expand All @@ -177,7 +77,7 @@ IInferPtr BinderMultiSchedule::CreateInferRequestImpl(IE::InputsDataMap networkI
size_t sum = 0;
// borrowing device-specific blobs from the underlying requests for the device-agnostic, user-facing requests
// this allows to potentially save on the data-copy later (if the requests are scheduled in the same order)
for (const auto& device : _multiSContext->_devicePrioritiesInitial) {
for (const auto& device : _autoSContext->_devicePrioritiesInitial) {
auto& dev_requests = _workerRequests[device.deviceName];
if ((num - sum) < dev_requests.size()) {
request_to_share_blobs_with = dev_requests.at(num - sum)._inferRequest;
Expand Down
12 changes: 2 additions & 10 deletions src/plugins/auto/bind_multi_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once

#include "multi_schedule.hpp"
#include "auto_schedule.hpp"

#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
Expand All @@ -15,22 +15,14 @@
#endif

namespace MultiDevicePlugin {
class BinderMultiSchedule : public MultiSchedule {
class BinderMultiSchedule : public AutoSchedule {
public:
using Ptr = std::shared_ptr<BinderMultiSchedule>;
IInferPtr CreateInferRequestImpl(IE::InputsDataMap networkInputs, IE::OutputsDataMap networkOutputs) override;
IE::IInferRequestInternal::Ptr CreateInferRequestImpl(const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) override;
void run(IE::Task inferTask) override;
void init(const ScheduleContext::Ptr& sContext) override;
Pipeline GetPipeline(const IInferPtr& syncRequestImpl, WorkerInferRequest** WorkerInferRequest) override;
virtual ~BinderMultiSchedule();

protected:
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyWorkerRequests& idleWorkerRequests, const DeviceName& preferred_device);
bool ScheduleToWorkerInferRequest(IE::Task, DeviceName preferred_device = "") override;

protected:
thread_local static IE::IInferRequestInternal* _sharedRequest;
};
} // namespace MultiDevicePlugin
1 change: 0 additions & 1 deletion src/plugins/auto/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ class MultiScheduleContext : public ScheduleContext {
std::mutex _mutex;
bool _needPerfCounters;
bool _batchingDisabled = {false};
bool _bindBuffer = false;
bool _startupfallback = true;
bool _runtimeFallback = true;
virtual ~MultiScheduleContext() = default;
Expand Down
25 changes: 7 additions & 18 deletions src/plugins/auto/multi_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,26 +307,15 @@ IInferPtr MultiSchedule::CreateInferRequest() {
syncRequestImpl = CreateInferRequestImpl(execNetwork->_networkInputs, execNetwork->_networkOutputs);
syncRequestImpl->setPointerToExecutableNetworkInternal(execNetwork);
if (_passthroughExeNet) {
std::string perfmode;
try {
perfmode = _passthroughExeNet->GetConfig(
CONFIG_KEY(PERFORMANCE_HINT)).as<std::string>();
} catch (const IE::Exception&) {
LOG_INFO("query perf hint from passthrough network failed");
}
if (_multiSContext->_batchingDisabled || perfmode != CONFIG_VALUE(THROUGHPUT)) {
syncRequestImpl->setPointerToSo(_passthroughExeNet._so);
} else {
auto so = _passthroughExeNet._ptr->GetPointerToSo();
// Get the _so from passthrough executable network when batch plugin is disable.
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
}
} else if (_multiSContext->_bindBuffer) {
auto so = _passthroughExeNet._ptr->GetPointerToSo();
// Get the _so from passthrough executable network when batch plugin is disable.
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
} else if (std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest()) {
auto sharedRequest = std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest();
if (sharedRequest._ptr->getPointerToSo())
syncRequestImpl->setPointerToSo(sharedRequest._ptr->getPointerToSo());
syncRequestImpl->setPointerToSo(sharedRequest._ptr->getPointerToSo());
else
syncRequestImpl->setPointerToSo(sharedRequest._so);
}
Expand Down
Loading