diff --git a/docs/BackendGuide.md b/docs/BackendGuide.md index b14f3d61e..b68b286a8 100644 --- a/docs/BackendGuide.md +++ b/docs/BackendGuide.md @@ -51,12 +51,10 @@ The key/value parameters are a map of strings to byte arrays that are passed fro * supportsLocal(): Indicates if the backend supports transfers within a node * supportsRemote(): Indicates if the backend supports transfers across nodes * supportsNotif(): Indicates if the backend supports notifications -* supportsProgressThread(): Indicates if the backend supports progress() method. That method should call the underlying procedure of progressing transfers for this backend. * getSupportedMems(): Indicates memory types supported by the backend -Based on the first 4 methods (supports*), the required methods to be implemented change. For instance, UCX backend implements all as it supports all scenarios, while GDS backend only has supportsLocal, detailed more in Example implementations. Note that a network backend should have supportsRemote and supportsNotif to be set to true, and preferably supportsLocal also to true, so another backend doesn’t need to be involved for local transfers. For a storage backend, it should have supportsLocal and supportsNotif is optional. supportsProgressThread is optional for both cases. Additionally, a backend that supportsRemote must also support supportNotifs. +Based on the first 3 methods (supports*), the required methods to be implemented change. For instance, UCX backend implements all as it supports all scenarios, while GDS backend only has supportsLocal, detailed more in Example implementations. Note that a network backend must have supportsRemote and supportsNotif to be set to true, and preferably supportsLocal also to true, so another backend doesn't need to be involved for local transfers. For a storage backend, it should have supportsLocal and supportsNotif is optional. 
-Note that supportProgressThread is an indicator whether a backend has implemented the progress() method, but does not imply how the progress thread is implemented. During creation of a backend, the provided init params indicate how the progress thread is intended to be used. For instance, if the enablement of progress thread is set to false, while a backend cannot work without a separate progress thread, the backend creation would fail. This flag is useful for the NIXL agent if we want to provide some agent level guarantees, such as minimum time between calls to progress for backends, or if a central progress method is implemented (for future proofing, not currently implemented). ### Connection Management: * connect(): Initiates connection to a remote agent. @@ -106,12 +104,6 @@ Finally, note that a call to releaseXferReq should not block and be asynchronous Note that getNotif does not know which agent it should look for to receive the notification. So there should be a method to extract the agent name from the notification received, corresponding to a transfer. genNotif generates a notification which is not bound to any transfers, and does not provide any ordering guarantees. If a backend does not set supportsNotifications, these two methods are not needed. -### Progress Thread: - -* progress(): Makes progress on transfers and notifications. - -If a backend requires a progress call, such as UCX, to proceed with the transfers, for both check of transfer status or received notification, they can implement a progress thread, and a frequency of waking up that thread will be passed during backend creation. In addition, each time a user calls to check a transfer status, or check received notifications, this method is called, enabling progress if a progress thread is not implemented. 
- ## Descriptor List Abstraction A key underlying abstraction for NIXL library is a descriptor list, that is made of a memory space (host/GPU/block/File/Obj-Store) and a list of descriptors. There are 2 types of descriptors used for the SB API. @@ -142,9 +134,9 @@ The plugin manager maintains API versioning of these above APIs. This can allow ## Comparing two plugins as an example -NIXL UCX plugin provides networking across different nodes, while GDS plugin provides storage access. Moreover, UCX plugin sets all of the “supports” flags, while GDS only has the supportsLocal flag set. The reason being UCX requires a progress thread and provides notifications, and can do transfers within an Agent, for instance from GPU to CPU, and across Agents. Therefore, it should implement all of the methods mentioned previously. +NIXL UCX plugin provides networking across different nodes, while GDS plugin provides storage access. UCX plugin sets all of the “supports” flags, while GDS only has the supportsLocal flag set. The reason being UCX is a network plugin that should support inter-agent communication and notifications, and it also supports intra-agent transfers, for instance from GPU to CPU. -However, for NIXL storage backends, there is no need to run a NIXL agent on a remote storage node. Instead, a distributed storage client on the local agent talks to the remote distributed storage, and therefore from NIXL agent point of view for all storage, whether local or remote, it has to talk to this local storage client. In other words, all the transfers are loopback to the agent itself. For the current use case, there is no need for notifications within the same agent, or a progress thread either. +However, for NIXL storage backends, there is no need to run a NIXL agent on a remote storage node. 
Instead, a distributed storage client on the local agent talks to the remote distributed storage, and therefore from NIXL agent point of view for all storage, whether local or remote, it has to talk to this local storage client. In other words, all the transfers are loopback to the agent itself. For the current use case, there is no need for notifications within the same agent. Moreover, the GDS plugin does not require a local connection to itself, so it returns SUCCESS for connect and disconnect, and for loadLocal simply returns back the input pointer as its output. The only 6 remaining methods that it has to implement are: @@ -213,7 +205,7 @@ Note that inside a transfer, a backend might provide methods for network resilie ### Get transfer status: -The agent will call the backend specific transfer handle that is stored within the agent transfer handle, and check the status of the transfer. This is achieved through a call to **checkXfer** in the SB API. Internal to the backend, they can call the **progress** method in SB API, if that’s necessary to get the latest status of the transfers. If the agent is run in progress thread mode, the agent will call that periodically, and therefore reduce the load on this internal call. +The agent will call the backend specific transfer handle that is stored within the agent transfer handle, and check the status of the transfer. This is achieved through a call to **checkXfer** in the SB API. Internal to the backend, they can call their internal progress method, if that’s necessary to get the latest status of the transfers. ### Invalidate transfer request: @@ -221,7 +213,7 @@ The agent will call the **releaseReqH** from the SB API on the backend specific ### Get notifications: -The agent will iterate over all the backends that support notification, and call their **getNotifs** from the SB API, which will return a list of notifications received from each remote node between the previous call to this method and this time. 
Then the agent will merge the results from all such backends, and append them to the map that the user has provided. Similar to get transfer status, Internal to the backend, they can call the **progress** method in SB API, if that’s necessary to get the latest notifications received from the transfers initiated by the other agents towards them. If the agent is run in progress thread mode, the agent will call that periodically, and therefore reduce the load on this internal call. +The agent will iterate over all the backends that support notification, and call their **getNotifs** from the SB API, which will return a list of notifications received from each remote node between the previous call to this method and this time. Then the agent will merge the results from all such backends, and append them to the map that the user has provided. Similar to get transfer status, Internal to the backend, they can call their internal progress method, if that’s necessary to get the latest notifications received from the transfers initiated by the other agents towards them. ### Generate notification: @@ -229,4 +221,4 @@ If a backend is provided by the user, the agent will call **genNotif** from the ### Destructor: -When an agent is getting destroyed at the end of the application, it will deregister all the remaining memories that were not deregistered by the application (bad practice, but agent takes care of it). Then for each of the backends it will call their **destructor** from the SB API, and finally do the rest of internal clean up. \ No newline at end of file +When an agent is getting destroyed at the end of the application, it will deregister all the remaining memories that were not deregistered by the application (bad practice, but agent takes care of it). Then for each of the backends it will call their **destructor** from the SB API, and finally do the rest of internal clean up. 
diff --git a/src/api/cpp/backend/backend_engine.h b/src/api/cpp/backend/backend_engine.h index aa127a8f0..8fc1cd131 100644 --- a/src/api/cpp/backend/backend_engine.h +++ b/src/api/cpp/backend/backend_engine.h @@ -112,9 +112,6 @@ class nixlBackendEngine { // pure virtual, and return errors, as parent shouldn't call if supportsNotif is false. virtual bool supportsNotif() const = 0; - // Determines if a backend supports progress thread. - virtual bool supportsProgTh() const = 0; - virtual nixl_mem_list_t getSupportedMems() const = 0; // TODO: Return by const-reference and mark noexcept? @@ -209,14 +206,6 @@ class nixlBackendEngine { } - // *** Needs to be implemented if supportsProgTh() is true *** // - - // Force backend engine worker to progress. - virtual int - progress() { - return 0; - } - // *** Optional virtual methods that are good to be implemented in any backend *** // // Query information about a list of memory/storage diff --git a/src/core/agent_data.h b/src/core/agent_data.h index a0cc26332..b0e8c2a5c 100644 --- a/src/core/agent_data.h +++ b/src/core/agent_data.h @@ -130,7 +130,6 @@ class nixlBackendH { bool supportsRemote () const { return engine->supportsRemote(); } bool supportsLocal () const { return engine->supportsLocal (); } bool supportsNotif () const { return engine->supportsNotif (); } - bool supportsProgTh () const { return engine->supportsProgTh(); } friend class nixlAgentData; friend class nixlAgent; diff --git a/src/plugins/cuda_gds/gds_backend.h b/src/plugins/cuda_gds/gds_backend.h index 2274c3d76..428d41441 100644 --- a/src/plugins/cuda_gds/gds_backend.h +++ b/src/plugins/cuda_gds/gds_backend.h @@ -121,9 +121,6 @@ class nixlGdsEngine : public nixlBackendEngine { bool supportsLocal() const { return true; } - bool supportsProgTh() const { - return false; - } nixl_mem_list_t getSupportedMems() const { nixl_mem_list_t mems; diff --git a/src/plugins/gds_mt/gds_mt_backend.h b/src/plugins/gds_mt/gds_mt_backend.h index 0b9e20326..773a73b64 100644 
--- a/src/plugins/gds_mt/gds_mt_backend.h +++ b/src/plugins/gds_mt/gds_mt_backend.h @@ -52,10 +52,6 @@ class nixlGdsMtEngine : public nixlBackendEngine { supportsLocal() const override { return true; } - bool - supportsProgTh() const override { - return false; - } nixl_mem_list_t getSupportedMems() const override { diff --git a/src/plugins/gpunetio/gpunetio_backend.h b/src/plugins/gpunetio/gpunetio_backend.h index aed736187..8f3a35117 100644 --- a/src/plugins/gpunetio/gpunetio_backend.h +++ b/src/plugins/gpunetio/gpunetio_backend.h @@ -46,9 +46,6 @@ class nixlDocaEngine : public nixlBackendEngine { bool supportsNotif() const { return true; } - bool supportsProgTh() const { - return false; - } nixl_mem_list_t getSupportedMems() const; diff --git a/src/plugins/hf3fs/hf3fs_backend.h b/src/plugins/hf3fs/hf3fs_backend.h index 3f8b0b34c..7f2c70dc5 100644 --- a/src/plugins/hf3fs/hf3fs_backend.h +++ b/src/plugins/hf3fs/hf3fs_backend.h @@ -138,9 +138,6 @@ class nixlHf3fsEngine : public nixlBackendEngine { bool supportsLocal () const { return true; } - bool supportsProgTh () const { - return false; - } nixl_mem_list_t getSupportedMems () const { nixl_mem_list_t mems; diff --git a/src/plugins/mooncake/mooncake_backend.h b/src/plugins/mooncake/mooncake_backend.h index 72a4a91ec..e76c22f55 100644 --- a/src/plugins/mooncake/mooncake_backend.h +++ b/src/plugins/mooncake/mooncake_backend.h @@ -53,11 +53,6 @@ class nixlMooncakeEngine : public nixlBackendEngine { return true; } - bool - supportsProgTh() const { - return false; - } - nixl_mem_list_t getSupportedMems() const; diff --git a/src/plugins/obj/obj_backend.h b/src/plugins/obj/obj_backend.h index 078cb5e08..c77c5bb54 100644 --- a/src/plugins/obj/obj_backend.h +++ b/src/plugins/obj/obj_backend.h @@ -46,11 +46,6 @@ class nixlObjEngine : public nixlBackendEngine { return false; } - bool - supportsProgTh() const override { - return false; - } - nixl_mem_list_t getSupportedMems() const override { return {OBJ_SEG, DRAM_SEG}; diff 
--git a/src/plugins/posix/posix_backend.h b/src/plugins/posix/posix_backend.h index a2660f3d1..b2777297b 100644 --- a/src/plugins/posix/posix_backend.h +++ b/src/plugins/posix/posix_backend.h @@ -81,10 +81,6 @@ class nixlPosixEngine : public nixlBackendEngine { return false; } - bool supportsProgTh() const override { - return false; - } - nixl_mem_list_t getSupportedMems() const override { return {FILE_SEG, DRAM_SEG}; } diff --git a/src/plugins/ucx/ucx_backend.h b/src/plugins/ucx/ucx_backend.h index 6c33eb318..11108ea86 100644 --- a/src/plugins/ucx/ucx_backend.h +++ b/src/plugins/ucx/ucx_backend.h @@ -124,11 +124,6 @@ class nixlUcxEngine : public nixlBackendEngine { return true; } - bool - supportsProgTh() const override { - return false; - } - nixl_mem_list_t getSupportedMems() const override; @@ -196,7 +191,7 @@ class nixlUcxEngine : public nixlBackendEngine { releaseReqH(nixlBackendReqH *handle) const override; int - progress() override; + progress(); nixl_status_t getNotifs(notif_list_t &notif_list) override; @@ -307,11 +302,6 @@ class nixlUcxThreadEngine : public nixlUcxEngine { nixlUcxThreadEngine(const nixlBackendInitParams &init_params); ~nixlUcxThreadEngine(); - bool - supportsProgTh() const override { - return true; - } - nixl_status_t getNotifs(notif_list_t &notif_list) override; @@ -345,11 +335,6 @@ class nixlUcxThreadPoolEngine : public nixlUcxEngine { nixlBackendReqH *&handle, const nixl_opt_b_args_t *opt_args = nullptr) const override; - bool - supportsProgTh() const override { - return true; - } - size_t getSharedWorkersSize() const override { return numSharedWorkers_; diff --git a/src/plugins/ucx_mo/ucx_mo_backend.h b/src/plugins/ucx_mo/ucx_mo_backend.h index fdb6293b6..9fef3a251 100644 --- a/src/plugins/ucx_mo/ucx_mo_backend.h +++ b/src/plugins/ucx_mo/ucx_mo_backend.h @@ -95,7 +95,7 @@ class nixlUcxMoEngine : public nixlBackendEngine { bool pthrOn; // UCX backends data - std::vector> engines; + std::vector> engines; // Map of agent name to saved 
nixlUcxConnection info using remote_conn_map_t = std::map; using remote_comm_it_t = remote_conn_map_t::iterator; @@ -114,7 +114,6 @@ class nixlUcxMoEngine : public nixlBackendEngine { bool supportsRemote () const { return true; } bool supportsLocal () const { return false; } bool supportsNotif () const { return true; } - bool supportsProgTh () const { return pthrOn; } nixl_mem_list_t getSupportedMems () const; @@ -159,14 +158,16 @@ class nixlUcxMoEngine : public nixlBackendEngine { nixl_status_t checkXfer (nixlBackendReqH* handle) const; nixl_status_t releaseReqH(nixlBackendReqH* handle) const; - int progress(); - nixl_status_t getNotifs(notif_list_t &notif_list); nixl_status_t genNotif(const std::string &remote_agent, const std::string &msg) const; //public function for UCX worker to mark connections as connected nixl_status_t checkConn(const std::string &remote_agent); nixl_status_t endConn(const std::string &remote_agent); + + // Public function as it is used in tests + int + progress(); }; #endif diff --git a/test/gtest/mocks/gmock_engine.cpp b/test/gtest/mocks/gmock_engine.cpp index 3dbd86dbc..4dd01bff2 100644 --- a/test/gtest/mocks/gmock_engine.cpp +++ b/test/gtest/mocks/gmock_engine.cpp @@ -30,7 +30,6 @@ GMockBackendEngine::GMockBackendEngine() : nixlBackendEngine(&init_params) { ON_CALL(*this, supportsRemote()).WillByDefault(Return(true)); ON_CALL(*this, supportsLocal()).WillByDefault(Return(true)); ON_CALL(*this, supportsNotif()).WillByDefault(Return(true)); - ON_CALL(*this, supportsProgTh()).WillByDefault(Return(false)); ON_CALL(*this, getSupportedMems()).WillByDefault(Return(nixl_mem_list_t{DRAM_SEG})); ON_CALL(*this, registerMem(_, _, _)).WillByDefault(Return(NIXL_SUCCESS)); ON_CALL(*this, deregisterMem(_)).WillByDefault(Return(NIXL_SUCCESS)); @@ -51,7 +50,6 @@ GMockBackendEngine::GMockBackendEngine() : nixlBackendEngine(&init_params) { ON_CALL(*this, loadLocalMD(_, _)).WillByDefault(Return(NIXL_SUCCESS)); ON_CALL(*this, 
getNotifs(_)).WillByDefault(Return(NIXL_SUCCESS)); ON_CALL(*this, genNotif(_, _)).WillByDefault(Return(NIXL_SUCCESS)); - ON_CALL(*this, progress()).WillByDefault(Return(0)); } void diff --git a/test/gtest/mocks/gmock_engine.h b/test/gtest/mocks/gmock_engine.h index 07d73b3af..8739e87e9 100644 --- a/test/gtest/mocks/gmock_engine.h +++ b/test/gtest/mocks/gmock_engine.h @@ -67,7 +67,6 @@ class GMockBackendEngine : public nixlBackendEngine { MOCK_METHOD(bool, supportsRemote, (), (const, override)); MOCK_METHOD(bool, supportsLocal, (), (const, override)); MOCK_METHOD(bool, supportsNotif, (), (const, override)); - MOCK_METHOD(bool, supportsProgTh, (), (const, override)); MOCK_METHOD(nixl_mem_list_t, getSupportedMems, (), (const, override)); MOCK_METHOD(nixl_status_t, registerMem, @@ -122,7 +121,6 @@ class GMockBackendEngine : public nixlBackendEngine { genNotif, (const std::string &remote_agent, const std::string &msg), (const, override)); - MOCK_METHOD(int, progress, (), (override)); }; } // namespace mocks diff --git a/test/gtest/mocks/mock_backend_engine.cpp b/test/gtest/mocks/mock_backend_engine.cpp index 020f6afff..452783cc9 100644 --- a/test/gtest/mocks/mock_backend_engine.cpp +++ b/test/gtest/mocks/mock_backend_engine.cpp @@ -124,9 +124,4 @@ MockBackendEngine::genNotif(const std::string &remote_agent, const std::string & return gmock_backend_engine->genNotif(remote_agent, msg); } -int -MockBackendEngine::progress() { - sharedState++; - return gmock_backend_engine->progress(); -} } // namespace mocks diff --git a/test/gtest/mocks/mock_backend_engine.h b/test/gtest/mocks/mock_backend_engine.h index e89a129c2..61f64d9f0 100644 --- a/test/gtest/mocks/mock_backend_engine.h +++ b/test/gtest/mocks/mock_backend_engine.h @@ -44,10 +44,6 @@ class MockBackendEngine : public nixlBackendEngine { assert(sharedState > 0); return gmock_backend_engine->supportsNotif(); } - bool supportsProgTh() const override { - assert(sharedState > 0); - return 
gmock_backend_engine->supportsProgTh(); - } nixl_mem_list_t getSupportedMems() const override { assert(sharedState > 0); return gmock_backend_engine->getSupportedMems(); @@ -90,7 +86,6 @@ class MockBackendEngine : public nixlBackendEngine { nixl_status_t getNotifs(notif_list_t &notif_list) override; nixl_status_t genNotif(const std::string &remote_agent, const std::string &msg) const override; - int progress() override; private: // This represents an engine shared state that is read in every const method and modified in non-cost ones diff --git a/test/gtest/plugins/transfer_handler.h b/test/gtest/plugins/transfer_handler.h index 9c13b2f9e..d2d8c8074 100644 --- a/test/gtest/plugins/transfer_handler.h +++ b/test/gtest/plugins/transfer_handler.h @@ -190,8 +190,6 @@ template class transferHandler { while (ret == NIXL_IN_PROG && absl::Now() < end_time) { ret = srcBackendEngine_->checkXfer(handle); ASSERT_TRUE(ret == NIXL_SUCCESS || ret == NIXL_IN_PROG); - - if (dstBackendEngine_->supportsProgTh()) dstBackendEngine_->progress(); } NIXL_INFO << "\nTransfer complete"; @@ -220,7 +218,6 @@ template class transferHandler { while (num_notifs == 0 && absl::Now() < end_time) { ASSERT_EQ(dstBackendEngine_->getNotifs(target_notifs), NIXL_SUCCESS); num_notifs = target_notifs.size(); - if (srcBackendEngine_->supportsProgTh()) srcBackendEngine_->progress(); } NIXL_INFO << "\nNotification transfer complete"; diff --git a/test/gtest/unit/obj/obj.cpp b/test/gtest/unit/obj/obj.cpp index 728adb074..3b76f08f2 100644 --- a/test/gtest/unit/obj/obj.cpp +++ b/test/gtest/unit/obj/obj.cpp @@ -310,7 +310,6 @@ TEST_F(objTestFixture, EngineInitialization) { EXPECT_TRUE(objEngine_->supportsLocal()); EXPECT_FALSE(objEngine_->supportsRemote()); EXPECT_FALSE(objEngine_->supportsNotif()); - EXPECT_FALSE(objEngine_->supportsProgTh()); // Verify that the executor was properly set on the mock S3 client by the engine constructor EXPECT_TRUE(mockS3Client_->hasExecutor()); diff --git 
a/test/unit/plugins/ucx/ucx_backend_multi.cpp b/test/unit/plugins/ucx/ucx_backend_multi.cpp index 14175e1b5..cf3cdab0f 100644 --- a/test/unit/plugins/ucx/ucx_backend_multi.cpp +++ b/test/unit/plugins/ucx/ucx_backend_multi.cpp @@ -32,7 +32,6 @@ void test_thread(int id) { nixlBackendInitParams init_params; nixl_b_params_t custom_params; - nixlBackendEngine* ucx; nixl_status_t ret; std::string my_name("Agent1"); @@ -50,9 +49,9 @@ void test_thread(int id) std::cout << my_name << " Started\n"; - ucx = nixlUcxEngine::create(init_params).release(); + auto ucx = nixlUcxEngine::create(init_params).release(); - if(!USE_PTHREAD) ucx->progress(); + if (!USE_PTHREAD) ucx->progress(); ucx->getConnInfo(conn_info[id]); @@ -71,7 +70,7 @@ void test_thread(int id) done[id] = true; while(!done[!id]) - if(!USE_PTHREAD && id) ucx->progress(); + if (!USE_PTHREAD && id) ucx->progress(); std::cout << "Thread passed with id " << id << "\n"; @@ -83,7 +82,7 @@ void test_thread(int id) //wait for other while(!disconnect[!id]); - if(!USE_PTHREAD) ucx->progress(); + if (!USE_PTHREAD) ucx->progress(); std::cout << "Thread disconnected with id " << id << "\n"; diff --git a/test/unit/plugins/ucx/ucx_backend_test.cpp b/test/unit/plugins/ucx/ucx_backend_test.cpp index 654e307f8..6f893b800 100644 --- a/test/unit/plugins/ucx/ucx_backend_test.cpp +++ b/test/unit/plugins/ucx/ucx_backend_test.cpp @@ -106,10 +106,8 @@ class testHndlIterator { } }; - -nixlBackendEngine *createEngine(std::string name, bool p_thread) -{ - nixlBackendEngine *ucx; +nixlUcxEngine * +createEngine(std::string name, bool p_thread) { nixlBackendInitParams init; nixl_b_params_t custom_params; @@ -119,7 +117,7 @@ nixlBackendEngine *createEngine(std::string name, bool p_thread) init.customParams = &custom_params; init.type = "UCX"; - ucx = nixlUcxEngine::create(init).release(); + auto ucx = nixlUcxEngine::create(init).release(); assert(!ucx->getInitErr()); if (ucx->getInitErr()) { std::cout << "Failed to initialize worker1" << 
std::endl; @@ -129,8 +127,8 @@ nixlBackendEngine *createEngine(std::string name, bool p_thread) return ucx; } -void releaseEngine(nixlBackendEngine *ucx) -{ +void +releaseEngine(nixlUcxEngine *ucx) { delete ucx; } @@ -283,8 +281,8 @@ void *releaseValidationPtr(nixl_mem_t mem_type, void *addr) return NULL; } -void allocateWrongGPUTest(nixlBackendEngine* ucx, int dev_id) -{ +void +allocateWrongGPUTest(nixlUcxEngine *ucx, int dev_id) { nixlBlobDesc desc; nixlBackendMD* md; void* buf; @@ -301,9 +299,13 @@ void allocateWrongGPUTest(nixlBackendEngine* ucx, int dev_id) releaseBuffer(VRAM_SEG, dev_id, buf); } -void allocateAndRegister(nixlBackendEngine *ucx, int dev_id, nixl_mem_t mem_type, - void* &addr, size_t len, nixlBackendMD* &md) -{ +void +allocateAndRegister(nixlUcxEngine *ucx, + int dev_id, + nixl_mem_t mem_type, + void *&addr, + size_t len, + nixlBackendMD *&md) { nixlBlobDesc desc; allocateBuffer(mem_type, dev_id, len, addr); @@ -317,17 +319,25 @@ void allocateAndRegister(nixlBackendEngine *ucx, int dev_id, nixl_mem_t mem_type assert(ret == NIXL_SUCCESS); } -void deallocateAndDeregister(nixlBackendEngine *ucx, int dev_id, nixl_mem_t mem_type, - void* &addr, nixlBackendMD* &md) -{ +void +deallocateAndDeregister(nixlUcxEngine *ucx, + int dev_id, + nixl_mem_t mem_type, + void *&addr, + nixlBackendMD *&md) { ucx->deregisterMem(md); releaseBuffer(mem_type, dev_id, addr); } -void loadRemote(nixlBackendEngine *ucx, int dev_id, std::string agent, - nixl_mem_t mem_type, void *addr, size_t len, - nixlBackendMD* &lmd, nixlBackendMD* &rmd) -{ +void +loadRemote(nixlUcxEngine *ucx, + int dev_id, + std::string agent, + nixl_mem_t mem_type, + void *addr, + size_t len, + nixlBackendMD *&lmd, + nixlBackendMD *&rmd) { nixlBlobDesc info; info.addr = (uintptr_t) addr; info.len = len; @@ -367,15 +377,18 @@ static string op2string(nixl_xfer_op_t op, bool hasNotif) return string("ERR-OP"); } - -void performTransfer(nixlBackendEngine *ucx1, nixlBackendEngine *ucx2, - nixl_meta_dlist_t 
&req_src_descs, - nixl_meta_dlist_t &req_dst_descs, - void* addr1, void* addr2, size_t len, - nixl_xfer_op_t op, - testHndlIterator &hiter, - bool progress, bool use_notif) -{ +void +performTransfer(nixlUcxEngine *ucx1, + nixlUcxEngine *ucx2, + nixl_meta_dlist_t &req_src_descs, + nixl_meta_dlist_t &req_dst_descs, + void *addr1, + void *addr2, + size_t len, + nixl_xfer_op_t op, + testHndlIterator &hiter, + bool progress, + bool use_notif) { int ret2; nixl_status_t ret3; void *chkptr1, *chkptr2; @@ -465,8 +478,8 @@ void performTransfer(nixlBackendEngine *ucx1, nixlBackendEngine *ucx2, cout << "OK" << endl; } -void test_intra_agent_transfer(bool p_thread, nixlBackendEngine *ucx, nixl_mem_t mem_type) -{ +void +test_intra_agent_transfer(bool p_thread, nixlUcxEngine *ucx, nixl_mem_t mem_type) { std::cout << std::endl << std::endl; std::cout << "****************************************************" << std::endl; @@ -541,10 +554,15 @@ void test_intra_agent_transfer(bool p_thread, nixlBackendEngine *ucx, nixl_mem_t ucx->disconnect(agent1); } -void test_inter_agent_transfer(bool p_thread, bool reuse_hndl, - nixlBackendEngine *ucx1, nixl_mem_t src_mem_type, int src_dev_id, - nixlBackendEngine *ucx2, nixl_mem_t dst_mem_type, int dst_dev_id) -{ +void +test_inter_agent_transfer(bool p_thread, + bool reuse_hndl, + nixlUcxEngine *ucx1, + nixl_mem_t src_mem_type, + int src_dev_id, + nixlUcxEngine *ucx2, + nixl_mem_t dst_mem_type, + int dst_dev_id) { int ret; int iter = 10; @@ -674,7 +692,7 @@ void test_inter_agent_transfer(bool p_thread, bool reuse_hndl, int main() { bool thread_on[2] = {false, true}; - nixlBackendEngine *ucx[2][2] = { 0 }; + nixlUcxEngine *ucx[2][2] = {0}; // Allocate UCX engines for(int i = 0; i < 2; i++) { diff --git a/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp b/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp index 6ccb800c2..fd23ba7ec 100644 --- a/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp +++ b/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp @@ 
-368,7 +368,7 @@ void performTransfer(nixlBackendEngine *ucx1, nixlBackendEngine *ucx2, while(status == NIXL_IN_PROG) { status = ucx1->checkXfer(handle); if(progress){ - ucx2->progress(); + ((nixlUcxMoEngine *)ucx2)->progress(); } assert( (NIXL_SUCCESS == status) || (NIXL_IN_PROG == status) ); } @@ -385,7 +385,7 @@ void performTransfer(nixlBackendEngine *ucx1, nixlBackendEngine *ucx2, status = ucx2->getNotifs(target_notifs); assert(NIXL_SUCCESS == status); if(progress){ - ucx1->progress(); + ((nixlUcxMoEngine *)ucx1)->progress(); } } @@ -526,7 +526,7 @@ void test_agent_transfer(bool p_thread, assert(NIXL_SUCCESS == status); if (!p_thread) { /* progress UCX1 as well */ - ucx1->progress(); + ((nixlUcxMoEngine *)ucx1)->progress(); } }