Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/api/cpp/nixl.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ class nixlAgent {
nixl_status_t
getXferStatus (nixlXferReqH* req_hndl) const;

/**
* @brief Get the telemetry of a transfer request
*
* Returns a copy of the telemetry record kept in the request handle. The
* elapsed-time fields are expressed in microseconds relative to the
* record's startTime, and are only refreshed by postXferReq/getXferStatus
* when telemetry is enabled on the agent.
*
* @param req_hndl Transfer request handle after postXferReq
* @return nixl_xfer_telemetry_t Telemetry of the transfer request (by value)
*/
nixl_xfer_telemetry_t
getXferTelemetry (nixlXferReqH *req_hndl) const;

/**
* @brief Query the backend associated with `req_hndl`. E.g., if for genNotif
* the same backend as a transfer is desired.
Expand Down
12 changes: 11 additions & 1 deletion src/api/cpp/nixl_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <string>
#include <unordered_map>
#include <optional>

#include <chrono>

/*** Forward declarations ***/
class nixlSerDes;
Expand Down Expand Up @@ -87,6 +87,16 @@ namespace nixlEnumStrings {
}


/**
 * @brief Telemetry data for a transfer request.
 *
 * Every field carries a default initializer so that a record which was never
 * updated (for instance because telemetry is disabled) reads as zeros instead
 * of indeterminate values when it is copied out to a caller.
 */
struct nixl_xfer_telemetry_t {
    /** Reference time point from which the elapsed times below are measured. */
    std::chrono::high_resolution_clock::time_point startTime{};
    /** Microseconds elapsed between startTime and the request being posted. */
    uint64_t postElapsedTime{0};
    /** Microseconds elapsed between startTime and the transfer being seen as complete. */
    uint64_t xferElapsedTime{0};
    /** Total size of the transfer, in bytes. */
    size_t totalBytes{0};
};

/*** NIXL typedefs and defines used in the API ***/

/**
Expand Down
10 changes: 10 additions & 0 deletions src/api/python/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,16 @@ def check_xfer_state(self, handle: nixl_xfer_handle) -> str:
else:
return "ERR"

"""
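@brief Get the telemetry recorded for a transfer operation.

@param handle Handle to the transfer operation, from make_prepped_xfer or initialize_xfer.
@return nixlXferTelemetry object exposing start_time_ms, post_elapsed_time_ms,
        xfer_elapsed_time_ms and total_bytes for the transfer operation.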
"""

def get_xfer_telemetry(self, handle: nixl_xfer_handle) -> nixlBind.nixlXferTelemetry:
    # Pure pass-through: the bound agent owns the telemetry record, this
    # method only forwards the handle and hands the result back unchanged.
    telemetry = self.agent.getXferTelemetry(handle)
    return telemetry

"""
@brief Query the backend that was chosen for a transfer operation.

Expand Down
14 changes: 14 additions & 0 deletions src/bindings/python/nixl_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ PYBIND11_MODULE(_bindings, m) {
.value("NIXL_ERR_NOT_SUPPORTED", NIXL_ERR_NOT_SUPPORTED)
.export_values();

py::class_<nixl_xfer_telemetry_t>(m, "nixlXferTelemetry")
.def_property_readonly("start_time_ms", [](const nixl_xfer_telemetry_t& t) {
return std::chrono::duration_cast<std::chrono::milliseconds>(
t.startTime.time_since_epoch()).count();
})
.def_readwrite("post_elapsed_time_ms", &nixl_xfer_telemetry_t::postElapsedTime)
.def_readwrite("xfer_elapsed_time_ms", &nixl_xfer_telemetry_t::xferElapsedTime)
.def_readwrite("total_bytes", &nixl_xfer_telemetry_t::totalBytes);

py::register_exception<nixlNotPostedError>(m, "nixlNotPostedError");
py::register_exception<nixlInvalidParamError>(m, "nixlInvalidParamError");
py::register_exception<nixlBackendError>(m, "nixlBackendError");
Expand Down Expand Up @@ -652,6 +661,11 @@ PYBIND11_MODULE(_bindings, m) {
throw_nixl_exception(ret);
return ret;
})
.def("getXferTelemetry",
     [](nixlAgent &agent, uintptr_t reqh) -> nixl_xfer_telemetry_t {
         // The request handle arrives from Python as an integer address;
         // turn it back into the handle pointer before asking the agent.
         // TODO could also do in line without getter.
         auto *req_hndl = reinterpret_cast<nixlXferReqH *>(reqh);
         return agent.getXferTelemetry(req_hndl);
     })
.def("queryXferBackend",
[](nixlAgent &agent, uintptr_t reqh) -> uintptr_t {
nixlBackendH *backend = nullptr;
Expand Down
23 changes: 16 additions & 7 deletions src/core/nixl_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ std::string nixlEnumStrings::statusStr (const nixl_status_t &status) {

/*** nixlXferReqH telemetry update method, used mainly in the nixlAgent ***/
void
nixlXferReqH::updateRequestStats(const std::string &dbg_msg_type) {
const auto xfer_time = std::chrono::duration_cast<std::chrono::microseconds>(
nixlXferReqH::updateRequestStats(uint64_t &telemetryToUpdate, const std::string &dbg_msg_type) {
const auto diff_time = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - telemetry.startTime);
telemetryToUpdate = diff_time.count();
// If endTime needs to be recorded per Xfer, now() value here can be returned

// To be replaced with NIXL_DEBUG when full telemetry is added
std::cout << "[NIXL TELEMETRY]: From backend " << engine->getType() << " " << dbg_msg_type
<< " Xfer with " << initiatorDescs->descCount() << " descriptors of total size "
<< telemetry.totalBytes << "B in " << xfer_time.count() << "us." << std::endl;
<< telemetry.totalBytes << "B in " << diff_time.count() << "us." << std::endl;
}

/*** nixlAgentData constructor/destructor, as part of nixlAgent's ***/
Expand Down Expand Up @@ -930,10 +931,13 @@ nixlAgent::postXferReq(nixlXferReqH *req_hndl,
req_hndl->status = ret;

if (data->telemetryEnabled) {
if (req_hndl->status == NIXL_SUCCESS)
req_hndl->updateRequestStats("Posted and Completed");
if (req_hndl->status == NIXL_SUCCESS){
req_hndl->telemetry.postElapsedTime = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - req_hndl->telemetry.startTime).count();
req_hndl->updateRequestStats(req_hndl->telemetry.xferElapsedTime, "Posted and Completed");
}
else if (req_hndl->status == NIXL_IN_PROG)
req_hndl->updateRequestStats("Posted");
req_hndl->updateRequestStats(req_hndl->telemetry.postElapsedTime, "Posted");
// Errors should show up in debug log separately, not adding a print here
}

Expand All @@ -956,11 +960,16 @@ nixlAgent::getXferStatus (nixlXferReqH *req_hndl) const {
}

if (data->telemetryEnabled && req_hndl->status == NIXL_SUCCESS)
req_hndl->updateRequestStats("Completed");
req_hndl->updateRequestStats(req_hndl->telemetry.xferElapsedTime, "Completed");

return req_hndl->status;
}

/**
 * Return a copy of the telemetry record stored in a transfer request handle.
 *
 * The return type has no room for a status code, so a null handle yields a
 * value-initialized (all-zero) record instead of dereferencing the pointer.
 *
 * @param req_hndl Transfer request handle whose telemetry is requested
 * @return Copy of the handle's telemetry, or a zeroed record if req_hndl is null
 */
nixl_xfer_telemetry_t
nixlAgent::getXferTelemetry (nixlXferReqH *req_hndl) const {
    if (req_hndl == nullptr) {
        return nixl_xfer_telemetry_t{};
    }

    return req_hndl->telemetry;
}


nixl_status_t
nixlAgent::queryXferBackend(const nixlXferReqH* req_hndl,
Expand Down
10 changes: 2 additions & 8 deletions src/core/transfer_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
#ifndef __TRANSFER_REQUEST_H_
#define __TRANSFER_REQUEST_H_

constexpr auto min_chrono_time = std::chrono::time_point<std::chrono::high_resolution_clock>::min();
using chrono_point_t = std::chrono::high_resolution_clock::time_point;

// Contains pointers to corresponding backend engine and its handler, and populated
// and verified DescLists, and other state and metadata needed for a NIXL transfer
class nixlXferReqH {
Expand All @@ -37,10 +34,7 @@ class nixlXferReqH {
nixl_xfer_op_t backendOp;
nixl_status_t status;

struct {
chrono_point_t startTime;
size_t totalBytes;
} telemetry;
nixl_xfer_telemetry_t telemetry;

public:
inline nixlXferReqH() { }
Expand All @@ -54,7 +48,7 @@ class nixlXferReqH {
}

void
updateRequestStats(const std::string &dbg_msg_type);
updateRequestStats(uint64_t &telemetryToUpdate, const std::string &dbg_msg_type);

friend class nixlAgent;
};
Expand Down
Loading