@@ -329,92 +329,39 @@ nixl_status_t
329329nixlDocaEngine::nixlDocaInitNotif (const std::string &remote_agent,
330330 struct doca_dev *dev,
331331 struct doca_gpu *gpu) {
332- struct nixlDocaNotif * notif;
332+ std::unique_ptr< nixlDocaNotif> notif;
333333
334334 std::lock_guard<std::mutex> lock (notifLock);
335335 // Same peer can be server or client
336336 if (notifMap.find (remote_agent) != notifMap.end ()) {
337337 NIXL_DEBUG << " nixlDocaInitNotif already found " << remote_agent << std::endl;
338- goto exit_success;
339- }
340-
341- notif = new struct nixlDocaNotif ;
342-
343- notif->elems_num = DOCA_MAX_NOTIF_INFLIGHT;
344- notif->elems_size = DOCA_MAX_NOTIF_MESSAGE_SIZE;
345- notif->send_addr = (uint8_t *)calloc (notif->elems_size * notif->elems_num , sizeof (uint8_t ));
346- if (notif->send_addr == nullptr ) {
347- NIXL_ERROR << " Can't alloc memory for send notif" ;
348- return NIXL_ERR_BACKEND;
349- }
350- memset (notif->send_addr , 0 , notif->elems_size * notif->elems_num );
351-
352- try {
353- notif->send_mmap = new nixlDocaMmap (notif->send_addr , notif->elems_num , notif->elems_size , ddev);
354- } catch (const std::exception &e) {
355- goto error;
338+ return NIXL_SUCCESS;
356339 }
357-
358340 try {
359- notif->send_barr = new nixlDocaBarr (notif->send_mmap ->mmap , notif->elems_num , (size_t )notif->elems_size , gdevs[0 ].second );
360- } catch (const std::exception &e) {
361- goto error;
341+ notif = std::make_unique<nixlDocaNotif>(
342+ DOCA_MAX_NOTIF_INFLIGHT, DOCA_MAX_NOTIF_MESSAGE_SIZE, ddev, gdevs[0 ].second );
362343 }
363-
364- notif->recv_addr = (uint8_t *)calloc (notif->elems_size * notif->elems_num , sizeof (uint8_t ));
365- if (notif->recv_addr == nullptr ) {
366- NIXL_ERROR << " Can't alloc memory for send notif" ;
344+ catch (const std::exception &e) {
345+ NIXL_ERROR << " Failed to create nixlDocaNotif " << e.what ();
367346 return NIXL_ERR_BACKEND;
368347 }
369- memset (notif->recv_addr , 0 , notif->elems_size * notif->elems_num );
370-
371- try {
372- notif->recv_mmap = new nixlDocaMmap (notif->recv_addr , notif->elems_num , notif->elems_size , ddev);
373- } catch (const std::exception &e) {
374- goto error;
375- }
376-
377- try {
378- notif->recv_barr = new nixlDocaBarr (notif->recv_mmap ->mmap , notif->elems_num , (size_t )notif->elems_size , gdevs[0 ].second );
379- } catch (const std::exception &e) {
380- goto error;
381- }
382-
383- notif->send_pi = 0 ;
384- notif->recv_pi = 0 ;
385348
386349 // Ensure notif list is not added twice for the same peer
387- notifMap[remote_agent] = notif;
388- ((volatile struct docaNotifRecv *)notif_fill_cpu)->barr_gpu = notif->recv_barr ->barr_gpu ;
350+ notifMap[remote_agent] = notif. release () ;
351+ ((volatile struct docaNotifRecv *)notif_fill_cpu)->barr_gpu = notif->recvBarr_ ->barr_gpu ;
389352 std::atomic_thread_fence (std::memory_order_release);
390353 ((volatile struct docaNotifRecv *)notif_fill_cpu)->rdma_qp =
391354 qpMap[remote_agent]->rdma_gpu_notif ;
392355 while (((volatile struct docaNotifRecv *)notif_fill_cpu)->rdma_qp != nullptr )
393356 ;
394357
395358 NIXL_INFO << " nixlDocaInitNotif added new qp for " << remote_agent << std::endl;
396-
397- exit_success:
398359 return NIXL_SUCCESS;
399-
400- error:
401- delete notif->send_mmap ;
402- delete notif->send_barr ;
403-
404- delete notif->recv_mmap ;
405- delete notif->recv_barr ;
406-
407- return NIXL_ERR_BACKEND;
408360}
409361
410362nixl_status_t
411- nixlDocaEngine::nixlDocaDestroyNotif (struct doca_gpu *gpu, struct nixlDocaNotif *notif) {
412- delete notif->send_mmap ;
413- delete notif->send_barr ;
414-
415- delete notif->recv_mmap ;
416- delete notif->recv_barr ;
417-
363+ nixlDocaEngine::nixlDocaDestroyNotif (struct doca_gpu *gpu, struct nixlDocaNotif *notif) {
364+ delete notif;
418365 return NIXL_SUCCESS;
419366}
420367
@@ -695,6 +642,7 @@ nixlDocaEngine::addRdmaQp (const std::string &remote_agent) {
695642 return NIXL_SUCCESS;
696643
697644exit_error:
645+ delete rdma_qp;
698646 return NIXL_ERR_BACKEND;
699647}
700648
@@ -1050,15 +998,15 @@ nixlDocaEngine::loadRemoteConnInfo (const std::string &remote_agent,
1050998 nixlDocaConnection conn;
1051999 size_t size = remote_conn_info.size ();
10521000 // TODO: eventually std::byte?
1053- char * addr = new char [size] ;
1001+ auto addr = std::make_unique< char []>(size) ;
10541002
10551003 if (remoteConnMap.find (remote_agent) != remoteConnMap.end ()) {
10561004 return NIXL_ERR_INVALID_PARAM;
10571005 }
10581006
1059- nixlSerDes::_stringToBytes ((void *)addr, remote_conn_info, size);
1007+ nixlSerDes::_stringToBytes ((void *)addr. get () , remote_conn_info, size);
10601008
1061- int ret = oob_connection_client_setup (addr, &oob_sock_client);
1009+ int ret = oob_connection_client_setup (addr. get () , &oob_sock_client);
10621010 if (ret < 0 ) {
10631011 NIXL_ERROR << " Can't connect to server " << ret;
10641012 return NIXL_ERR_BACKEND;
@@ -1082,8 +1030,6 @@ nixlDocaEngine::loadRemoteConnInfo (const std::string &remote_agent,
10821030
10831031 close (oob_sock_client);
10841032
1085- delete[] addr;
1086-
10871033 return NIXL_SUCCESS;
10881034}
10891035
@@ -1103,6 +1049,7 @@ nixlDocaEngine::registerMem (const nixlBlobDesc &mem,
11031049 });
11041050 if (it == gdevs.end ()) {
11051051 NIXL_ERROR << " Can't register memory for unknown device " << mem.devId ;
1052+ delete priv;
11061053 return NIXL_ERR_INVALID_PARAM;
11071054 }
11081055
@@ -1137,7 +1084,7 @@ nixlDocaEngine::registerMem (const nixlBlobDesc &mem,
11371084error:
11381085 delete priv->mem .mmap ;
11391086 delete priv->mem .barr ;
1140-
1087+ delete priv;
11411088 return NIXL_ERR_BACKEND;
11421089}
11431090
@@ -1173,16 +1120,16 @@ nixlDocaEngine::loadRemoteMD (const nixlBlobDesc &input,
11731120
11741121 if (search == remoteConnMap.end ()) {
11751122 NIXL_ERROR << " err: remote connection not found remote_agent " << remote_agent;
1176- return NIXL_ERR_NOT_FOUND ;
1123+ goto error ;
11771124 }
1178-
11791125 conn = (nixlDocaConnection)search->second ;
11801126
11811127 // directly copy underlying conn struct
11821128 md->conn = conn;
11831129
1184- // Empty mmap, filled with imported data
1130+ // Empty mmap, filled with imported data
11851131 try {
1132+ md->mem .mmap = nullptr ;
11861133 md->mem .mmap = new nixlDocaMmap ();
11871134 } catch (const std::exception &e) {
11881135 goto error;
@@ -1191,12 +1138,13 @@ nixlDocaEngine::loadRemoteMD (const nixlBlobDesc &input,
11911138 result = doca_mmap_create_from_export (nullptr , input.metaInfo .data (), size, ddev, &md->mem .mmap ->mmap );
11921139 if (result != DOCA_SUCCESS) {
11931140 NIXL_ERROR << " Function doca_mmap_create_from_export failed "
1194- << doca_error_get_descr (result);
1195- return NIXL_ERR_BACKEND ;
1141+ << doca_error_get_descr (result);
1142+ goto error ;
11961143 }
11971144
11981145 /* Remote buffer array */
11991146 try {
1147+ md->mem .barr = nullptr ;
12001148 md->mem .barr = new nixlDocaBarr (md->mem .mmap ->mmap , 1 , (size_t )size, gdevs[0 ].second );
12011149 } catch (const std::exception &e) {
12021150 goto error;
@@ -1208,7 +1156,8 @@ nixlDocaEngine::loadRemoteMD (const nixlBlobDesc &input,
12081156
12091157error:
12101158 delete md->mem .barr ;
1211-
1159+ delete md->mem .mmap ;
1160+ delete md;
12121161 return NIXL_ERR_BACKEND;
12131162}
12141163
@@ -1246,14 +1195,21 @@ nixlDocaEngine::prepXfer (const nixl_xfer_op_t &operation,
12461195 auto search = qpMap.find (remote_agent);
12471196 if (search == qpMap.end ()) {
12481197 NIXL_ERROR << " Can't find remote_agent " << remote_agent;
1198+ delete treq;
12491199 return NIXL_ERR_INVALID_PARAM;
12501200 }
12511201
12521202 rdma_qp = search->second ;
12531203
1254- if (lcnt != rcnt) return NIXL_ERR_INVALID_PARAM;
1204+ if (lcnt != rcnt) {
1205+ delete treq;
1206+ return NIXL_ERR_INVALID_PARAM;
1207+ }
12551208
1256- if (lcnt == 0 ) return NIXL_ERR_INVALID_PARAM;
1209+ if (lcnt == 0 ) {
1210+ delete treq;
1211+ return NIXL_ERR_INVALID_PARAM;
1212+ }
12571213
12581214 if (opt_args->customParam .empty ()) {
12591215 stream_id = (xferStream.fetch_add (1 ) & (nstreams - 1 ));
@@ -1302,6 +1258,7 @@ nixlDocaEngine::prepXfer (const nixl_xfer_op_t &operation,
13021258 auto search = notifMap.find (remote_agent);
13031259 if (search == notifMap.end ()) {
13041260 // NIXL_ERROR << "Can't find notif for remote_agent " << remote_agent;
1261+ delete treq;
13051262 return NIXL_ERR_INVALID_PARAM;
13061263 }
13071264
@@ -1312,14 +1269,14 @@ nixlDocaEngine::prepXfer (const nixl_xfer_op_t &operation,
13121269 msg_tag_end + opt_args->notifMsg ;
13131270
13141271 xferReqRingCpu[treq->end_pos - 1 ].has_notif_msg_idx =
1315- (notif->send_pi .fetch_add (1 ) & (notif->elems_num - 1 ));
1272+ (notif->sendPi_ .fetch_add (1 ) & (notif->elemsNum_ - 1 ));
13161273 xferReqRingCpu[treq->end_pos - 1 ].msg_sz = newMsg.size ();
1317- xferReqRingCpu[treq->end_pos - 1 ].notif_barr_gpu = notif->send_barr ->barr_gpu ;
1274+ xferReqRingCpu[treq->end_pos - 1 ].notif_barr_gpu = notif->sendBarr_ ->barr_gpu ;
13181275
1319- memcpy (notif->send_addr +
1320- (xferReqRingCpu[treq->end_pos - 1 ].has_notif_msg_idx * notif->elems_size ),
1321- newMsg.c_str (),
1322- newMsg.size ());
1276+ memcpy (notif->sendAddr_ +
1277+ (xferReqRingCpu[treq->end_pos - 1 ].has_notif_msg_idx * notif->elemsSize_ ),
1278+ newMsg.c_str (),
1279+ newMsg.size ());
13231280
13241281 NIXL_DEBUG << " DOCA prepXfer with notif to " << remote_agent << " at "
13251282 << xferReqRingCpu[treq->end_pos - 1 ].has_notif_msg_idx << " msg " << newMsg
@@ -1420,14 +1377,14 @@ nixlDocaEngine::getNotifs (notif_list_t ¬if_list) {
14201377 while (num_msg > 0 ) {
14211378 NIXL_DEBUG << " CPU num_msg " << num_msg;
14221379
1423- recv_idx = notif.second ->recv_pi .load () & (DOCA_MAX_NOTIF_INFLIGHT - 1 );
1424- addr = (char *)(notif.second ->recv_addr + (recv_idx * notif.second ->elems_size ));
1380+ recv_idx = notif.second ->recvPi_ .load () & (DOCA_MAX_NOTIF_INFLIGHT - 1 );
1381+ addr = (char *)(notif.second ->recvAddr_ + (recv_idx * notif.second ->elemsSize_ ));
14251382 msg_src = addr;
14261383 position = msg_src.find (msg_tag_start);
14271384
14281385 NIXL_DEBUG << " getNotifs idx " << recv_idx << " addr "
1429- << (void *)((notif.second ->recv_addr +
1430- (recv_idx * notif.second ->elems_size )))
1386+ << (void *)((notif.second ->recvAddr_ +
1387+ (recv_idx * notif.second ->elemsSize_ )))
14311388 << " msg " << msg_src << " position " << position << std::endl;
14321389
14331390 if (position != std::string::npos && position == 0 ) {
@@ -1445,7 +1402,7 @@ nixlDocaEngine::getNotifs (notif_list_t ¬if_list) {
14451402 notif_list.push_back (std::pair (notif.first , msg));
14461403 // Tag cleanup
14471404 memset (addr, 0 , msg_tag_start.size ());
1448- recv_idx = notif.second ->recv_pi .fetch_add (1 );
1405+ recv_idx = notif.second ->recvPi_ .fetch_add (1 );
14491406 num_msg--;
14501407 } else {
14511408 NIXL_ERROR << " getNotifs error message at " << num_msg;
@@ -1486,17 +1443,17 @@ nixlDocaEngine::genNotif (const std::string &remote_agent, const std::string &ms
14861443 }
14871444
14881445 rdma_gpu = searchQp->second ->rdma_gpu_notif ;
1489- std::string newMsg = msg_tag_start + std::to_string ((int )msg.size ()) + msg_tag_end + msg;
1490- buf_idx = (notif->send_pi .fetch_add (1 ) & (notif->elems_num - 1 ));
1491- memcpy (notif->send_addr + (buf_idx * notif->elems_size ), newMsg .c_str (), newMsg .size ());
1446+ std::string new_msg = msg_tag_start + std::to_string ((int )msg.size ()) + msg_tag_end + msg;
1447+ buf_idx = (notif->sendPi_ .fetch_add (1 ) & (notif->elemsNum_ - 1 ));
1448+ memcpy (notif->sendAddr_ + (buf_idx * notif->elemsSize_ ), new_msg .c_str (), new_msg .size ());
14921449
1493- NIXL_DEBUG << " genNotif to " << remote_agent << " msg size " << std::to_string ((int )msg.size ())
1494- << " msg " << newMsg << " at " << buf_idx;
1450+ NIXL_DEBUG << " genNotif to " << remote_agent << " msg size " << std::to_string ((int )msg.size ())
1451+ << " msg " << new_msg << " at " << buf_idx;
14951452
14961453 std::lock_guard<std::mutex> lock (notifSendLock);
1497- ((volatile struct docaNotifSend *)notif_send_cpu)->barr_gpu = notif->send_barr ->barr_gpu ;
1454+ ((volatile struct docaNotifSend *)notif_send_cpu)->barr_gpu = notif->sendBarr_ ->barr_gpu ;
14981455 ((volatile struct docaNotifSend *)notif_send_cpu)->buf_idx = buf_idx;
1499- ((volatile struct docaNotifSend *)notif_send_cpu)->msg_sz = newMsg .size ();
1456+ ((volatile struct docaNotifSend *)notif_send_cpu)->msg_sz = new_msg .size ();
15001457 // membar
15011458 std::atomic_thread_fence (std::memory_order_release);
15021459 ((volatile struct docaNotifSend *)notif_send_cpu)->rdma_qp = rdma_gpu;
@@ -1505,3 +1462,36 @@ nixlDocaEngine::genNotif (const std::string &remote_agent, const std::string &ms
15051462
15061463 return NIXL_SUCCESS;
15071464}
1465+
1466+ nixlDocaNotif::nixlDocaNotif (uint32_t elems_num,
1467+ uint32_t elems_size,
1468+ struct doca_dev *dev,
1469+ struct doca_gpu *gpu)
1470+ : elemsNum_(elems_num),
1471+ elemsSize_(elems_size),
1472+ sendAddr_(nullptr ),
1473+ sendPi_(0 ),
1474+ sendMmap_(nullptr ),
1475+ sendBarr_(nullptr ),
1476+ recvAddr_(nullptr ),
1477+ recvPi_(0 ),
1478+ recvMmap_(nullptr ),
1479+ recvBarr_(nullptr ) {
1480+ sendAddr_ = new uint8_t [elems_num * elems_size];
1481+ recvAddr_ = new uint8_t [elems_num * elems_size];
1482+ memset (sendAddr_, 0 , elems_num * elems_size);
1483+ memset (recvAddr_, 0 , elems_num * elems_size);
1484+ sendMmap_ = new nixlDocaMmap (sendAddr_, elems_num, elems_size, dev);
1485+ recvMmap_ = new nixlDocaMmap (recvAddr_, elems_num, elems_size, dev);
1486+ sendBarr_ = new nixlDocaBarr (sendMmap_->mmap , elems_num, elems_size, gpu);
1487+ recvBarr_ = new nixlDocaBarr (recvMmap_->mmap , elems_num, elems_size, gpu);
1488+ }
1489+
1490+ nixlDocaNotif::~nixlDocaNotif () {
1491+ delete sendAddr_;
1492+ delete recvAddr_;
1493+ delete sendMmap_;
1494+ delete recvMmap_;
1495+ delete sendBarr_;
1496+ delete recvBarr_;
1497+ }
0 commit comments