Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitlab/test_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ cd ${INSTALL_DIR}
./bin/nixl_etcd_example
./bin/ucx_backend_test
./bin/ucx_mo_backend_test
NIXL_TELEMETRY_ENABLE=y ./bin/agent_example &
mkdir -p /tmp/telemetry_test
NIXL_TELEMETRY_ENABLE=y NIXL_TELEMETRY_DIR=/tmp/telemetry_test ./bin/agent_example &
sleep 1
./bin/telemetry_reader /tmp/Agent001 &
./bin/telemetry_reader /tmp/telemetry_test/Agent001 &
telePID=$!
sleep 6
kill -s SIGINT $telePID
Expand Down
5 changes: 3 additions & 2 deletions .gitlab/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ python3 partial_md_example.py --etcd
python3 query_mem_example.py

# Running telemetry for the last test
export NIXL_TELEMETRY_ENABLE=y
blocking_send_recv_port=$(get_next_tcp_port)
mkdir -p /tmp/telemetry_test

python3 blocking_send_recv_example.py --mode="target" --ip=127.0.0.1 --port="$blocking_send_recv_port"&
sleep 5
NIXL_TELEMETRY_ENABLE=y NIXL_TELEMETRY_DIR=/tmp/telemetry_test \
python3 blocking_send_recv_example.py --mode="initiator" --ip=127.0.0.1 --port="$blocking_send_recv_port"

python3 telemetry_reader.py --telemetry_path /tmp/initiator &
python3 telemetry_reader.py --telemetry_path /tmp/telemetry_test/initiator &
telePID=$!
sleep 6
kill -s INT $telePID
Expand Down
6 changes: 4 additions & 2 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ Telemetry is controlled by environment variables:
| Variable | Description | Default |
|----------|-------------|---------|
| `NIXL_TELEMETRY_ENABLE` | Enable telemetry collection | Disabled |
| `NIXL_TELEMETRY_DIR` | Directory for telemetry files | `/tmp` |
| `NIXL_TELEMETRY_DIR` | Directory for telemetry files | - |
| `NIXL_TELEMETRY_BUFFER_SIZE` | Number of events in buffer | `4096` |
| `NIXL_TELEMETRY_RUN_INTERVAL` | Flush interval (ms) | `100` |

- `NIXL_TELEMETRY_ENABLE` can be set to y/yes/on/1 to enable telemetry, or to n/no/off/0 (or left unset) to disable it.
- If `NIXL_TELEMETRY_ENABLE` is enabled but `NIXL_TELEMETRY_DIR` is not set, no telemetry file is generated and `NIXL_TELEMETRY_RUN_INTERVAL` is not used.

## Telemetry File Format

