Skip to content

Commit 60a12a4

Browse files
Cyclic buffer telemetry exporter moved to use plug-in infrastructure (ai-dynamo#1088)
* Cyclic buffer telemetry exporter moved to use plug-in infrastructure Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Removing redundant headers and try/catch routines Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Minor fixes Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Moving buffer to be class member Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Removing fall back routine Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Allowing NIXL_TELEMETRY_DIR to be not set Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Compilance with previous functionality. If NIXL_TELEMETRY_DIR is not set, exporter will not be created Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Removing redaundant logging Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> * Not letting to load default telemetry plug-in if NIXL_TELEMETRY_DIR is not set Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com> --------- Signed-off-by: Aleksandr Bilkovskii <alexanderb@nvidia.com>
1 parent 6083015 commit 60a12a4

File tree

12 files changed

+200
-57
lines changed

12 files changed

+200
-57
lines changed

meson.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ if get_option('buildtype') == 'debug'
312312
run_command('truncate', '-s 0', plugfile, check: true)
313313
endif
314314

315-
nixl_inc_dirs = include_directories('src/api/cpp', 'src/api/cpp/backend', 'src/infra', 'src/core')
315+
nixl_inc_dirs = include_directories('src/api/cpp', 'src/api/cpp/backend', 'src/infra', 'src/core', 'src/core/telemetry')
316316
nixl_gpu_inc_dirs = include_directories('src/api/gpu/ucx')
317317
plugins_inc_dirs = include_directories('src/plugins')
318318
utils_inc_dirs = include_directories('src/utils')

src/api/cpp/telemetry/telemetry_exporter.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@
2323
#include <string>
2424

2525
inline constexpr char telemetryExporterVar[] = "NIXL_TELEMETRY_EXPORTER";
26-
inline constexpr char telemetryExporterOutputPathVar[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";
2726

2827
/**
2928
* @struct nixlTelemetryExporterInitParams
3029
* @brief Initialization parameters for telemetry exporters
3130
*/
3231
struct nixlTelemetryExporterInitParams {
33-
std::string outputPath; // Output path (file path, URL, etc.)
3432
std::string agentName;
3533
size_t maxEventsBuffered;
3634
};

src/core/meson.build

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ nixl_lib = library('nixl',
5757
'nixl_agent.cpp',
5858
'nixl_plugin_manager.cpp',
5959
'nixl_listener.cpp',
60-
'telemetry.cpp',
60+
'telemetry/telemetry.cpp',
61+
'telemetry/buffer_exporter.cpp',
62+
'telemetry/buffer_plugin.cpp',
6163
include_directories: [ nixl_inc_dirs, utils_inc_dirs ],
6264
link_args: ['-lstdc++fs'],
6365
dependencies: nixl_lib_deps,

src/core/nixl_agent.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
#include "telemetry_event.h"
3333

3434
constexpr char TELEMETRY_ENABLED_VAR[] = "NIXL_TELEMETRY_ENABLE";
35-
constexpr char TELEMETRY_DIR_VAR[] = "NIXL_TELEMETRY_DIR";
3635
static const std::vector<std::vector<std::string>> illegal_plugin_combinations = {
3736
{"GDS", "GDS_MT"},
3837
};
@@ -126,19 +125,12 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg
126125

127126
memorySection = new nixlLocalSection();
128127
const char *telemetry_env_val = std::getenv(TELEMETRY_ENABLED_VAR);
129-
const char *telemetry_env_dir = std::getenv(TELEMETRY_DIR_VAR);
130128

131129
if (telemetry_env_val != nullptr) {
132130
if (!strcasecmp(telemetry_env_val, "y") || !strcasecmp(telemetry_env_val, "1") ||
133131
!strcasecmp(telemetry_env_val, "yes") || !strcasecmp(telemetry_env_val, "on")) {
134132
telemetryEnabled = true;
135-
if (telemetry_env_dir != nullptr) {
136-
std::string telemetry_file = std::string(telemetry_env_dir) + "/" + name;
137-
telemetry_ = std::make_unique<nixlTelemetry>(telemetry_file, backendEngines);
138-
NIXL_DEBUG << "NIXL telemetry is enabled with output file: " << telemetry_file;
139-
} else {
140-
NIXL_DEBUG << "NIXL telemetry is enabled without an output file";
141-
}
133+
telemetry_ = std::make_unique<nixlTelemetry>(name, backendEngines);
142134
} else if (cfg.captureTelemetry) {
143135
telemetryEnabled = true;
144136
NIXL_WARN << "NIXL telemetry is enabled through config, "

src/core/nixl_plugin_manager.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,4 +674,5 @@ void nixlPluginManager::registerBuiltinPlugins() {
674674
#ifdef STATIC_PLUGIN_HF3FS
675675
NIXL_REGISTER_STATIC_PLUGIN(Backend, HF3FS)
676676
#endif
677+
NIXL_REGISTER_STATIC_PLUGIN(Telemetry, BUFFER)
677678
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#include "buffer_exporter.h"
18+
#include "common/nixl_log.h"
19+
20+
namespace {
21+
static std::filesystem::path
22+
getFilePath(const nixlTelemetryExporterInitParams &init_params) {
23+
// if we reach here, we ensured env var is set
24+
auto telemetry_dir = std::getenv(telemetryDirVar);
25+
return std::filesystem::path(telemetry_dir) / init_params.agentName.data();
26+
}
27+
} // namespace
28+
29+
nixlTelemetryBufferExporter::nixlTelemetryBufferExporter(
30+
const nixlTelemetryExporterInitParams &init_params)
31+
: nixlTelemetryExporter(init_params),
32+
filePath_(getFilePath(init_params)),
33+
buffer_(filePath_.string(), true, TELEMETRY_VERSION, getMaxEventsBuffered()) {
34+
NIXL_INFO << "Telemetry enabled, using buffer path: " << filePath_.string()
35+
<< " with size: " << getMaxEventsBuffered();
36+
}
37+
38+
nixl_status_t
39+
nixlTelemetryBufferExporter::exportEvent(const nixlTelemetryEvent &event) {
40+
if (!buffer_.push(event)) {
41+
return NIXL_ERR_UNKNOWN;
42+
}
43+
44+
return NIXL_SUCCESS;
45+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef _TELEMETRY_BUFFER_EXPORTER_H
18+
#define _TELEMETRY_BUFFER_EXPORTER_H
19+
20+
#include "common/cyclic_buffer.h"
21+
#include "telemetry/telemetry_exporter.h"
22+
#include "telemetry_event.h"
23+
#include "nixl_types.h"
24+
25+
#include <filesystem>
26+
27+
constexpr const char telemetryDirVar[] = "NIXL_TELEMETRY_DIR";
28+
29+
/**
30+
* @class nixlTelemetryBufferExporter
31+
* @brief Shared memory buffer based telemetry exporter implementation
32+
*
33+
* This class implements the telemetry exporter interface to export
34+
* telemetry events to a shared memory buffer.
35+
*/
36+
class nixlTelemetryBufferExporter : public nixlTelemetryExporter {
37+
public:
38+
/**
39+
* @brief Constructor using init params (plugin-compatible)
40+
* @param init_params Initialization parameters
41+
*/
42+
explicit nixlTelemetryBufferExporter(const nixlTelemetryExporterInitParams &init_params);
43+
44+
nixl_status_t
45+
exportEvent(const nixlTelemetryEvent &event) override;
46+
47+
private:
48+
std::filesystem::path filePath_;
49+
sharedRingBuffer<nixlTelemetryEvent> buffer_;
50+
};
51+
52+
#endif // _TELEMETRY_BUFFER_EXPORTER_H
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "buffer_exporter.h"
19+
#include "telemetry/telemetry_plugin.h"
20+
#include "telemetry/telemetry_exporter.h"
21+
22+
// Plugin type alias for convenience
23+
using buffer_exporter_plugin_t = nixlTelemetryPluginCreator<nixlTelemetryBufferExporter>;
24+
25+
nixlTelemetryPlugin *
26+
createStaticBUFFERPlugin() {
27+
return buffer_exporter_plugin_t::create(nixlTelemetryPluginApiVersionV1, "buffer", "1.0.0");
28+
}
Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,23 @@
2727
#include "telemetry.h"
2828
#include "telemetry_event.h"
2929
#include "util.h"
30+
#include "plugin_manager.h"
31+
#include "buffer_exporter.h"
3032

3133
using namespace std::chrono_literals;
3234
namespace fs = std::filesystem;
3335

3436
constexpr std::chrono::milliseconds DEFAULT_TELEMETRY_RUN_INTERVAL = 100ms;
3537
constexpr size_t DEFAULT_TELEMETRY_BUFFER_SIZE = 4096;
38+
constexpr const char *defaultTelemetryPlugin = "BUFFER";
3639

37-
nixlTelemetry::nixlTelemetry(const std::string &file_path, backend_map_t &backend_map)
40+
nixlTelemetry::nixlTelemetry(const std::string &agent_name, backend_map_t &backend_map)
3841
: pool_(1),
3942
writeTask_(pool_.get_executor(), DEFAULT_TELEMETRY_RUN_INTERVAL, false),
40-
file_(file_path),
43+
agentName_(agent_name),
4144
backendMap_(backend_map) {
42-
if (file_path.empty()) {
43-
throw std::invalid_argument("Telemetry file path cannot be empty");
45+
if (agent_name.empty()) {
46+
throw std::invalid_argument("Expected non-empty agent name in nixl telemetry create");
4447
}
4548
initializeTelemetry();
4649
}
@@ -69,17 +72,38 @@ nixlTelemetry::initializeTelemetry() {
6972
std::stoul(std::getenv(TELEMETRY_BUFFER_SIZE_VAR)) :
7073
DEFAULT_TELEMETRY_BUFFER_SIZE;
7174

72-
auto full_file_path = fs::path(file_);
73-
7475
if (buffer_size == 0) {
7576
throw std::invalid_argument("Telemetry buffer size cannot be 0");
7677
}
7778

78-
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
79-
<< " with size: " << buffer_size;
79+
const char *exporter_name = std::getenv(telemetryExporterVar);
80+
81+
if (!exporter_name) {
82+
NIXL_INFO << "No telemetry exporter was specified, using default: "
83+
<< defaultTelemetryPlugin;
84+
exporter_name = defaultTelemetryPlugin;
85+
if (!std::getenv(telemetryDirVar)) {
86+
NIXL_DEBUG << telemetryDirVar
87+
<< " is not set, NIXL telemetry is enabled without any exporter";
88+
return;
89+
}
90+
}
91+
auto &plugin_manager = nixlPluginManager::getInstance();
92+
std::shared_ptr<const nixlTelemetryPluginHandle> plugin_handle =
93+
plugin_manager.loadTelemetryPlugin(exporter_name);
94+
95+
if (plugin_handle == nullptr) {
96+
throw std::runtime_error("Failed to load telemetry plugin: " + std::string(exporter_name));
97+
}
98+
99+
const nixlTelemetryExporterInitParams init_params{agentName_, buffer_size};
100+
exporter_ = plugin_handle->createExporter(init_params);
101+
if (!exporter_) {
102+
NIXL_ERROR << "Failed to create telemetry exporter: " << exporter_name;
103+
return;
104+
}
80105

81-
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
82-
full_file_path, true, TELEMETRY_VERSION, buffer_size);
106+
NIXL_DEBUG << "NIXL telemetry is enabled with " << exporter_name << "exporter";
83107

84108
auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ?
85109
std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) :
@@ -96,14 +120,14 @@ bool
96120
nixlTelemetry::writeEventHelper() {
97121
std::vector<nixlTelemetryEvent> next_queue;
98122
// assume next buffer will be the same size as the current one
99-
next_queue.reserve(buffer_->capacity());
123+
next_queue.reserve(exporter_->getMaxEventsBuffered());
100124
{
101125
std::lock_guard<std::mutex> lock(mutex_);
102126
events_.swap(next_queue);
103127
}
104128
for (auto &event : next_queue) {
105129
// if full, ignore
106-
buffer_->push(event);
130+
exporter_->exportEvent(event);
107131
}
108132
// collect all events and sort them by timestamp
109133
std::vector<nixlTelemetryEvent> all_events;
@@ -122,7 +146,7 @@ nixlTelemetry::writeEventHelper() {
122146
return a.timestampUs_ < b.timestampUs_;
123147
});
124148
for (auto &event : all_events) {
125-
buffer_->push(event);
149+
exporter_->exportEvent(event);
126150
}
127151
return true;
128152
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define _TELEMETRY_H
1919

2020
#include "common/cyclic_buffer.h"
21+
#include "telemetry/telemetry_exporter.h"
2122
#include "telemetry_event.h"
2223
#include "mem_section.h"
2324
#include "nixl_types.h"
@@ -49,7 +50,7 @@ struct periodicTask {
4950

5051
class nixlTelemetry {
5152
public:
52-
nixlTelemetry(const std::string &file_path, backend_map_t &backend_map);
53+
nixlTelemetry(const std::string &agent_name, backend_map_t &backend_map);
5354

5455
~nixlTelemetry();
5556

@@ -81,12 +82,13 @@ class nixlTelemetry {
8182
updateData(const std::string &event_name, nixl_telemetry_category_t category, uint64_t value);
8283
bool
8384
writeEventHelper();
85+
std::unique_ptr<nixlTelemetryExporter> exporter_;
8486
std::unique_ptr<sharedRingBuffer<nixlTelemetryEvent>> buffer_;
8587
std::vector<nixlTelemetryEvent> events_;
8688
std::mutex mutex_;
8789
asio::thread_pool pool_;
8890
periodicTask writeTask_;
89-
std::string file_;
91+
std::string agentName_;
9092
backend_map_t &backendMap_;
9193
};
9294

0 commit comments

Comments
 (0)