
Commit e0a41fe

[NPUW] Async weights bank processing and closure guard (#32505)
Alternative to #32424
1 parent: ec23e21

File tree

7 files changed: +246 -80 lines

src/plugins/intel_npu/src/plugin/npuw/base_sync_infer_request.cpp

Lines changed: 9 additions & 6 deletions
@@ -411,8 +411,10 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
     std::vector<std::size_t> closure_unpack_required;
     std::vector<std::size_t> closure_copy_required;
 
-    for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.size(); cidx++) {
-        auto& closure = comp_model_desc.closure[cidx];
+    auto& desc_closure = comp_model_desc.closure.get().closure;
+
+    for (std::size_t cidx = 0u; cidx < desc_closure.size(); cidx++) {
+        auto& closure = desc_closure[cidx];
         const auto closure_param_id = comp_model_desc.param_base + cidx;
 
         if (m_npuw_model->is_gather_closure(idx, cidx)) {
@@ -440,7 +442,7 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
     // m_ms_unpack += ov::npuw::perf::ms_to_run([&](){
     ov::parallel_for(closure_copy_required.size(), [&](std::size_t j) {
         auto cidx = closure_copy_required[j];
-        auto& closure = comp_model_desc.closure[cidx];
+        auto& closure = desc_closure[cidx];
         const auto closure_param_id = comp_model_desc.param_base + cidx;
         auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
         auto clparam = request->get_tensor(iport);
@@ -455,7 +457,7 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
         auto cidx = closure_unpack_required[j];
 
         // FIXME: zerops are stored with absolute indexing, this needs to be aligned
-        auto& closure = comp_model_desc.closure[cidx];
+        auto& closure = desc_closure[cidx];
 
         const auto closure_param_id = comp_model_desc.param_base + cidx;
         auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
@@ -565,7 +567,8 @@ void ov::npuw::IBaseInferRequest::bind_global_params(std::size_t idx, RqPtr requ
         const auto& gport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.dst_idx];
         const auto gather = request->get_tensor(gport);
 
-        const auto& vocab = comp_model_desc.closure[comp_model_desc.host_gather.src_idx - comp_model_desc.param_base];
+        const auto& vocab =
+            comp_model_desc.closure.get().closure[comp_model_desc.host_gather.src_idx - comp_model_desc.param_base];
         const auto& lport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.idx_idx];
         const auto lookup = request->get_tensor(lport);
 
@@ -926,7 +929,7 @@ bool ov::npuw::IBaseInferRequest::needs_copy(std::size_t idx, std::size_t cidx)
         return false;
     }
     auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];
-    if (comp_model_desc.is_remote[cidx]) {
+    if (comp_model_desc.closure.get().is_remote[cidx]) {
        // FIXME: Test if the tensor device and the request device are
        // the same or compatible!
        return false;
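
Note how the infer-request code above no longer indexes comp_model_desc.closure directly: readers go through comp_model_desc.closure.get(), while the thread that produces the data uses unsafe_get() (see the next file). The guard type itself is defined in one of the other changed files of this commit and is not shown here, so the following is only a minimal sketch, assuming a wrapper that bundles the closure vectors with a std::shared_future<void> installed via set_future(): get() waits for the async weights-bank evaluation to finish, unsafe_get() does not. Type and member names below are illustrative, not the actual declaration.

// Minimal sketch of the "closure guard" idea used above -- hypothetical names;
// the real type lives in another file of this commit (not shown here).
#include <cstdint>
#include <future>
#include <utility>
#include <vector>

#include "openvino/runtime/tensor.hpp"

struct ClosureDesc {
    std::vector<ov::Tensor> closure;   // per-parameter weight tensors
    std::vector<int64_t> closure_uid;  // weights-bank UID, -1 == CPU-side closure (type assumed)
    std::vector<bool> is_remote;       // true if the tensor lives on the device
};

class ClosureGuard {
public:
    // Readers block until the async weights-bank evaluation signals completion.
    ClosureDesc& get() {
        if (m_ready.valid()) {
            m_ready.wait();
        }
        return m_desc;
    }
    // The producing thread skips the wait so it cannot deadlock on itself.
    ClosureDesc& unsafe_get() {
        return m_desc;
    }
    void set_future(std::shared_future<void> ready) {
        m_ready = std::move(ready);
    }

private:
    ClosureDesc m_desc;
    std::shared_future<void> m_ready;
};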

src/plugins/intel_npu/src/plugin/npuw/compiled_model.cpp

Lines changed: 98 additions & 66 deletions
@@ -330,6 +330,7 @@ ov::npuw::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
     // - dump the subgraphs, if necessary
     std::map<std::string, std::size_t> compiledFunctions;
     m_compiled_submodels.resize(orderedSubgraphs.size());
+
     const std::size_t end_sub_idx = orderedSubgraphs.size();
 
     const std::string dump_sub_opt = m_cfg.get<::intel_npu::NPUW_DUMP_SUBS>();
@@ -382,16 +383,18 @@ ov::npuw::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
             m_compiled_submodels[id].replaced_by = compiled_fcn_iter->second;
             LOG_INFO("Subgraph[" << id << "] is a function call to [" << compiled_fcn_iter->second << "]");
         }
+        auto& closure_desc = m_compiled_submodels[id].closure.get();
+
         m_compiled_submodels[id].host_gather = subgraph._host_gather;
         m_compiled_submodels[id].quant_unpack_gather = subgraph._quant_unpack_gather;
         m_compiled_submodels[id].param_base = fcn_template._param_offset;
-        m_compiled_submodels[id].closure = subgraph._closure;
+        closure_desc.closure = subgraph._closure;
         m_compiled_submodels[id].lazy_closure = subgraph._lazy_closure;
-        m_compiled_submodels[id].closure_uid.resize(m_compiled_submodels[id].closure.size(), -1);
+        closure_desc.closure_uid.resize(subgraph._closure.size(), -1);
         m_compiled_submodels[id].scales = subgraph._scales;
         m_compiled_submodels[id].zerops = subgraph._zerops;
         m_compiled_submodels[id].forced_to_fcall = subgraph._forced_to_fcall;
-        m_compiled_submodels[id].is_remote.resize(m_compiled_submodels[id].closure.size(), false);
+        closure_desc.is_remote.resize(subgraph._closure.size(), false);
     } // if(!funcall)
 
     if (!m_compiled_submodels[id].model && !m_compiled_submodels[id].replaced_by) {
@@ -649,22 +652,24 @@ void ov::npuw::CompiledModel::CompiledModelDesc::serialize(std::ostream& stream,
     write(stream, spatial);
     write(stream, attention);
 
-    write(stream, is_remote);
-    write(stream, closure_uid);
+    auto& closure_desc = closure.get();
+
+    write(stream, closure_desc.is_remote);
+    write(stream, closure_desc.closure_uid);
 
     if (ctx.is_weightless) {
         write_weightless(stream, scales, ctx);
         write_weightless(stream, zerops, ctx);
 
-        write(stream, closure.size());
+        write(stream, closure_desc.closure.size());
         std::vector<ov::Tensor> cpu_closures;
         std::vector<std::size_t> cpu_closure_ids;
         std::vector<ov::npuw::weights::LazyTensor> non_cpu_tensors;
         std::vector<std::size_t> non_cpu_tensors_ids;
-        for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
-            if (closure_uid[cidx] == -1) { // CPU closure
+        for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
+            if (closure_desc.closure_uid[cidx] == -1) { // CPU closure
                 cpu_closure_ids.push_back(cidx);
-                cpu_closures.push_back(closure[cidx]);
+                cpu_closures.push_back(closure_desc.closure[cidx]);
             } else {
                 non_cpu_tensors_ids.push_back(cidx);
                 non_cpu_tensors.push_back(lazy_closure[cidx]); // must be there
@@ -679,13 +684,13 @@ void ov::npuw::CompiledModel::CompiledModelDesc::serialize(std::ostream& stream,
         write(stream, scales);
         write(stream, zerops);
 
-        write(stream, closure.size());
+        write(stream, closure_desc.closure.size());
         std::vector<ov::Tensor> cpu_closures;
         std::vector<std::size_t> cpu_closure_ids;
-        for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
-            if (closure_uid[cidx] == -1) { // CPU closure, not in the bank
+        for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
+            if (closure_desc.closure_uid[cidx] == -1) { // CPU closure, not in the bank
                 cpu_closure_ids.push_back(cidx);
-                cpu_closures.push_back(closure[cidx]);
+                cpu_closures.push_back(closure_desc.closure[cidx]);
             }
         }
 
@@ -724,16 +729,18 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
     read(stream, spatial);
     read(stream, attention);
 
-    read(stream, is_remote);
-    read(stream, closure_uid);
+    auto& closure_desc = closure.get();
+
+    read(stream, closure_desc.is_remote);
+    read(stream, closure_desc.closure_uid);
 
     if (ctx.weights || !ctx.consts_cache.empty()) {
         read_weightless(stream, scales, ctx);
         read_weightless(stream, zerops, ctx);
 
         std::size_t closure_size = 0;
         read(stream, closure_size);
-        closure.resize(closure_size);
+        closure_desc.closure.resize(closure_size);
         lazy_closure.resize(closure_size);
 
         std::vector<std::size_t> cpu_closure_ids;
@@ -743,7 +750,7 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
         read_weightless(stream, cpu_closures, ctx);
         std::size_t tidx = 0;
         for (const auto& idx : cpu_closure_ids) {
-            closure[idx] = std::move(cpu_closures[tidx++]);
+            closure_desc.closure[idx] = std::move(cpu_closures[tidx++]);
         }
 
         std::vector<std::size_t> non_cpu_tensors_ids;
@@ -757,8 +764,9 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
         }
 
         // Also read weights into LazyTensors
-        for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
-            if (closure_uid[cidx] != -1 && lazy_closure[cidx]) { // previously registered before serialization
+        for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
+            if (closure_desc.closure_uid[cidx] != -1 &&
+                lazy_closure[cidx]) { // previously registered before serialization
                 lazy_closure[cidx].read_weight(ctx);
             }
         }
@@ -770,15 +778,21 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
         read(stream, closure_size);
         std::vector<std::size_t> cpu_closure_ids;
         read(stream, cpu_closure_ids);
-        closure.resize(closure_size);
+        closure_desc.closure.resize(closure_size);
         for (const auto& cidx : cpu_closure_ids) {
-            read(stream, closure[cidx]);
+            read(stream, closure_desc.closure[cidx]);
         }
     }
 
     LOG_DEBUG("DONE.");
 }
 
+ov::npuw::CompiledModel::~CompiledModel() {
+    if (m_eval_future.valid()) {
+        m_eval_future.wait();
+    }
+}
+
 void ov::npuw::CompiledModel::export_model(std::ostream& stream) const {
     using namespace ov::npuw::s11n;
 
@@ -916,7 +930,6 @@ std::shared_ptr<ov::npuw::CompiledModel> ov::npuw::CompiledModel::import_model(
     if (is_weightless) {
         compiled->m_weights_bank = ov::npuw::weights::bank(bank_name, compiled->get_plugin()->get_core(), "");
         compiled->finalize_weights_bank();
-        compiled->m_import_weights_ctx.reset();
     } else {
         compiled->m_weights_bank =
             ov::npuw::weights::Bank::deserialize(model_stream, compiled->get_plugin()->get_core(), bank_name);
@@ -1021,10 +1034,13 @@ void ov::npuw::CompiledModel::serialize(std::ostream& stream, const ov::npuw::s1
 
     // Serialize compiled submodels
     write(model_stream, m_compiled_submodels.size());
-    for (const auto& subm : m_compiled_submodels) {
+    for (std::size_t i = 0; i < m_compiled_submodels.size(); ++i) {
+        auto& subm = m_compiled_submodels[i];
+        auto real_idx = subm.replaced_by.value_or(i);
         // Write device idx
-        std::size_t device_idx = subm.device_it - m_dev_list.begin();
-        write(model_stream, device_idx);
+        // FIXME: if there is no compiled submodel, device_it is not set.
+        std::size_t device_idx = m_compiled_submodels[real_idx].device_it - m_dev_list.begin();
+        write(model_stream, real_idx == i ? device_idx : 0);
         // Write ICompiledModel if it's there
         if (subm.compiled_model) {
             write(model_stream, true);
@@ -1232,49 +1248,79 @@ void ov::npuw::CompiledModel::reconstruct_closure() {
 
         const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
         auto& func_desc = m_compiled_submodels[real_idx];
+        auto& desc_closure = comp_model_desc.closure.get();
 
-        for (std::size_t cidx = 0; cidx < comp_model_desc.closure.size(); ++cidx) {
-            if (comp_model_desc.closure[cidx]) {
+        for (std::size_t cidx = 0; cidx < desc_closure.closure.size(); ++cidx) {
+            if (desc_closure.closure[cidx]) {
                 // host-side closure - already set, do nothing
-                NPUW_ASSERT(!comp_model_desc.is_remote[cidx]);
+                NPUW_ASSERT(!desc_closure.is_remote[cidx]);
                 continue;
             }
-            NPUW_ASSERT(comp_model_desc.closure_uid[cidx] != -1);
-            comp_model_desc.closure[cidx] =
-                m_weights_bank->get(comp_model_desc.closure_uid[cidx], *func_desc.device_it);
+            NPUW_ASSERT(desc_closure.closure_uid[cidx] != -1);
+            desc_closure.closure[cidx] = m_weights_bank->get(desc_closure.closure_uid[cidx], *func_desc.device_it);
         }
     }
 }
 
 void ov::npuw::CompiledModel::finalize_weights_bank() {
     LOG_INFO("Finalizing weights bank...");
-    // Register lazy tensors
-    for (std::size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
-        auto& comp_model_desc = m_compiled_submodels[idx];
+    std::shared_future<void> weights_bank_evaluation = std::async(std::launch::async, [&]() {
+        // Register lazy tensors
+        for (std::size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
+            auto& comp_model_desc = m_compiled_submodels[idx];
 
-        // Skip optimized out and non-functions
-        if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
-            continue;
-        }
+            // Skip optimized out and non-functions
+            if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
+                continue;
+            }
 
-        const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
-        auto& func_desc = m_compiled_submodels[real_idx];
+            const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
+            auto& func_desc = m_compiled_submodels[real_idx];
 
-        for (std::size_t tidx = 0; tidx < comp_model_desc.lazy_closure.size(); ++tidx) {
-            if (comp_model_desc.closure[tidx]) {
-                continue; // host-side closure
+            for (std::size_t tidx = 0; tidx < comp_model_desc.lazy_closure.size(); ++tidx) {
+                if (comp_model_desc.closure.unsafe_get().closure[tidx]) {
+                    continue; // host-side closure
+                }
+                comp_model_desc.closure.unsafe_get().closure_uid[tidx] =
+                    m_weights_bank->registerLT(comp_model_desc.lazy_closure[tidx], *func_desc.device_it);
             }
-            comp_model_desc.closure_uid[tidx] =
-                m_weights_bank->registerLT(comp_model_desc.lazy_closure[tidx], *func_desc.device_it);
         }
-    }
 
-    // Evaluate and allocate all LazyTensors inside the bank
-    m_profile["weights bank"].record([&]() {
+        // Evaluate and allocate all LazyTensors inside the bank
         m_weights_bank->evaluate_and_allocate();
+
+        // Set evaluated and allocated ov::Tensors to closures
+        for (size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
+            auto& comp_model_desc = m_compiled_submodels[idx];
+
+            // Skip optimized out and non-functions
+            if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
+                continue;
+            }
+
+            const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
+            auto& func_desc = m_compiled_submodels[real_idx];
+            auto& desc_closure = comp_model_desc.closure.unsafe_get();
+
+            for (std::size_t tidx = 0; tidx < desc_closure.closure.size(); ++tidx) {
+                if (desc_closure.closure[tidx]) {
+                    // host-side closure - already set, do nothing
+                    desc_closure.is_remote[tidx] = false;
+                    continue;
+                }
+                const auto& uid = desc_closure.closure_uid[tidx];
+                NPUW_ASSERT(uid != -1); // All tensors should be registered at this point
+                desc_closure.closure[tidx] = m_weights_bank->get(uid, *func_desc.device_it);
+                // FIXME: find a more reliable way to do so
+                desc_closure.is_remote[tidx] = m_weights_bank->is_remote(uid);
+            }
+        }
+
+        m_import_weights_ctx.reset();
     });
 
-    // Set evaluated and allocated ov::Tensors to closures
+    m_eval_future = weights_bank_evaluation;
+
     for (size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
         auto& comp_model_desc = m_compiled_submodels[idx];
 
@@ -1283,21 +1329,7 @@ void ov::npuw::CompiledModel::finalize_weights_bank() {
             continue;
         }
 
-        const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
-        auto& func_desc = m_compiled_submodels[real_idx];
-
-        for (std::size_t tidx = 0; tidx < comp_model_desc.closure.size(); ++tidx) {
-            if (comp_model_desc.closure[tidx]) {
-                // host-side closure - already set, do nothing
-                comp_model_desc.is_remote[tidx] = false;
-                continue;
-            }
-            const auto& uid = comp_model_desc.closure_uid[tidx];
-            NPUW_ASSERT(uid != -1); // All tensors should be registered at this point
-            comp_model_desc.closure[tidx] = m_weights_bank->get(uid, *func_desc.device_it);
-            // FIXME: find a more reliable way to do so
-            comp_model_desc.is_remote[tidx] = m_weights_bank->is_remote(uid);
-        }
+        comp_model_desc.closure.set_future(weights_bank_evaluation);
     }
 
     LOG_INFO("Done.");
@@ -1662,7 +1694,7 @@ std::string ov::npuw::CompiledModel::submodel_device(const std::size_t idx) cons
 
 bool ov::npuw::CompiledModel::unpack_required(const std::size_t idx) const {
     auto& comp_model_desc = m_compiled_submodels.at(idx);
-    for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.size(); cidx++) {
+    for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.get().closure.size(); cidx++) {
         if (unpack_required(idx, cidx)) {
             return true;
         }
@@ -1679,7 +1711,7 @@ bool ov::npuw::CompiledModel::unpack_required(const std::size_t idx, const std::
     const auto real_idx = comp_model_desc.replaced_by.value();
    auto& func_desc = m_compiled_submodels.at(real_idx);
 
-    auto& closure = comp_model_desc.closure.at(cidx);
+    auto& closure = comp_model_desc.closure.get().closure.at(cidx);
     const auto closure_param_id = comp_model_desc.param_base + cidx;
 
     auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
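
The core change in finalize_weights_bank() above is that lazy-tensor registration, evaluate_and_allocate() and closure population now run inside a std::async task; the resulting std::shared_future<void> is stored in m_eval_future (so the new destructor can wait for it) and handed to every submodel's closure guard via set_future(). The snippet below is a stand-alone sketch of that synchronization shape with hypothetical names, not the plugin's actual classes.

// Stand-alone sketch of the async-finalize pattern (illustrative names only):
// the heavy bank evaluation runs in a std::async task, and one shared_future
// gates both the destructor and every per-submodel closure guard.
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

struct Guard {
    std::shared_future<void> ready;
    void set_future(std::shared_future<void> f) { ready = std::move(f); }
    void wait_if_needed() const {
        if (ready.valid()) {
            ready.wait();  // consumers block until evaluation is done
        }
    }
};

class Model {
public:
    void finalize() {
        std::shared_future<void> evaluation = std::async(std::launch::async, [] {
            // ... register lazy tensors, evaluate_and_allocate(), fill closures ...
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        });
        m_eval_future = evaluation;    // the destructor waits on this
        for (auto& g : m_guards) {
            g.set_future(evaluation);  // each consumer waits on the same future
        }
    }
    ~Model() {
        if (m_eval_future.valid()) {
            m_eval_future.wait();      // never tear down while evaluation is still running
        }
    }

    std::vector<Guard> m_guards = std::vector<Guard>(4);

private:
    std::shared_future<void> m_eval_future;
};

int main() {
    Model m;
    m.finalize();
    m.m_guards.front().wait_if_needed();  // e.g. an infer request touching a closure
    std::cout << "weights bank ready" << std::endl;
    return 0;
}

Because a std::shared_future can be waited on from multiple threads without being consumed, a single evaluation task can gate tear-down and every later access through the guard at once.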
