1717#ifndef __TRANSFER_HANDLER_H
1818#define __TRANSFER_HANDLER_H
1919
20+ #include < random>
2021#include < absl/time/clock.h>
2122#include < absl/time/time.h>
2223#include " backend_engine.h"
2627
2728namespace gtest ::plugins {
2829
30+ int
31+ getRandomInt (int min, int max) {
32+ std::random_device rd;
33+ std::mt19937 gen (rd ());
34+ std::uniform_int_distribution<> dist (min, max);
35+ return dist (gen);
36+ }
37+
38+ struct transferMemConfig {
39+ const size_t numEntries_ = 1 ;
40+ const size_t entrySize_ = 64 ;
41+ const size_t numBufs_ = 1 ;
42+ const uint8_t srcBufByte_ = getRandomInt(0 , 255 );
43+ const uint8_t dstBufByte_ = getRandomInt(0 , 255 );
44+
45+ size_t
46+ bufSize () const {
47+ return numEntries_ * entrySize_;
48+ }
49+ };
50+
2951template <nixl_mem_t srcMemType, nixl_mem_t dstMemType> class transferHandler {
3052public:
3153 transferHandler (std::shared_ptr<nixlBackendEngine> src_engine,
3254 std::shared_ptr<nixlBackendEngine> dst_engine,
3355 std::string src_agent_name,
3456 std::string dst_agent_name,
35- bool split_buf,
36- int num_bufs)
57+ transferMemConfig mem_cfg = transferMemConfig())
3758 : srcBackendEngine_(src_engine),
3859 dstBackendEngine_ (dst_engine),
60+ srcDescs_(std::make_unique<nixl_meta_dlist_t >(srcMemType)),
61+ dstDescs_(std::make_unique<nixl_meta_dlist_t >(dstMemType)),
62+ memConfig_(std::move(mem_cfg)),
3963 srcAgentName_(src_agent_name),
4064 dstAgentName_(dst_agent_name),
41- srcDevId_(0 ) {
42-
43- bool remote_xfer = srcAgentName_ != dstAgentName_;
44- if (remote_xfer) {
45- CHECK (src_engine->supportsRemote ()) << " Local engine does not support remote transfers" ;
46- dstDevId_ = 1 ;
47- verifyConnInfo ();
48- } else {
49- CHECK (src_engine->supportsLocal ()) << " Local engine does not support local transfers" ;
50- dstDevId_ = srcDevId_;
51- }
65+ isRemoteXfer_(srcAgentName_ != dstAgentName_),
66+ srcDevId_(0 ),
67+ dstDevId_(isRemoteXfer_ ? 1 : 0 ) {
68+ if (dstBackendEngine_->supportsNotif ()) setupNotifs (" Test" );
69+ }
5270
53- for (int i = 0 ; i < num_bufs; i++) {
71+ void
72+ setupMems () {
73+ for (size_t i = 0 ; i < memConfig_.numBufs_ ; i++) {
5474 srcMem_.emplace_back (
55- std::make_unique<memoryHandler<srcMemType>>(BUF_SIZE , srcDevId_ + i));
75+ std::make_unique<memoryHandler<srcMemType>>(memConfig_. bufSize () , srcDevId_ + i));
5676 dstMem_.emplace_back (
57- std::make_unique<memoryHandler<dstMemType>>(BUF_SIZE , dstDevId_ + i));
77+ std::make_unique<memoryHandler<dstMemType>>(memConfig_. bufSize () , dstDevId_ + i));
5878 }
5979
60- if (dstBackendEngine_->supportsNotif ()) setupNotifs (" Test" );
61-
6280 registerMems ();
63- prepMems (split_buf, remote_xfer );
81+ prepareMems ( );
6482 }
6583
6684 ~transferHandler () {
@@ -71,47 +89,89 @@ template<nixl_mem_t srcMemType, nixl_mem_t dstMemType> class transferHandler {
7189
7290 void
7391 testTransfer (nixl_xfer_op_t op) {
74- performTransfer (op);
92+ verifyConnInfo ();
93+ ASSERT_EQ (prepareTransfer (op), NIXL_SUCCESS);
94+ ASSERT_EQ (postTransfer (op), NIXL_SUCCESS);
95+ ASSERT_EQ (waitForTransfer (), NIXL_SUCCESS);
96+ ASSERT_EQ (srcBackendEngine_->releaseReqH (xferHandle_), NIXL_SUCCESS);
7597 verifyTransfer (op);
7698 }
7799
100+ nixl_status_t
101+ prepareTransfer (nixl_xfer_op_t op) {
102+ return srcBackendEngine_->prepXfer (
103+ op, *srcDescs_, *dstDescs_, dstAgentName_, xferHandle_, &xferOptArgs_);
104+ }
105+
106+ nixl_status_t
107+ postTransfer (nixl_xfer_op_t op) {
108+ nixl_status_t ret;
109+ ret = srcBackendEngine_->postXfer (
110+ op, *srcDescs_, *dstDescs_, dstAgentName_, xferHandle_, &xferOptArgs_);
111+ return (ret == NIXL_SUCCESS || ret == NIXL_IN_PROG) ? NIXL_SUCCESS : NIXL_ERR_BACKEND;
112+ }
113+
114+ nixl_status_t
115+ waitForTransfer () {
116+ nixl_status_t ret = NIXL_IN_PROG;
117+ auto end_time = absl::Now () + absl::Seconds (3 );
118+
119+ NIXL_INFO << " \t\t Waiting for transfer to complete..." ;
120+ while (ret == NIXL_IN_PROG && absl::Now () < end_time) {
121+ ret = srcBackendEngine_->checkXfer (xferHandle_);
122+ if (ret != NIXL_SUCCESS && ret != NIXL_IN_PROG) return ret;
123+
124+ if (dstBackendEngine_->supportsProgTh ()) dstBackendEngine_->progress ();
125+ }
126+ NIXL_INFO << " \n Transfer complete" ;
127+
128+ return NIXL_SUCCESS;
129+ }
130+
78131 void
79- setLocalMem () {
132+ addSrcDesc (nixlMetaDesc &meta_desc) {
133+ srcDescs_->addDesc (meta_desc);
134+ }
135+
136+ void
137+ addDstDesc (nixlMetaDesc &meta_desc) {
138+ dstDescs_->addDesc (meta_desc);
139+ }
140+
141+ void
142+ setSrcMem () {
80143 for (size_t i = 0 ; i < srcMem_.size (); i++)
81- srcMem_[i]->setIncreasing (LOCAL_BUF_BYTE + i);
144+ srcMem_[i]->setIncreasing (memConfig_. srcBufByte_ + i);
82145 }
83146
84147 void
85- resetLocalMem () {
148+ resetSrcMem () {
86149 for (const auto &mem : srcMem_)
87150 mem->reset ();
88151 }
89152
90153 void
91- checkLocalMem () {
154+ checkSrcMem () {
92155 for (size_t i = 0 ; i < srcMem_.size (); i++)
93- EXPECT_TRUE (srcMem_[i]->checkIncreasing (LOCAL_BUF_BYTE + i));
156+ EXPECT_TRUE (srcMem_[i]->checkIncreasing (memConfig_. srcBufByte_ + i));
94157 }
95158
96159private:
97- static constexpr uint8_t LOCAL_BUF_BYTE = 0x11 ;
98- static constexpr uint8_t XFER_BUF_BYTE = 0x22 ;
99- static constexpr size_t NUM_ENTRIES = 4 ;
100- static constexpr size_t ENTRY_SIZE = 16 ;
101- static constexpr size_t BUF_SIZE = NUM_ENTRIES * ENTRY_SIZE;
102-
103160 std::vector<std::unique_ptr<memoryHandler<srcMemType>>> srcMem_;
104161 std::vector<std::unique_ptr<memoryHandler<dstMemType>>> dstMem_;
105- std::shared_ptr<nixlBackendEngine> srcBackendEngine_;
106- std::shared_ptr<nixlBackendEngine> dstBackendEngine_;
107- std::unique_ptr<nixl_meta_dlist_t > srcDescs_;
108- std::unique_ptr<nixl_meta_dlist_t > dstDescs_;
162+ const std::shared_ptr<nixlBackendEngine> srcBackendEngine_;
163+ const std::shared_ptr<nixlBackendEngine> dstBackendEngine_;
164+ const std::unique_ptr<nixl_meta_dlist_t > srcDescs_;
165+ const std::unique_ptr<nixl_meta_dlist_t > dstDescs_;
166+ const transferMemConfig memConfig_;
167+ const std::string srcAgentName_;
168+ const std::string dstAgentName_;
109169 nixl_opt_b_args_t xferOptArgs_;
110170 nixlBackendMD *xferLoadedMd_;
111- std::string srcAgentName_ ;
112- std::string dstAgentName_ ;
113- int srcDevId_;
114- int dstDevId_;
171+ nixlBackendReqH *xferHandle_ ;
172+ const bool isRemoteXfer_ ;
173+ const int srcDevId_;
174+ const int dstDevId_;
115175
116176 void
117177 registerMems () {
@@ -139,8 +199,8 @@ template<nixl_mem_t srcMemType, nixl_mem_t dstMemType> class transferHandler {
139199 }
140200
141201 void
142- prepMems ( bool split_buf, bool remote_xfer ) {
143- if (remote_xfer ) {
202+ prepareMems ( ) {
203+ if (isRemoteXfer_ ) {
144204 nixlBlobDesc info;
145205 dstMem_[0 ]->populateBlobDesc (&info);
146206 ASSERT_EQ (srcBackendEngine_->getPublicData (dstMem_[0 ]->getMD (), info.metaInfo ),
@@ -154,51 +214,17 @@ template<nixl_mem_t srcMemType, nixl_mem_t dstMemType> class transferHandler {
154214 NIXL_SUCCESS);
155215 }
156216
157- srcDescs_ = std::make_unique<nixl_meta_dlist_t >(srcMemType);
158- dstDescs_ = std::make_unique<nixl_meta_dlist_t >(dstMemType);
159-
160- int num_entries = split_buf ? NUM_ENTRIES : 1 ;
161- int entry_size = split_buf ? ENTRY_SIZE : BUF_SIZE;
162217 for (size_t i = 0 ; i < srcMem_.size (); i++) {
163- for (int entry_i = 0 ; entry_i < num_entries ; entry_i++) {
218+ for (size_t entry_i = 0 ; entry_i < memConfig_. numEntries_ ; entry_i++) {
164219 nixlMetaDesc desc;
165- srcMem_[i]->populateMetaDesc (&desc, entry_i, entry_size );
220+ srcMem_[i]->populateMetaDesc (&desc, entry_i, memConfig_. entrySize_ );
166221 srcDescs_->addDesc (desc);
167- dstMem_[i]->populateMetaDesc (&desc, entry_i, entry_size );
222+ dstMem_[i]->populateMetaDesc (&desc, entry_i, memConfig_. entrySize_ );
168223 dstDescs_->addDesc (desc);
169224 }
170225 }
171226 }
172227
173- void
174- performTransfer (nixl_xfer_op_t op) {
175- nixlBackendReqH *handle;
176- nixl_status_t ret;
177-
178- ASSERT_EQ (srcBackendEngine_->prepXfer (
179- op, *srcDescs_, *dstDescs_, dstAgentName_, handle, &xferOptArgs_),
180- NIXL_SUCCESS);
181-
182- ret = srcBackendEngine_->postXfer (
183- op, *srcDescs_, *dstDescs_, dstAgentName_, handle, &xferOptArgs_);
184- ASSERT_TRUE (ret == NIXL_SUCCESS || ret == NIXL_IN_PROG);
185-
186- NIXL_INFO << " \t\t Waiting for transfer to complete..." ;
187-
188- auto end_time = absl::Now () + absl::Seconds (3 );
189-
190- while (ret == NIXL_IN_PROG && absl::Now () < end_time) {
191- ret = srcBackendEngine_->checkXfer (handle);
192- ASSERT_TRUE (ret == NIXL_SUCCESS || ret == NIXL_IN_PROG);
193-
194- if (dstBackendEngine_->supportsProgTh ()) dstBackendEngine_->progress ();
195- }
196-
197- NIXL_INFO << " \n Transfer complete" ;
198-
199- ASSERT_EQ (srcBackendEngine_->releaseReqH (handle), NIXL_SUCCESS);
200- }
201-
202228 void
203229 verifyTransfer (nixl_xfer_op_t op) {
204230 if (srcBackendEngine_->supportsNotif ()) {
@@ -245,8 +271,9 @@ template<nixl_mem_t srcMemType, nixl_mem_t dstMemType> class transferHandler {
245271
246272 void
247273 verifyConnInfo () {
248- std::string conn_info ;
274+ if (!isRemoteXfer_) return ;
249275
276+ std::string conn_info;
250277 ASSERT_EQ (srcBackendEngine_->getConnInfo (conn_info), NIXL_SUCCESS);
251278 ASSERT_EQ (dstBackendEngine_->getConnInfo (conn_info), NIXL_SUCCESS);
252279 ASSERT_EQ (srcBackendEngine_->loadRemoteConnInfo (dstAgentName_, conn_info), NIXL_SUCCESS);
0 commit comments