diff --git a/.gitlab/test_cpp.sh b/.gitlab/test_cpp.sh index 29e52892a..7bfbac644 100755 --- a/.gitlab/test_cpp.sh +++ b/.gitlab/test_cpp.sh @@ -78,9 +78,10 @@ cd ${INSTALL_DIR} ./bin/nixl_etcd_example ./bin/ucx_backend_test ./bin/ucx_mo_backend_test -NIXL_TELEMETRY_ENABLE=y ./bin/agent_example & +mkdir -p /tmp/telemetry_test +NIXL_TELEMETRY_ENABLE=y NIXL_TELEMETRY_DIR=/tmp/telemetry_test ./bin/agent_example & sleep 1 -./bin/telemetry_reader /tmp/Agent001 & +./bin/telemetry_reader /tmp/telemetry_test/Agent001 & telePID=$! sleep 6 kill -s SIGINT $telePID diff --git a/.gitlab/test_python.sh b/.gitlab/test_python.sh index b0eb844f8..e01d0c18d 100755 --- a/.gitlab/test_python.sh +++ b/.gitlab/test_python.sh @@ -79,14 +79,15 @@ python3 partial_md_example.py --etcd python3 query_mem_example.py # Running telemetry for the last test -export NIXL_TELEMETRY_ENABLE=y blocking_send_recv_port=$(get_next_tcp_port) +mkdir -p /tmp/telemetry_test python3 blocking_send_recv_example.py --mode="target" --ip=127.0.0.1 --port="$blocking_send_recv_port"& sleep 5 +NIXL_TELEMETRY_ENABLE=y NIXL_TELEMETRY_DIR=/tmp/telemetry_test \ python3 blocking_send_recv_example.py --mode="initiator" --ip=127.0.0.1 --port="$blocking_send_recv_port" -python3 telemetry_reader.py --telemetry_path /tmp/initiator & +python3 telemetry_reader.py --telemetry_path /tmp/telemetry_test/initiator & telePID=$! 
sleep 6 kill -s INT $telePID diff --git a/docs/telemetry.md b/docs/telemetry.md index 5a36da08f..23f04e630 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -44,10 +44,12 @@ Telemetry is controlled by environment variables: | Variable | Description | Default | |----------|-------------|---------| | `NIXL_TELEMETRY_ENABLE` | Enable telemetry collection | Disabled | -| `NIXL_TELEMETRY_DIR` | Directory for telemetry files | `/tmp` | +| `NIXL_TELEMETRY_DIR` | Directory for telemetry files | - | | `NIXL_TELEMETRY_BUFFER_SIZE` | Number of events in buffer | `4096` | | `NIXL_TELEMETRY_RUN_INTERVAL` | Flush interval (ms) | `100` | +- `NIXL_TELEMETRY_ENABLE` can be set to `y`/`yes`/`on`/`1` to enable telemetry, or to `n`/`no`/`off`/`0` (or left unset) to disable it. +- If `NIXL_TELEMETRY_ENABLE` is enabled but `NIXL_TELEMETRY_DIR` is not set, no telemetry file is generated and `NIXL_TELEMETRY_RUN_INTERVAL` is not used. ## Telemetry File Format @@ -95,4 +97,4 @@ Category: MEMORY Event: agent_memory_registered Value: 4096 =========================== -``` \ No newline at end of file +``` diff --git a/src/api/cpp/nixl.h b/src/api/cpp/nixl.h index e744af13d..1ed61be4e 100644 --- a/src/api/cpp/nixl.h +++ b/src/api/cpp/nixl.h @@ -289,6 +289,17 @@ class nixlAgent { nixl_status_t getXferStatus (nixlXferReqH* req_hndl) const; + + /** + * @brief Get the telemetry data associated with `req_hndl`. + * + * @param req_hndl Transfer request handle obtained from makeXferReq/createXferReq + * @param telemetry [out] Output telemetry information + * @return nixl_status_t Error code if call was not successful + */ + nixl_status_t + getXferTelemetry(const nixlXferReqH *req_hndl, nixl_xfer_telem_t &telemetry) const; + /** * @brief Query the backend associated with `req_hndl`. E.g., if for genNotif * the same backend as a transfer is desired. 
diff --git a/src/api/cpp/nixl_types.h b/src/api/cpp/nixl_types.h index a29f59667..e0d1997f8 100644 --- a/src/api/cpp/nixl_types.h +++ b/src/api/cpp/nixl_types.h @@ -20,6 +20,7 @@ #include #include #include +#include /*** Forward declarations ***/ @@ -62,7 +63,8 @@ enum nixl_status_t { NIXL_ERR_UNKNOWN = -8, NIXL_ERR_NOT_SUPPORTED = -9, NIXL_ERR_REMOTE_DISCONNECT = -10, - NIXL_ERR_CANCELED = -11 + NIXL_ERR_CANCELED = -11, + NIXL_ERR_NO_TELEMETRY = -12 }; /** @@ -228,6 +230,55 @@ using nixl_opt_args_t = nixlAgentOptionalArgs; */ using nixlGpuXferReqH = void; +/** + * @brief A typedef for a point in time + */ +using chrono_point_t = std::chrono::steady_clock::time_point; + +/** + * @brief A typedef for a period of time in microseconds + */ +using chrono_period_us_t = std::chrono::microseconds; + +/** + * @struct nixlXferTelemetry + * @brief A structure for telemetry output from agent API + */ +struct nixlXferTelemetry { + /** + * @var startTime Time that the transfer was posted + */ + chrono_point_t startTime; + + /** + * @var postDuration Time it took to do the post operation + */ + chrono_period_us_t postDuration = chrono_period_us_t(0); + + /** + * @var xferDuration Time it took to complete the transfer + * if getXferStatus is called late, that might impact this result + */ + chrono_period_us_t xferDuration = chrono_period_us_t(0); + + /** + * @var totalBytes Amount of bytes transferred in the request + */ + size_t totalBytes = 0; + + /** + * @var descCount Number of descriptors in the transfer request. + * If any merging of descriptors was performed, it will be reflected here. + */ + size_t descCount = 0; +}; + +/** + * @brief A typedef for a nixlXferTelemetry + * for telemetry output. + */ +using nixl_xfer_telem_t = nixlXferTelemetry; + /** * @brief A define for an empty string, that indicates the descriptor list is being * prepared for the local agent as an initiator in prepXferDlist method. 
diff --git a/src/api/python/_api.py b/src/api/python/_api.py index 8090bd43e..8662c6bf2 100644 --- a/src/api/python/_api.py +++ b/src/api/python/_api.py @@ -616,6 +616,22 @@ def check_xfer_state(self, handle: nixl_xfer_handle) -> str: else: return "ERR" + """ + @brief Get telemetry information of a transfer request. + The output object has three time fields in microseconds + (startTime, postDuration, xferDuration), as well as the integer totalBytes transferred + for the request, and the integer descCount representing the number of descriptors involved + (for example, after some merging of descriptors). + + @param handle Handle to the transfer operation, from make_prepped_xfer or initialize_xfer. + @return nixlXferTelemetry object + """ + + def get_xfer_telemetry( + self, handle: nixl_xfer_handle + ) -> nixlBind.nixlXferTelemetry: + return self.agent.getXferTelemetry(handle._handle) + """ @brief Query the backend that was chosen for a transfer operation. diff --git a/src/bindings/python/nixl_bindings.cpp b/src/bindings/python/nixl_bindings.cpp index 6e3ca37cf..4a9b31772 100644 --- a/src/bindings/python/nixl_bindings.cpp +++ b/src/bindings/python/nixl_bindings.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -64,14 +65,29 @@ class nixlRepostActiveError : public std::runtime_error { nixlRepostActiveError(const char *what) : runtime_error(what) {} }; +class nixlUnknownError : public std::runtime_error { +public: + nixlUnknownError(const char *what) : runtime_error(what) {} +}; + class nixlNotSupportedError : public std::runtime_error { public: nixlNotSupportedError(const char *what) : runtime_error(what) {} }; -class nixlUnknownError : public std::runtime_error { +class nixlRemoteDisconnectError : public std::runtime_error { public: - nixlUnknownError(const char *what) : runtime_error(what) {} + nixlRemoteDisconnectError(const char *what) : runtime_error(what) {} +}; + +class nixlCancelledError : public std::runtime_error { +public: + 
nixlCancelledError(const char *what) : runtime_error(what) {} +}; + +class nixlNoTelemetryError : public std::runtime_error { +public: + nixlNoTelemetryError(const char *what) : runtime_error(what) {} }; void @@ -108,6 +124,15 @@ throw_nixl_exception(const nixl_status_t &status) { case NIXL_ERR_NOT_SUPPORTED: throw nixlNotSupportedError(nixlEnumStrings::statusStr(status).c_str()); break; + case NIXL_ERR_REMOTE_DISCONNECT: + throw nixlRemoteDisconnectError(nixlEnumStrings::statusStr(status).c_str()); + break; + case NIXL_ERR_CANCELED: + throw nixlCancelledError(nixlEnumStrings::statusStr(status).c_str()); + break; + case NIXL_ERR_NO_TELEMETRY: + throw nixlNoTelemetryError(nixlEnumStrings::statusStr(status).c_str()); + break; default: throw std::runtime_error("BAD_STATUS"); } @@ -161,6 +186,22 @@ PYBIND11_MODULE(_bindings, m) { .value("NIXL_ERR_NOT_SUPPORTED", NIXL_ERR_NOT_SUPPORTED) .export_values(); + py::class_(m, "nixlXferTelemetry") + .def(py::init<>()) + .def_property_readonly("startTime", + [](const nixl_xfer_telem_t &t) { + return std::chrono::duration_cast( + t.startTime.time_since_epoch()) + .count(); + }) + .def_property_readonly("postDuration", + [](const nixl_xfer_telem_t &t) { return t.postDuration.count(); }) + .def_property_readonly("xferDuration", + [](const nixl_xfer_telem_t &t) { return t.xferDuration.count(); }) + .def_readonly("totalBytes", &nixl_xfer_telem_t::totalBytes) + .def_readonly("descCount", &nixl_xfer_telem_t::descCount); + + py::register_exception(m, "nixlNotPostedError"); py::register_exception(m, "nixlInvalidParamError"); py::register_exception(m, "nixlBackendError"); @@ -170,6 +211,9 @@ PYBIND11_MODULE(_bindings, m) { py::register_exception(m, "nixlRepostActiveError"); py::register_exception(m, "nixlUnknownError"); py::register_exception(m, "nixlNotSupportedError"); + py::register_exception(m, "nixlRemoteDisconnectError"); + py::register_exception(m, "nixlCancelledError"); + py::register_exception(m, "nixlNoTelemetryError"); 
py::class_(m, "nixlXferDList") .def(py::init(), @@ -652,6 +696,15 @@ PYBIND11_MODULE(_bindings, m) { throw_nixl_exception(ret); return ret; }) + .def( + "getXferTelemetry", + [](nixlAgent &agent, uintptr_t reqh) -> nixl_xfer_telem_t { + nixl_xfer_telem_t telemetry; + nixl_status_t ret = agent.getXferTelemetry((nixlXferReqH *)reqh, telemetry); + throw_nixl_exception(ret); + return telemetry; + }, + py::arg("reqh")) .def("queryXferBackend", [](nixlAgent &agent, uintptr_t reqh) -> uintptr_t { nixlBackendH *backend = nullptr; diff --git a/src/core/agent_data.h b/src/core/agent_data.h index abcdf50e4..60ec0dee4 100644 --- a/src/core/agent_data.h +++ b/src/core/agent_data.h @@ -19,6 +19,7 @@ #include "common/str_tools.h" #include "mem_section.h" +#include "telemetry.h" #include "stream/metadata_stream.h" #include "sync.h" @@ -33,8 +34,6 @@ class SyncClient; #define NIXL_ETCD_NAMESPACE_DEFAULT "/nixl/agents/" #endif // HAVE_ETCD -class nixlTelemetry; - using backend_list_t = std::vector; //Internal typedef to define metadata communication request types @@ -65,6 +64,7 @@ class nixlAgentData { std::string name; nixlAgentConfig config; nixlLock lock; + bool telemetryEnabled = false; // some handle that can be used to instantiate an object from the lib std::map backendLibs; @@ -115,6 +115,11 @@ class nixlAgentData { nixlAgentData(const std::string &name, const nixlAgentConfig &cfg); ~nixlAgentData(); + inline void + addErrorTelemetry(nixl_status_t err_status) { + if (telemetry_) telemetry_->updateErrorCount(err_status); + } + friend class nixlAgent; }; diff --git a/src/core/nixl_agent.cpp b/src/core/nixl_agent.cpp index 93c53a96d..3dd4d05f2 100644 --- a/src/core/nixl_agent.cpp +++ b/src/core/nixl_agent.cpp @@ -31,14 +31,6 @@ #include "telemetry.h" #include "telemetry_event.h" -// Macro to safely call telemetry methods only if telemetry_ is not null -#define UPDATE_TELEMETRY_DATA(telemetry_ptr, method_call) \ - do { \ - if (telemetry_ptr) { \ - telemetry_ptr->method_call; \ - 
} \ - } while (0) - /* ERROR log prefixed with current function name and a colon */ #define NIXL_ERROR_FUNC NIXL_ERROR << __FUNCTION__ << ": " @@ -54,7 +46,8 @@ return NIXL_ERR_MISMATCH; \ } while (0) -const char TELEMETRY_ENABLED_VAR[] = "NIXL_TELEMETRY_ENABLE"; +constexpr char TELEMETRY_ENABLED_VAR[] = "NIXL_TELEMETRY_ENABLE"; +constexpr char TELEMETRY_DIR_VAR[] = "NIXL_TELEMETRY_DIR"; static const std::vector> illegal_plugin_combinations = { {"GDS", "GDS_MT"}, }; @@ -93,6 +86,8 @@ nixlEnumStrings::statusStr(const nixl_status_t &status) { case NIXL_ERR_REMOTE_DISCONNECT: return "NIXL_ERR_REMOTE_DISCONNECT"; case NIXL_ERR_CANCELED: return "NIXL_ERR_CANCELED"; + case NIXL_ERR_NO_TELEMETRY: + return "NIXL_ERR_NO_TELEMETRY"; default: return "BAD_STATUS"; } } @@ -100,28 +95,27 @@ nixlEnumStrings::statusStr(const nixl_status_t &status) { inline void nixlXferReqH::updateRequestStats(std::unique_ptr &telemetry_pub, nixl_telemetry_stat_status_t stat_status) { - assert(telemetry_pub != nullptr); static const std::array nixl_post_status_str = { " Posted", " Posted and Completed", " Completed"}; - auto duration = std::chrono::duration_cast( + auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - telemetry.startTime); - if (stat_status == NIXL_TELEMETRY_POST) { - telemetry.postDuration_ = duration; + telemetry.postDuration = duration; } else if (stat_status == NIXL_TELEMETRY_POST_AND_FINISH) { - telemetry.postDuration_ = duration; - telemetry.xferDuration_ = duration; - telemetry_pub->addPostTime(duration); - telemetry_pub->addXferTime(duration, backendOp == NIXL_WRITE, telemetry.totalBytes); + telemetry.postDuration = duration; + telemetry.xferDuration = duration; } else { // stat_status == NIXL_TELEMETRY_FINISH - telemetry.xferDuration_ = duration; - telemetry_pub->addPostTime(telemetry.postDuration_); + telemetry.xferDuration = duration; + } + + if (telemetry_pub && (stat_status != NIXL_TELEMETRY_POST)) { + 
telemetry_pub->addPostTime(telemetry.postDuration); telemetry_pub->addXferTime(duration, backendOp == NIXL_WRITE, telemetry.totalBytes); } NIXL_TRACE << "[NIXL TELEMETRY]: From backend " << engine->getType() - << nixl_post_status_str[stat_status] << " Xfer with " << initiatorDescs->descCount() + << nixl_post_status_str[stat_status] << " Xfer with " << telemetry.descCount << " descriptors of total size " << telemetry.totalBytes << "B in " << duration.count() << "us."; } @@ -147,12 +141,19 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg memorySection = new nixlLocalSection(); const char *telemetry_env_val = std::getenv(TELEMETRY_ENABLED_VAR); + const char *telemetry_env_dir = std::getenv(TELEMETRY_DIR_VAR); if (telemetry_env_val != nullptr) { if (!strcasecmp(telemetry_env_val, "y") || !strcasecmp(telemetry_env_val, "1") || !strcasecmp(telemetry_env_val, "yes") || !strcasecmp(telemetry_env_val, "on")) { - telemetry_ = std::make_unique(name, backendEngines); - NIXL_DEBUG << "NIXL telemetry is enabled"; + telemetryEnabled = true; + if (telemetry_env_dir != nullptr) { + std::string telemetry_file = std::string(telemetry_env_dir) + "/" + name; + telemetry_ = std::make_unique(telemetry_file, backendEngines); + NIXL_DEBUG << "NIXL telemetry is enabled with output file: " << telemetry_file; + } else { + NIXL_DEBUG << "NIXL telemetry is enabled without an output file"; + } } else if (!strcasecmp(telemetry_env_val, "n") || !strcasecmp(telemetry_env_val, "0") || !strcasecmp(telemetry_env_val, "no") || !strcasecmp(telemetry_env_val, "off")) { NIXL_DEBUG << "NIXL telemetry is disabled"; @@ -595,7 +596,7 @@ nixlAgent::prepXferDlist (const std::string &agent_name, // just we can add a call to fetchRemoteMD for next time if (!init_side && (data->remoteSections.count(agent_name) == 0)) { NIXL_ERROR_FUNC << "metadata for remote agent '" << agent_name << "' not found"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + 
data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } @@ -609,7 +610,7 @@ nixlAgent::prepXferDlist (const std::string &agent_name, if (!backend_set || backend_set->empty()) { NIXL_ERROR_FUNC << "no available backends for mem type '" << descs.getType() << "'"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } } else { @@ -656,7 +657,7 @@ nixlAgent::prepXferDlist (const std::string &agent_name, NIXL_ERROR_FUNC << "failed to prepare the descriptors for any of " "the specified or potential backends for agent '" << agent_name << "'"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } else { dlist_hndl = handle; @@ -682,13 +683,13 @@ nixlAgent::makeXferReq (const nixl_xfer_op_t &operation, if (!local_side || !remote_side) { NIXL_ERROR_FUNC << "local or remote side handle is null"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_INVALID_PARAM)); + data->addErrorTelemetry(NIXL_ERR_INVALID_PARAM); return NIXL_ERR_INVALID_PARAM; } if ((!local_side->isLocal) || (remote_side->isLocal)) { NIXL_ERROR_FUNC << "invalid sides (local must be local, remote must be remote)"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_INVALID_PARAM)); + data->addErrorTelemetry(NIXL_ERR_INVALID_PARAM); return NIXL_ERR_INVALID_PARAM; } @@ -697,7 +698,7 @@ nixlAgent::makeXferReq (const nixl_xfer_op_t &operation, if (data->remoteSections.count(remote_side->remoteAgent) == 0) { NIXL_ERROR_FUNC << "remote agent '" << remote_side->remoteAgent << "' was invalidated in between prepXferDlist and this call"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); delete req_hndl; return NIXL_ERR_NOT_FOUND; } @@ -832,7 +833,11 @@ nixlAgent::makeXferReq (const 
nixl_xfer_op_t &operation, handle->hasNotif = opt_args.hasNotif; handle->backendOp = operation; handle->status = NIXL_ERR_NOT_POSTED; - handle->telemetry.totalBytes = total_bytes; + + if (data->telemetryEnabled) { + handle->telemetry.totalBytes = total_bytes; + handle->telemetry.descCount = handle->initiatorDescs->descCount(); + } ret = handle->engine->prepXfer (handle->backendOp, *handle->initiatorDescs, @@ -843,7 +848,7 @@ nixlAgent::makeXferReq (const nixl_xfer_op_t &operation, if (ret != NIXL_SUCCESS) { NIXL_ERROR_FUNC << "backend '" << backend->getType() << "' failed to prepare the transfer request with status " << ret; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(ret)); + data->addErrorTelemetry(ret); return ret; } @@ -869,7 +874,7 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation, if (data->remoteSections.count(remote_agent) == 0) { NIXL_ERROR_FUNC << "metadata for remote agent '" << remote_agent << "' not found"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } @@ -947,7 +952,7 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation, if (!handle->engine) { NIXL_ERROR_FUNC << "no specified or potential backend had the required " "registrations to be able to do the transfer"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } @@ -964,7 +969,7 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation, if (opt_args.hasNotif && (!handle->engine->supportsNotif())) { NIXL_ERROR_FUNC << "the selected backend '" << handle->engine->getType() << "' does not support notifications"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_BACKEND)); + data->addErrorTelemetry(NIXL_ERR_BACKEND); return NIXL_ERR_BACKEND; } @@ -973,7 +978,11 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation, handle->status = 
NIXL_ERR_NOT_POSTED; handle->notifMsg = opt_args.notifMsg; handle->hasNotif = opt_args.hasNotif; - handle->telemetry.totalBytes = total_bytes; + + if (data->telemetryEnabled) { + handle->telemetry.totalBytes = total_bytes; + handle->telemetry.descCount = handle->initiatorDescs->descCount(); + } ret1 = handle->engine->prepXfer (handle->backendOp, *handle->initiatorDescs, @@ -984,7 +993,7 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation, if (ret1 != NIXL_SUCCESS) { NIXL_ERROR_FUNC << "backend '" << handle->engine->getType() << "' failed to prepare the transfer request with status " << ret1; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(ret1)); + data->addErrorTelemetry(ret1); return ret1; } @@ -1008,13 +1017,13 @@ nixlAgent::estimateXferCost(const nixlXferReqH *req_hndl, (data->remoteSections.count(req_hndl->remoteAgent) == 0)) { NIXL_ERROR_FUNC << "invalid request handle, remote agent was invalidated " "after transfer request creation"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } if (!req_hndl->engine) { NIXL_ERROR_FUNC << "invalid request handle: engine is null"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_UNKNOWN)); + data->addErrorTelemetry(NIXL_ERR_UNKNOWN); return NIXL_ERR_UNKNOWN; } @@ -1043,11 +1052,13 @@ nixlAgent::postXferReq(nixlXferReqH *req_hndl, if (!req_hndl) { NIXL_ERROR_FUNC << "transfer request handle is null"; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_INVALID_PARAM)); + data->addErrorTelemetry(NIXL_ERR_INVALID_PARAM); return NIXL_ERR_INVALID_PARAM; } - if (data->telemetry_) req_hndl->telemetry.startTime = std::chrono::steady_clock::now(); + if (data->telemetryEnabled) { + req_hndl->telemetry.startTime = std::chrono::steady_clock::now(); + } NIXL_SHARED_LOCK_GUARD(data->lock); // Check if the remote was invalidated before post/repost @@ -1055,7 +1066,7 @@ 
nixlAgent::postXferReq(nixlXferReqH *req_hndl, NIXL_ERROR_FUNC << "remote agent '" << req_hndl->remoteAgent << "' was invalidated after transfer request creation"; delete req_hndl; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_NOT_FOUND)); + data->addErrorTelemetry(NIXL_ERR_NOT_FOUND); return NIXL_ERR_NOT_FOUND; } @@ -1101,7 +1112,7 @@ nixlAgent::postXferReq(nixlXferReqH *req_hndl, NIXL_ERROR_FUNC << "the selected backend '" << req_hndl->engine->getType() << "' does not support notifications"; delete req_hndl; - UPDATE_TELEMETRY_DATA(data->telemetry_, updateErrorCount(NIXL_ERR_BACKEND)); + data->addErrorTelemetry(NIXL_ERR_BACKEND); return NIXL_ERR_BACKEND; } @@ -1127,9 +1138,9 @@ nixlAgent::postXferReq(nixlXferReqH *req_hndl, } } - if (data->telemetry_) { + if (data->telemetryEnabled) { if (req_hndl->status < 0) { - data->telemetry_->updateErrorCount(req_hndl->status); + data->addErrorTelemetry(req_hndl->status); } else if (req_hndl->status == NIXL_IN_PROG) { req_hndl->updateRequestStats(data->telemetry_, NIXL_TELEMETRY_POST); } else { @@ -1166,11 +1177,11 @@ nixlAgent::getXferStatus (nixlXferReqH *req_hndl) const { << "' returned error status " << req_hndl->status; } } - if (data->telemetry_) { + if (data->telemetryEnabled) { if (req_hndl->status == NIXL_SUCCESS) { req_hndl->updateRequestStats(data->telemetry_, NIXL_TELEMETRY_FINISH); } else if (req_hndl->status < 0) { - data->telemetry_->updateErrorCount(req_hndl->status); + data->addErrorTelemetry(req_hndl->status); } } } @@ -1179,6 +1190,28 @@ nixlAgent::getXferStatus (nixlXferReqH *req_hndl) const { return req_hndl->status; } +nixl_status_t +nixlAgent::getXferTelemetry(const nixlXferReqH *req_hndl, nixl_xfer_telem_t &telemetry) const { + + // A null handle (e.g. 0 passed through a language binding) must not be dereferenced below + if (!req_hndl) { + NIXL_ERROR_FUNC << "transfer request handle is null"; + return NIXL_ERR_INVALID_PARAM; + } + + if (!data->telemetryEnabled) { + NIXL_ERROR_FUNC << "cannot return values when telemetry is not enabled."; + return NIXL_ERR_NO_TELEMETRY; + } + + if (req_hndl->status != NIXL_SUCCESS) { + NIXL_ERROR_FUNC << "Transfer is not complete yet"; + return 
req_hndl->status; + } + + telemetry = req_hndl->telemetry; + return NIXL_SUCCESS; +} nixl_status_t nixlAgent::queryXferBackend(const nixlXferReqH* req_hndl, @@ -1220,12 +1253,12 @@ nixlAgent::releaseXferReq(nixlXferReqH *req_hndl) const { nixl_status_t nixlAgent::createGpuXferReq(const nixlXferReqH &req_hndl, nixlGpuXferReqH *&gpu_req_hndl) const { if (!req_hndl.engine) { - NIXL_ERROR << "Invalid request handle[" << &req_hndl << "]: engine is null"; + NIXL_ERROR_FUNC << "Invalid request handle[" << &req_hndl << "]: engine is null"; return NIXL_ERR_INVALID_PARAM; } if (!req_hndl.backendHandle) { - NIXL_ERROR << "Invalid request handle[" << &req_hndl << "]: backendHandle is null"; + NIXL_ERROR_FUNC << "Invalid request handle[" << &req_hndl << "]: backendHandle is null"; return NIXL_ERR_INVALID_PARAM; } diff --git a/src/core/telemetry.cpp b/src/core/telemetry.cpp index d4a69429b..480f7879d 100644 --- a/src/core/telemetry.cpp +++ b/src/core/telemetry.cpp @@ -34,13 +34,13 @@ namespace fs = std::filesystem; constexpr std::chrono::milliseconds DEFAULT_TELEMETRY_RUN_INTERVAL = 100ms; constexpr size_t DEFAULT_TELEMETRY_BUFFER_SIZE = 4096; -nixlTelemetry::nixlTelemetry(const std::string &name, backend_map_t &backend_map) +nixlTelemetry::nixlTelemetry(const std::string &file_path, backend_map_t &backend_map) : pool_(1), writeTask_(pool_.get_executor(), DEFAULT_TELEMETRY_RUN_INTERVAL), - file_(name), + file_(file_path), backendMap_(backend_map) { - if (name.empty()) { - throw std::invalid_argument("Telemetry file name cannot be empty"); + if (file_path.empty()) { + throw std::invalid_argument("Telemetry file path cannot be empty"); } initializeTelemetry(); } @@ -66,9 +66,7 @@ nixlTelemetry::initializeTelemetry() { std::stoul(std::getenv(TELEMETRY_BUFFER_SIZE_VAR)) : DEFAULT_TELEMETRY_BUFFER_SIZE; - auto folder_path = std::getenv(TELEMETRY_DIR_VAR) ? 
std::getenv(TELEMETRY_DIR_VAR) : "/tmp"; - - auto full_file_path = fs::path(folder_path) / file_; + auto full_file_path = fs::path(file_); if (buffer_size == 0) { throw std::invalid_argument("Telemetry buffer size cannot be 0"); diff --git a/src/core/telemetry.h b/src/core/telemetry.h index e13e18958..5a1cc9b28 100644 --- a/src/core/telemetry.h +++ b/src/core/telemetry.h @@ -43,7 +43,7 @@ struct periodicTask { class nixlTelemetry { public: - nixlTelemetry(const std::string &name, backend_map_t &backend_map); + nixlTelemetry(const std::string &file_path, backend_map_t &backend_map); ~nixlTelemetry(); diff --git a/src/core/telemetry_event.h b/src/core/telemetry_event.h index 57c05ed68..a37afa2a0 100644 --- a/src/core/telemetry_event.h +++ b/src/core/telemetry_event.h @@ -23,7 +23,6 @@ #include "nixl_types.h" constexpr char TELEMETRY_BUFFER_SIZE_VAR[] = "NIXL_TELEMETRY_BUFFER_SIZE"; -constexpr char TELEMETRY_DIR_VAR[] = "NIXL_TELEMETRY_DIR"; constexpr char TELEMETRY_RUN_INTERVAL_VAR[] = "NIXL_TELEMETRY_RUN_INTERVAL"; constexpr int TELEMETRY_VERSION = 1; diff --git a/src/core/transfer_request.h b/src/core/transfer_request.h index 5e482582e..fb80b1abd 100644 --- a/src/core/transfer_request.h +++ b/src/core/transfer_request.h @@ -17,7 +17,6 @@ #ifndef __TRANSFER_REQUEST_H_ #define __TRANSFER_REQUEST_H_ -#include #include #include #include @@ -26,9 +25,6 @@ #include "backend_engine.h" #include "telemetry.h" -using chrono_point_t = std::chrono::steady_clock::time_point; -using std::chrono::microseconds; - enum nixl_telemetry_stat_status_t { NIXL_TELEMETRY_POST = 0, NIXL_TELEMETRY_POST_AND_FINISH = 1, @@ -52,12 +48,7 @@ class nixlXferReqH { nixl_xfer_op_t backendOp; nixl_status_t status; - struct { - chrono_point_t startTime; - microseconds postDuration_ = microseconds(0); - microseconds xferDuration_ = microseconds(0); - size_t totalBytes; - } telemetry; + nixl_xfer_telem_t telemetry; public: inline nixlXferReqH() { } diff --git a/test/gtest/telemetry_test.cpp 
b/test/gtest/telemetry_test.cpp index 1a29d32f8..cb2d0529b 100644 --- a/test/gtest/telemetry_test.cpp +++ b/test/gtest/telemetry_test.cpp @@ -37,6 +37,7 @@ namespace fs = std::filesystem; constexpr char TELEMETRY_ENABLED_VAR[] = "NIXL_TELEMETRY_ENABLE"; +constexpr char TELEMETRY_DIR_VAR[] = "NIXL_TELEMETRY_DIR"; // Custom mock backend class for testing backend telemetry events class telemetryTestBackend : public mocks::GMockBackendEngine { @@ -55,7 +56,7 @@ class telemetryTest : public ::testing::Test { void SetUp() override { testDir_ = "/tmp/telemetry_test_files"; - testFile_ = "test_telemetry"; + testFile_ = testDir_.string() + "/test_telemetry"; try { if (!fs::exists(testDir_)) { fs::create_directory(testDir_); @@ -85,7 +86,7 @@ class telemetryTest : public ::testing::Test { void validateState() { - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); EXPECT_EQ(buffer->version(), TELEMETRY_VERSION); @@ -155,7 +156,7 @@ TEST_F(telemetryTest, TransferBytesTracking) { EXPECT_NO_THROW(telemetry.addXferTime(std::chrono::microseconds(100), true, 2000)); std::this_thread::sleep_for(std::chrono::milliseconds(5)); - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); EXPECT_EQ(buffer->size(), 10); @@ -240,9 +241,9 @@ TEST_F(telemetryTest, CustomTelemetryDirectory) { envHelper_.addVar(TELEMETRY_DIR_VAR, custom_dir.string()); EXPECT_NO_THROW({ - nixlTelemetry telemetry(testFile_, backendMap_); + fs::path telemetry_file = custom_dir / "test_telemetry"; + nixlTelemetry telemetry(telemetry_file.string(), backendMap_); - fs::path telemetry_file = custom_dir / testFile_; EXPECT_TRUE(fs::exists(telemetry_file)); }); envHelper_.popVar(); @@ -265,7 +266,7 @@ TEST_F(telemetryTest, TelemetryCategoryStringConversion) { // Test concurrent access (basic thread safety) 
TEST_F(telemetryTest, ConcurrentAccess) { envHelper_.addVar(TELEMETRY_RUN_INTERVAL_VAR, "1"); - testFile_ = "test_concurrent_access"; + testFile_ = testDir_.string() + "/test_concurrent_access"; nixlTelemetry telemetry(testFile_, backendMap_); const int num_threads = 4; @@ -333,7 +334,7 @@ TEST_F(telemetryTest, BackendTelemetryEventsCollection) { std::this_thread::sleep_for(std::chrono::milliseconds(3)); // Verify that backend events are collected and written to buffer - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); @@ -373,7 +374,7 @@ TEST_F(telemetryTest, BackendTelemetryEventsEmptyBackendMap) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Verify that only agent events are written (no backend events) - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); @@ -415,7 +416,7 @@ TEST_F(telemetryTest, BackendTelemetryEventsMixedWithAgentEvents) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Verify that both agent and backend events are written - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); @@ -461,7 +462,7 @@ TEST_F(telemetryTest, BackendTelemetryEventsDisabledTelemetry) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); // Verify that no events are written - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = std::make_unique>( path.string(), false, TELEMETRY_VERSION); @@ -500,7 +501,7 @@ TEST_F(telemetryTest, BackendTelemetryEventsMultipleBackends) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); // Verify that events from all backends are collected - auto path = fs::path(testDir_) / testFile_; + auto path = fs::path(testFile_); auto buffer = 
std::make_unique>( path.string(), false, TELEMETRY_VERSION); diff --git a/test/gtest/test_transfer.cpp b/test/gtest/test_transfer.cpp index 2bb3bd5f7..f34374e22 100644 --- a/test/gtest/test_transfer.cpp +++ b/test/gtest/test_transfer.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,8 @@ #include #endif +constexpr auto min_chrono_time = std::chrono::steady_clock::time_point::min(); + namespace gtest { class MemBuffer : std::shared_ptr { @@ -126,22 +129,30 @@ class TestTransfer : return params; } + void + addAgent(unsigned int agent_num) { + ports.push_back(PortAllocator::next_tcp_port()); + agents.emplace_back( + std::make_unique(getAgentName(agent_num), getConfig(getPort(agent_num)))); + nixlBackendH *backend_handle = nullptr; + nixl_status_t status = + agents.back()->createBackend(getBackendName(), getBackendParams(), backend_handle); + ASSERT_EQ(status, NIXL_SUCCESS); + EXPECT_NE(backend_handle, nullptr); + } + void SetUp() override { #ifdef HAVE_CUDA m_cuda_device = (cudaSetDevice(0) == cudaSuccess); #endif + // Disabling Telemetry until the corresponding test + env.addVar("NIXL_TELEMETRY_ENABLE", "n"); + // Create two agents for (size_t i = 0; i < 2; i++) { - ports.push_back(PortAllocator::next_tcp_port()); - agents.emplace_back(std::make_unique(getAgentName(i), - getConfig(getPort(i)))); - nixlBackendH *backend_handle = nullptr; - nixl_status_t status = agents.back()->createBackend( - getBackendName(), getBackendParams(), backend_handle); - ASSERT_EQ(status, NIXL_SUCCESS); - EXPECT_NE(backend_handle, nullptr); + addAgent(i); } } @@ -220,10 +231,11 @@ class TestTransfer : return result; } - void exchangeMDIP() - { - for (size_t i = 0; i < agents.size(); i++) { - for (size_t j = 0; j < agents.size(); j++) { + void + exchangeMDIP(size_t start, size_t end) { + // Exchange metadata for the agents in the specified range using their IP + for (size_t i = start; i <= end; i++) { + for (size_t j = start; j <= end; j++) { 
if (i == j) { continue; } @@ -236,15 +248,15 @@ class TestTransfer : } } - void exchangeMD() - { - // Connect the existing agents and exchange metadata - for (size_t i = 0; i < agents.size(); i++) { + void + exchangeMD(size_t start, size_t end) { + // Exchange metadata for the agents in the specified range + for (size_t i = start; i <= end; i++) { nixl_blob_t md; nixl_status_t status = agents[i]->getLocalMD(md); ASSERT_EQ(status, NIXL_SUCCESS); - for (size_t j = 0; j < agents.size(); j++) { + for (size_t j = start; j <= end; j++) { if (i == j) continue; std::string remote_agent_name; @@ -255,11 +267,11 @@ class TestTransfer : } } - void invalidateMD() - { - // Disconnect the agents and invalidate remote metadata - for (size_t i = 0; i < agents.size(); i++) { - for (size_t j = 0; j < agents.size(); j++) { + void + invalidateMD(size_t start, size_t end) { + // Invalidate each other's metadata for the agents in the specified range + for (size_t i = start; i <= end; i++) { + for (size_t j = start; j <= end; j++) { if (i == j) continue; nixl_status_t status = agents[j]->invalidateRemoteMD( @@ -321,7 +333,7 @@ class TestTransfer : size_t num_threads) { const size_t total_notifs = repeat * num_threads; - exchangeMD(); + exchangeMD(0, 1); std::vector threads; nixl_notifs_t notif_map; @@ -344,17 +356,23 @@ class TestTransfer : } verifyNotifs(to, from_name, total_notifs, std::move(notif_map)); - invalidateMD(); + invalidateMD(0, 1); } - void doTransfer(nixlAgent &from, const std::string &from_name, - nixlAgent &to, const std::string &to_name, size_t size, - size_t count, size_t repeat, size_t num_threads, - nixl_mem_t src_mem_type, - std::vector src_buffers, - nixl_mem_t dst_mem_type, - std::vector dst_buffers) - { + void + doTransfer(nixlAgent &from, + const std::string &from_name, + nixlAgent &to, + const std::string &to_name, + size_t size, + size_t count, + size_t repeat, + size_t num_threads, + nixl_mem_t src_mem_type, + std::vector src_buffers, + nixl_mem_t 
dst_mem_type, + std::vector dst_buffers, + nixl_status_t expected_telem_status = NIXL_ERR_NO_TELEMETRY) { std::mutex logger_mutex; std::vector threads; nixl_notifs_t notif_map; @@ -403,6 +421,16 @@ class TestTransfer : << "(" << bandwidth << " GB/s)"; } + nixl_xfer_telem_t telemetry; + status = from.getXferTelemetry(xfer_req, telemetry); + EXPECT_EQ(status, expected_telem_status); + if (expected_telem_status == NIXL_SUCCESS) { + EXPECT_TRUE(telemetry.startTime > min_chrono_time); + EXPECT_TRUE(telemetry.postDuration > chrono_period_us_t(0)); + EXPECT_TRUE(telemetry.xferDuration > chrono_period_us_t(0)); + EXPECT_TRUE(telemetry.xferDuration >= telemetry.postDuration); + } + status = from.releaseXferReq(xfer_req); EXPECT_EQ(status, NIXL_SUCCESS); }); @@ -413,7 +441,6 @@ class TestTransfer : } verifyNotifs(to, from_name, repeat * num_threads, std::move(notif_map)); - invalidateMD(); } nixlAgent &getAgent(size_t idx) @@ -427,6 +454,7 @@ class TestTransfer : } bool m_cuda_device = false; + gtest::ScopedEnv env; private: static constexpr uint64_t DEV_ID = 0; @@ -461,7 +489,7 @@ TEST_P(TestTransfer, RandomSizes) createRegisteredMem(getAgent(0), size, count, mem_type, src_buffers); createRegisteredMem(getAgent(1), size, count, mem_type, dst_buffers); - exchangeMD(); + exchangeMD(0, 1); doTransfer(getAgent(0), getAgentName(0), getAgent(1), @@ -474,6 +502,7 @@ TEST_P(TestTransfer, RandomSizes) src_buffers, mem_type, dst_buffers); + invalidateMD(0, 1); deregisterMem(getAgent(0), src_buffers, mem_type); deregisterMem(getAgent(1), dst_buffers, mem_type); } @@ -489,12 +518,13 @@ TEST_P(TestTransfer, remoteMDFromSocket) createRegisteredMem(getAgent(0), size, count, mem_type, src_buffers); createRegisteredMem(getAgent(1), size, count, mem_type, dst_buffers); - exchangeMDIP(); + exchangeMDIP(0, 1); doTransfer(getAgent(0), getAgentName(0), getAgent(1), getAgentName(1), size, count, 1, 1, mem_type, src_buffers, mem_type, dst_buffers); + invalidateMD(0, 1); deregisterMem(getAgent(0), 
src_buffers, mem_type); deregisterMem(getAgent(1), dst_buffers, mem_type); } @@ -528,6 +558,107 @@ TEST_P(TestTransfer, ListenerCommSize) { deregisterMem(getAgent(1), buffers, DRAM_SEG); } +TEST_P(TestTransfer, GetXferTelemetryFile) { + env.addVar("NIXL_TELEMETRY_ENABLE", "y"); + env.addVar("NIXL_TELEMETRY_DIR", "/tmp/"); + + // Create fresh agents that read the current env var and add them to the fixture + addAgent(2); + addAgent(3); + + constexpr size_t size = 1024; + constexpr size_t count = 1; + std::vector src_buffers, dst_buffers; + createRegisteredMem(getAgent(2), size, count, DRAM_SEG, src_buffers); + createRegisteredMem(getAgent(3), size, count, DRAM_SEG, dst_buffers); + + exchangeMD(2, 3); + doTransfer(getAgent(2), + getAgentName(2), + getAgent(3), + getAgentName(3), + size, + count, + 1, + 1, + DRAM_SEG, + src_buffers, + DRAM_SEG, + dst_buffers, + NIXL_SUCCESS); + + invalidateMD(2, 3); + deregisterMem(getAgent(2), src_buffers, DRAM_SEG); + deregisterMem(getAgent(3), dst_buffers, DRAM_SEG); +} + +TEST_P(TestTransfer, GetXferTelemetryAPI) { + // Enable telemetry without file output + env.addVar("NIXL_TELEMETRY_ENABLE", "y"); + + // Create fresh agents that read the current env var and add them to the fixture + addAgent(2); + addAgent(3); + + constexpr size_t size = 1024; + constexpr size_t count = 1; + std::vector src_buffers, dst_buffers; + createRegisteredMem(getAgent(2), size, count, DRAM_SEG, src_buffers); + createRegisteredMem(getAgent(3), size, count, DRAM_SEG, dst_buffers); + + exchangeMD(2, 3); + doTransfer(getAgent(2), + getAgentName(2), + getAgent(3), + getAgentName(3), + size, + count, + 1, + 1, + DRAM_SEG, + src_buffers, + DRAM_SEG, + dst_buffers, + NIXL_SUCCESS); + + invalidateMD(2, 3); + deregisterMem(getAgent(2), src_buffers, DRAM_SEG); + deregisterMem(getAgent(3), dst_buffers, DRAM_SEG); +} + +TEST_P(TestTransfer, GetXferTelemetryDisabled) { + env.addVar("NIXL_TELEMETRY_ENABLE", "n"); + + // Create fresh agents that read the current env var 
and add them to the fixture + addAgent(2); + addAgent(3); + + constexpr size_t size = 512; + constexpr size_t count = 1; + std::vector src_buffers, dst_buffers; + createRegisteredMem(getAgent(2), size, count, DRAM_SEG, src_buffers); + createRegisteredMem(getAgent(3), size, count, DRAM_SEG, dst_buffers); + + exchangeMD(2, 3); + doTransfer(getAgent(2), + getAgentName(2), + getAgent(3), + getAgentName(3), + size, + count, + 1, + 1, + DRAM_SEG, + src_buffers, + DRAM_SEG, + dst_buffers, + NIXL_ERR_NO_TELEMETRY); + + invalidateMD(2, 3); + deregisterMem(getAgent(2), src_buffers, DRAM_SEG); + deregisterMem(getAgent(3), dst_buffers, DRAM_SEG); +} + INSTANTIATE_TEST_SUITE_P(ucx, TestTransfer, testing::Values(std::make_tuple("UCX", true, 2, 0))); INSTANTIATE_TEST_SUITE_P(ucx_no_pt, TestTransfer, diff --git a/test/python/test_nixl_api.py b/test/python/test_nixl_api.py index e7a69c62d..76734d736 100644 --- a/test/python/test_nixl_api.py +++ b/test/python/test_nixl_api.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import os import uuid import pytest @@ -123,6 +124,7 @@ def test_metadata_pass(two_ucx_agents): passed_name = agent2.add_remote_agent(agent1.get_agent_metadata()) assert passed_name == agent1.name.encode() + utils.free_passthru(addr) @pytest.mark.timeout(5) @@ -200,3 +202,53 @@ def test_incorrect_plugin_env(monkeypatch): with pytest.raises(RuntimeError): nixl_agent("bad env agent") + + +def test_get_xfer_telemetry(): + os.environ["NIXL_TELEMETRY_ENABLE"] = "y" + + agent1 = nixl_agent(str(uuid.uuid4())) + agent2 = nixl_agent(str(uuid.uuid4())) + + mem_size = 128 + addr1 = utils.malloc_passthru(mem_size) + addr2 = utils.malloc_passthru(mem_size) + + try: + reg1 = agent1.get_reg_descs([(addr1, mem_size, 0, "")], mem_type="DRAM") + reg2 = agent2.get_reg_descs([(addr2, mem_size, 0, "")], mem_type="DRAM") + agent1.register_memory(reg1) + agent2.register_memory(reg2) + + agent1.add_remote_agent(agent2.get_agent_metadata()) + src = agent1.get_xfer_descs( + [(addr1, mem_size // 2, 0), (addr1 + mem_size // 2, mem_size // 2, 0)], + mem_type="DRAM", + ) + dst = agent1.get_xfer_descs( + [(addr2, mem_size // 2, 0), (addr2 + mem_size // 2, mem_size // 2, 0)], + mem_type="DRAM", + ) + + handle = agent1.initialize_xfer("WRITE", src, dst, agent2.name) + st = agent1.transfer(handle) + assert st in ("DONE", "PROC") + + while True: + st = agent1.check_xfer_state(handle) + assert st in ("DONE", "PROC") + if st == "DONE": + break + + telem = agent1.get_xfer_telemetry(handle) + assert telem.descCount == 2 + assert telem.totalBytes == mem_size + assert telem.startTime > 0 + assert telem.postDuration > 0 + assert telem.xferDuration > 0 + assert telem.xferDuration >= telem.postDuration + + agent1.release_xfer_handle(handle) + finally: + utils.free_passthru(addr1) + utils.free_passthru(addr2) diff --git a/test/python/test_nixl_bindings.py b/test/python/test_nixl_bindings.py index 3c5933014..a5f00daf6 100644 --- a/test/python/test_nixl_bindings.py +++ b/test/python/test_nixl_bindings.py @@ 
-60,6 +60,7 @@ def test_list(): def test_agent(): + os.environ["NIXL_TELEMETRY_ENABLE"] = "y" name1 = "Agent1" name2 = "Agent2" @@ -145,6 +146,15 @@ def test_agent(): logger.info("Transfer verified") + # Verify transfer telemetry + telem = agent1.getXferTelemetry(handle) + assert telem.descCount == 1 + assert telem.totalBytes == req_size + assert telem.startTime > 0 + assert telem.postDuration > 0 + assert telem.xferDuration > 0 + assert telem.xferDuration >= telem.postDuration + agent1.releaseXferReq(handle) ret = agent1.deregisterMem(reg_list1, [ucx1])