Expand Down Expand Up @@ -95,4 +97,4 @@ Category: MEMORY
Event: agent_memory_registered
Value: 4096
===========================
```
```
11 changes: 11 additions & 0 deletions src/api/cpp/nixl.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,17 @@ class nixlAgent {
nixl_status_t
getXferStatus (nixlXferReqH* req_hndl) const;


/**
* @brief Get the telemetry data associated with `req_hndl`.
*
* @param req_hndl Transfer request handle obtained from makeXferReq/createXferReq
* @param telemetry [out] Output telemetry information
* @return nixl_status_t Error code if call was not successful
*/
nixl_status_t
getXferTelemetry(const nixlXferReqH *req_hndl, nixl_xfer_telem_t &telemetry) const;

/**
* @brief Query the backend associated with `req_hndl`. E.g., if for genNotif
* the same backend as a transfer is desired.
Expand Down
53 changes: 52 additions & 1 deletion src/api/cpp/nixl_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <optional>
#include <chrono>


/*** Forward declarations ***/
Expand Down Expand Up @@ -62,7 +63,8 @@ enum nixl_status_t {
NIXL_ERR_UNKNOWN = -8,
NIXL_ERR_NOT_SUPPORTED = -9,
NIXL_ERR_REMOTE_DISCONNECT = -10,
NIXL_ERR_CANCELED = -11
NIXL_ERR_CANCELED = -11,
NIXL_ERR_NO_TELEMETRY = -12
};

/**
Expand Down Expand Up @@ -228,6 +230,55 @@ using nixl_opt_args_t = nixlAgentOptionalArgs;
*/
using nixlGpuXferReqH = void;

/**
 * @brief A typedef for a point in time
 */
using chrono_point_t = std::chrono::steady_clock::time_point;

/**
 * @brief A typedef for a period of time in microseconds
 */
using chrono_period_us_t = std::chrono::microseconds;

/**
 * @struct nixlXferTelemetry
 * @brief A structure for telemetry output from agent API
 *
 * Every member carries a default member initializer, so a default-constructed
 * object (e.g. `nixl_xfer_telem_t telemetry;`) starts out zeroed instead of
 * holding indeterminate values. The struct remains an aggregate.
 */
struct nixlXferTelemetry {
    /**
     * @var startTime Time that the transfer was posted
     */
    chrono_point_t startTime{};

    /**
     * @var postDuration Time it took to do the post operation
     */
    chrono_period_us_t postDuration{};

    /**
     * @var xferDuration Time it took to complete the transfer
     *      if checkXferReq is called late, that might impact this result
     */
    chrono_period_us_t xferDuration{};

    /**
     * @var totalBytes Amount of bytes transferred in the request
     */
    size_t totalBytes{0};

    /**
     * @var descCount Number of descriptors in the transfer request.
     *      If any merging of descriptors was performed, it will be reflected here.
     */
    size_t descCount{0};
};

/**
 * @brief A typedef for a nixlXferTelemetry
 *        for telemetry output.
 */
using nixl_xfer_telem_t = nixlXferTelemetry;

/**
* @brief A define for an empty string, that indicates the descriptor list is being
* prepared for the local agent as an initiator in prepXferDlist method.
Expand Down
16 changes: 16 additions & 0 deletions src/api/python/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,22 @@ def check_xfer_state(self, handle: nixl_xfer_handle) -> str:
else:
return "ERR"

"""
@brief Get telemetry information of a transfer request.
The output object has three time fields expressed in microseconds
(startTime, postDuration, xferDuration), as well as the integer totalBytes transferred
for the request, and the integer descCount representing the number of descriptors involved
(for example, if some descriptors were merged).

@param handle Handle to the transfer operation, from make_prepped_xfer or initialize_xfer.
@return nixlXferTelemetry object
"""

def get_xfer_telemetry(
    self, handle: nixl_xfer_handle
) -> nixlBind.nixlXferTelemetry:
    # Unwrap the Python-side handle and pass the raw handle to the bound agent.
    raw_handle = handle._handle
    return self.agent.getXferTelemetry(raw_handle)

"""
@brief Query the backend that was chosen for a transfer operation.

Expand Down
57 changes: 55 additions & 2 deletions src/bindings/python/nixl_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <pybind11/stl.h>
#include <pybind11/operators.h>
#include <pybind11/numpy.h>
#include <pybind11/chrono.h>

#include <tuple>
#include <iostream>
Expand Down Expand Up @@ -64,14 +65,29 @@ class nixlRepostActiveError : public std::runtime_error {
nixlRepostActiveError(const char *what) : runtime_error(what) {}
};

// C++ exception type registered with Python as "nixlUnknownError".
// NOTE(review): presumably thrown for NIXL_ERR_UNKNOWN; the mapping is not
// visible in this hunk, so confirm against throw_nixl_exception.
struct nixlUnknownError : std::runtime_error {
    nixlUnknownError(const char *what) : std::runtime_error(what) {}
};

// C++ exception type thrown by throw_nixl_exception for NIXL_ERR_NOT_SUPPORTED
// and registered with Python as "nixlNotSupportedError".
struct nixlNotSupportedError : std::runtime_error {
    nixlNotSupportedError(const char *what) : std::runtime_error(what) {}
};

class nixlUnknownError : public std::runtime_error {
class nixlRemoteDisconnectError : public std::runtime_error {
public:
nixlUnknownError(const char *what) : runtime_error(what) {}
nixlRemoteDisconnectError(const char *what) : runtime_error(what) {}
};

// C++ exception type thrown by throw_nixl_exception for NIXL_ERR_CANCELED
// and registered with Python as "nixlCancelledError".
struct nixlCancelledError : std::runtime_error {
    nixlCancelledError(const char *what) : std::runtime_error(what) {}
};

// C++ exception type thrown by throw_nixl_exception for NIXL_ERR_NO_TELEMETRY
// and registered with Python as "nixlNoTelemetryError".
struct nixlNoTelemetryError : std::runtime_error {
    nixlNoTelemetryError(const char *what) : std::runtime_error(what) {}
};

void
Expand Down Expand Up @@ -108,6 +124,15 @@ throw_nixl_exception(const nixl_status_t &status) {
case NIXL_ERR_NOT_SUPPORTED:
throw nixlNotSupportedError(nixlEnumStrings::statusStr(status).c_str());
break;
case NIXL_ERR_REMOTE_DISCONNECT:
throw nixlRemoteDisconnectError(nixlEnumStrings::statusStr(status).c_str());
break;
case NIXL_ERR_CANCELED:
throw nixlCancelledError(nixlEnumStrings::statusStr(status).c_str());
break;
case NIXL_ERR_NO_TELEMETRY:
throw nixlNoTelemetryError(nixlEnumStrings::statusStr(status).c_str());
break;
default:
throw std::runtime_error("BAD_STATUS");
}
Expand Down Expand Up @@ -161,6 +186,22 @@ PYBIND11_MODULE(_bindings, m) {
.value("NIXL_ERR_NOT_SUPPORTED", NIXL_ERR_NOT_SUPPORTED)
.export_values();

// Python view of nixl_xfer_telem_t. All fields are read-only. The three time
// fields are exposed as integer microsecond counts; startTime is measured from
// the epoch of the steady clock behind chrono_point_t.
py::class_<nixl_xfer_telem_t>(m, "nixlXferTelemetry")
    .def(py::init<>())
    .def_property_readonly(
        "startTime",
        [](const nixl_xfer_telem_t &telem) {
            const auto since_epoch = telem.startTime.time_since_epoch();
            return std::chrono::duration_cast<chrono_period_us_t>(since_epoch).count();
        })
    .def_property_readonly(
        "postDuration",
        [](const nixl_xfer_telem_t &telem) {
            const auto post_us = telem.postDuration.count();
            return post_us;
        })
    .def_property_readonly(
        "xferDuration",
        [](const nixl_xfer_telem_t &telem) {
            const auto xfer_us = telem.xferDuration.count();
            return xfer_us;
        })
    .def_readonly("totalBytes", &nixl_xfer_telem_t::totalBytes)
    .def_readonly("descCount", &nixl_xfer_telem_t::descCount);


py::register_exception<nixlNotPostedError>(m, "nixlNotPostedError");
py::register_exception<nixlInvalidParamError>(m, "nixlInvalidParamError");
py::register_exception<nixlBackendError>(m, "nixlBackendError");
Expand All @@ -170,6 +211,9 @@ PYBIND11_MODULE(_bindings, m) {
py::register_exception<nixlRepostActiveError>(m, "nixlRepostActiveError");
py::register_exception<nixlUnknownError>(m, "nixlUnknownError");
py::register_exception<nixlNotSupportedError>(m, "nixlNotSupportedError");
py::register_exception<nixlRemoteDisconnectError>(m, "nixlRemoteDisconnectError");
py::register_exception<nixlCancelledError>(m, "nixlCancelledError");
py::register_exception<nixlNoTelemetryError>(m, "nixlNoTelemetryError");

py::class_<nixl_xfer_dlist_t>(m, "nixlXferDList")
.def(py::init<nixl_mem_t, bool, int>(),
Expand Down Expand Up @@ -652,6 +696,15 @@ PYBIND11_MODULE(_bindings, m) {
throw_nixl_exception(ret);
return ret;
})
.def(
    "getXferTelemetry",
    // Fetches the telemetry record for the request handle passed in as an
    // integer. The returned status is handed to throw_nixl_exception (which
    // throws for error statuses) before the record is returned by value.
    [](nixlAgent &agent, uintptr_t reqh) -> nixl_xfer_telem_t {
        auto *req_handle = reinterpret_cast<nixlXferReqH *>(reqh);
        nixl_xfer_telem_t result;
        const nixl_status_t status = agent.getXferTelemetry(req_handle, result);
        throw_nixl_exception(status);
        return result;
    },
    py::arg("reqh"))
.def("queryXferBackend",
[](nixlAgent &agent, uintptr_t reqh) -> uintptr_t {
nixlBackendH *backend = nullptr;
Expand Down
9 changes: 7 additions & 2 deletions src/core/agent_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/str_tools.h"
#include "mem_section.h"
#include "telemetry.h"
#include "stream/metadata_stream.h"
#include "sync.h"

Expand All @@ -33,8 +34,6 @@ class SyncClient;
#define NIXL_ETCD_NAMESPACE_DEFAULT "/nixl/agents/"
#endif // HAVE_ETCD

class nixlTelemetry;

using backend_list_t = std::vector<nixlBackendEngine*>;

//Internal typedef to define metadata communication request types
Expand Down Expand Up @@ -65,6 +64,7 @@ class nixlAgentData {
std::string name;
nixlAgentConfig config;
nixlLock lock;
bool telemetryEnabled = false;

// some handle that can be used to instantiate an object from the lib
std::map<std::string, void*> backendLibs;
Expand Down Expand Up @@ -115,6 +115,11 @@ class nixlAgentData {
nixlAgentData(const std::string &name, const nixlAgentConfig &cfg);
~nixlAgentData();

/**
 * @brief Forward an error status to the telemetry object, if one exists.
 *
 * Does nothing when telemetry_ is null; otherwise calls
 * telemetry_->updateErrorCount(err_status).
 *
 * @param err_status Status code to hand to updateErrorCount
 */
inline void
addErrorTelemetry(nixl_status_t err_status) {
    if (!telemetry_) {
        return;
    }
    telemetry_->updateErrorCount(err_status);
}

friend class nixlAgent;
};

Expand Down
Loading