Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitlab/test_cpp.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion .gitlab/test_nixlbench.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion .gitlab/test_python.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion benchmark/nixlbench/src/runtime/etcd/etcd_rt.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/nixl_etcd_example.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/telemetry_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion examples/python/telemetry_reader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
119 changes: 104 additions & 15 deletions src/plugins/gpunetio/gpunetio_backend.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,15 +17,18 @@

#include "gpunetio_backend.h"
#include "serdes/serdes.h"
#include <arpa/inet.h>
#include <cassert>
#include <stdexcept>
#include <cstdlib>
#include <errno.h>
#include <unistd.h>
#include "common/nixl_log.h"
#include <arpa/inet.h>

// Field separator for the serialized memory-region string: registerMem()
// writes it between the fields it streams into remoteMrStr, and
// loadRemoteMD() splits the received metadata on the same character.
const char info_delimiter = '-';

// Queries whether the device side expects memory keys in byte-swapped form.
// The nixlDocaEngine constructor interprets the result as follows:
//   >= 0 : valid answer; 1 means "swapped", any other non-negative value
//          means "not swapped"
//   <  0 : no answer; the existing swap_keys_config value is kept
// Declared with C linkage; the definition is not visible in this file.
// NOTE(review): presumably provided by the GPU-side component of the
// plugin - confirm where it is defined and linked.
extern "C" int
doca_query_mkey_swapped();
/****************************************
* Constructor/Destructor
*****************************************/
Expand All @@ -47,6 +50,21 @@ nixlDocaEngine::nixlDocaEngine(const nixlBackendInitParams *init_params)
result = doca_log_backend_set_sdk_level(sdk_log, DOCA_LOG_LEVEL_ERROR);
if (result != DOCA_SUCCESS) throw std::invalid_argument("Can't initialize doca log");

// Detect device-side expectation for key swapping, allow env override
int device_mkey_swapped = -1;
device_mkey_swapped = doca_query_mkey_swapped();
if (device_mkey_swapped >= 0) swap_keys_config = (device_mkey_swapped == 1);
const char *env_swap = std::getenv("NIXL_GPUNETIO_SWAP_KEYS");
if (env_swap != nullptr) {
if (std::string(env_swap) == "0" || std::string(env_swap) == "false")
swap_keys_config = false;
else
swap_keys_config = true;
}
NIXL_DEBUG << "GPUNETIO key byte-order swap (htonl): "
<< (swap_keys_config ? "enabled" : "disabled") << ", device expects swapped: "
<< (device_mkey_swapped >= 0 ? device_mkey_swapped : -1);

NIXL_INFO << "DOCA network devices ";
// Temporary: will extend to more GPUs in a dedicated PR
if (custom_params->count("network_devices") > 1)
Expand Down Expand Up @@ -179,6 +197,15 @@ nixlDocaEngine::nixlDocaEngine(const nixlBackendInitParams *init_params)
} else {
doca_devinfo_get_ipv4_addr(
doca_dev_as_devinfo(ddev), (uint8_t *)ipv4_addr, DOCA_DEVINFO_IPV4_ADDR_SIZE);
if (const char *override_ip = std::getenv("NIXL_DOCASIM_IPV4_OVERRIDE")) {
struct in_addr addr_override = {};
if (inet_pton(AF_INET, override_ip, &addr_override) == 1) {
std::memcpy(ipv4_addr, &addr_override.s_addr, sizeof(ipv4_addr));
NIXL_INFO << "DOCA IP override applied from env " << override_ip;
} else {
NIXL_WARN << "Invalid IPv4 override in NIXL_DOCASIM_IPV4_OVERRIDE: " << override_ip;
}
}
NIXL_DEBUG << "DOCA IP address " << static_cast<unsigned>(ipv4_addr[0]) << " "
<< static_cast<unsigned>(ipv4_addr[1]) << " "
<< static_cast<unsigned>(ipv4_addr[2]) << " "
Expand Down Expand Up @@ -448,7 +475,10 @@ nixlDocaEngine::nixlDocaInitNotif(const std::string &remote_agent, doca_dev *dev
// Ensure notif list is not added twice for the same peer
notifMap[remote_agent] = notif;
((volatile struct docaNotif *)notif_fill_cpu)->msg_buf = (uintptr_t)notif->recv_addr;
((volatile struct docaNotif *)notif_fill_cpu)->msg_lkey = notif->recv_mr->get_lkey();
((volatile struct docaNotif *)notif_fill_cpu)->msg_lkey =
swap_keys_config ? htonl(notif->recv_mr->get_lkey()) : notif->recv_mr->get_lkey();
((volatile struct docaNotif *)notif_fill_cpu)->keys_are_swapped = swap_keys_config ? 1 : 0;

((volatile struct docaNotif *)notif_fill_cpu)->msg_size = notif->elems_size;
std::atomic_thread_fence(std::memory_order_seq_cst);
((volatile struct docaNotif *)notif_fill_cpu)->qp_gpu =
Expand Down Expand Up @@ -513,8 +543,10 @@ nixlDocaEngine::progressThreadStart() {
/* Set port and IP: */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(DOCA_RDMA_CM_LOCAL_PORT_SERVER);
server_addr.sin_addr.s_addr = INADDR_ANY; /* listen on any interface */

// server_addr.sin_addr.s_addr = INADDR_ANY;
/* listen on any interface */
std::memcpy(&server_addr.sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
if (server_addr.sin_addr.s_addr == 0) server_addr.sin_addr.s_addr = INADDR_ANY;
/* Bind to the set port and IP: */
if (bind(oob_sock_server, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
NIXL_ERROR << "Couldn't bind to the port " << DOCA_RDMA_CM_LOCAL_PORT_SERVER;
Expand Down Expand Up @@ -929,6 +961,8 @@ nixlDocaEngine::getConnInfo(std::string &str) const {
ss << (int)ipv4_addr[0] << "." << (int)ipv4_addr[1] << "." << (int)ipv4_addr[2] << "."
<< (int)ipv4_addr[3];
str = ss.str();
NIXL_DEBUG << "getConnInfo DOCA: " << str;

return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -1025,6 +1059,16 @@ nixlDocaEngine::registerMem(const nixlBlobDesc &mem,
<< info_delimiter << ((size_t)priv->mr->get_tot_size());
priv->remoteMrStr = ss.str();

uint32_t lkey = priv->mr->get_lkey();
uint32_t rkey = priv->mr->get_rkey();
NIXL_INFO << "GPUNETIO registerMem publish dev " << priv->devId << " addr " << std::showbase
<< std::hex << std::uppercase << (uintptr_t)priv->mr->get_addr() << " len "
<< (uint64_t)priv->mr->get_tot_size() << " lkey " << lkey << " rkey " << rkey
<< std::noshowbase << std::dec;
NIXL_DEBUG << "[dbg] publish raw: dev=" << priv->devId
<< " addr_dec=" << (uintptr_t)priv->mr->get_addr()
<< " len_dec=" << (uint64_t)priv->mr->get_tot_size() << " lkey_dec=" << lkey
<< " rkey_dec=" << rkey;
out = (nixlBackendMD *)priv;

return NIXL_SUCCESS;
Expand All @@ -1043,6 +1087,8 @@ nixl_status_t
nixlDocaEngine::getPublicData(const nixlBackendMD *meta, std::string &str) const {
const nixlDocaPrivateMetadata *priv = (nixlDocaPrivateMetadata *)meta;
str = priv->remoteMrStr;
NIXL_TRACE << "[dbg] getPublicData remoteMrStr=" << str;

return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -1071,10 +1117,27 @@ nixlDocaEngine::loadRemoteMD(const nixlBlobDesc &input,
std::stringstream ss(input.metaInfo.data());
while (std::getline(ss, token, info_delimiter))
tokens.push_back(token);

uint32_t rkey = static_cast<uint32_t>(atoi(tokens[0].c_str()));
uintptr_t addr = static_cast<uintptr_t>(atol(tokens[1].c_str()));
size_t tot_size = static_cast<size_t>(atol(tokens[2].c_str()));
// Parse as unsigned to avoid overflow/truncation (rkeys often exceed INT_MAX)
NIXL_TRACE << "[dbg] loadRemoteMD tokens size=" << tokens.size();
for (size_t i = 0; i < tokens.size(); ++i)
NIXL_TRACE << "[dbg] token[" << i << "]=" << tokens[i];
const char *p0 = tokens[0].c_str();
const char *p1 = tokens[1].c_str();
const char *p2 = tokens[2].c_str();
errno = 0;
unsigned long rkey_ul = strtoul(p0, nullptr, 10);
unsigned long long addr_ull = strtoull(p1, nullptr, 10);
unsigned long long size_ull = strtoull(p2, nullptr, 10);
if (errno != 0) {
NIXL_ERROR << "Failed to parse remote metadata (errno=" << errno << ")";
return NIXL_ERR_INVALID_PARAM;
}
uint32_t rkey = static_cast<uint32_t>(rkey_ul);
uintptr_t addr = static_cast<uintptr_t>(addr_ull);
size_t tot_size = static_cast<size_t>(size_ull);
NIXL_TRACE << "[dbg] parsed remote MD rkey=" << std::showbase << std::hex << std::uppercase
<< rkey << " addr=" << addr << " size=" << (uint64_t)tot_size << std::noshowbase
<< std::dec;

// Empty mmap, filled with imported data
try {
Expand Down Expand Up @@ -1145,6 +1208,8 @@ nixlDocaEngine::prepXfer(const nixl_xfer_op_t &operation,
pos = treq->start_pos;

do {
xferReqRingCpu[pos].keys_are_swapped = swap_keys_config ? 1 : 0;

for (uint32_t idx = 0; idx < lcnt && idx < DOCA_XFER_REQ_SIZE; idx++) {
size_t lsize = local[idx].len;
size_t rsize = remote[idx].len;
Expand All @@ -1153,12 +1218,31 @@ nixlDocaEngine::prepXfer(const nixl_xfer_op_t &operation,
lmd = (nixlDocaPrivateMetadata *)local[idx].metadataP;
rmd = (nixlDocaPublicMetadata *)remote[idx].metadataP;

xferReqRingCpu[pos].lbuf[idx] = (uintptr_t)lmd->mr->get_addr();
xferReqRingCpu[pos].lkey[idx] = (uintptr_t)lmd->mr->get_lkey();
xferReqRingCpu[pos].rbuf[idx] = (uintptr_t)rmd->mr->get_addr();
xferReqRingCpu[pos].rkey[idx] = (uintptr_t)rmd->mr->get_rkey();
uint32_t lkey_host = lmd->mr->get_lkey();
uint32_t rkey_host = rmd->mr->get_rkey();
uint32_t lkey_be = swap_keys_config ? htonl(lkey_host) : lkey_host;
uint32_t rkey_be = swap_keys_config ? htonl(rkey_host) : rkey_host;

// Local buffer: use the descriptor address (within the local MR)
xferReqRingCpu[pos].lbuf[idx] = (uintptr_t)local[idx].addr;
xferReqRingCpu[pos].lkey[idx] = lkey_be;
uintptr_t dbg_rbuf_desc = (uintptr_t)remote[idx].addr;
uintptr_t dbg_rbuf_mr = (uintptr_t)rmd->mr->get_addr();
// Use the published remote MR address (not the initiator-side descriptor address)
xferReqRingCpu[pos].rbuf[idx] = dbg_rbuf_mr;
xferReqRingCpu[pos].rkey[idx] = rkey_be;
xferReqRingCpu[pos].size[idx] = lsize;
xferReqRingCpu[pos].num++;

NIXL_INFO << "GPUNETIO prepXfer queue_pos " << pos << " idx " << idx << " laddr "
<< std::showbase << std::hex << std::uppercase
<< xferReqRingCpu[pos].lbuf[idx] << " lkey " << lkey_host << " lkey_be "
<< lkey_be << " raddr " << xferReqRingCpu[pos].rbuf[idx] << " rkey "
<< rkey_host << " rkey_be " << rkey_be << " size "
<< (uint64_t)xferReqRingCpu[pos].size[idx] << std::noshowbase << std::dec;
NIXL_TRACE << "[dbg] remote_desc_addr=" << std::showbase << std::hex << std::uppercase
<< dbg_rbuf_desc << " remote_mr_addr=" << dbg_rbuf_mr << " used=mr_addr"
<< std::noshowbase << std::dec;
}

xferReqRingCpu[pos].last_rsvd = last_rsvd_flags;
Expand Down Expand Up @@ -1199,7 +1283,9 @@ nixlDocaEngine::prepXfer(const nixl_xfer_op_t &operation,
(notif->send_pi.fetch_add(1) & (notif->elems_num - 1));
xferReqRingCpu[treq->end_pos - 1].msg_sz = newMsg.size();
xferReqRingCpu[treq->end_pos - 1].lbuf_notif = notif_addr;
xferReqRingCpu[treq->end_pos - 1].lkey_notif = notif->send_mr->get_lkey();
uint32_t notif_lkey_host = notif->send_mr->get_lkey();
uint32_t notif_lkey_be = swap_keys_config ? htonl(notif_lkey_host) : notif_lkey_host;
xferReqRingCpu[treq->end_pos - 1].lkey_notif = notif_lkey_be;

memcpy((void *)notif_addr, newMsg.c_str(), newMsg.size());

Expand Down Expand Up @@ -1378,7 +1464,10 @@ nixlDocaEngine::genNotif(const std::string &remote_agent, const std::string &msg

std::lock_guard<std::mutex> lock(notifSendLock);
((volatile struct docaNotif *)notif_send_cpu)->msg_buf = msg_buf;
((volatile struct docaNotif *)notif_send_cpu)->msg_lkey = notif->send_mr->get_lkey();
((volatile struct docaNotif *)notif_send_cpu)->msg_lkey =
swap_keys_config ? htonl(notif->send_mr->get_lkey()) : notif->send_mr->get_lkey();
((volatile struct docaNotif *)notif_send_cpu)->keys_are_swapped = swap_keys_config ? 1 : 0;

((volatile struct docaNotif *)notif_send_cpu)->msg_size = newMsg.size();
std::atomic_thread_fence(std::memory_order_seq_cst);
((volatile struct docaNotif *)notif_send_cpu)->qp_gpu =
Expand Down
6 changes: 5 additions & 1 deletion src/plugins/gpunetio/gpunetio_backend.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -185,6 +185,10 @@ class nixlDocaEngine : public nixlBackendEngine {
std::unordered_map<std::string, int> connMap;
std::unordered_map<std::string, struct nixlDocaNotif *> notifMap;

// Whether to swap (htonl) mkeys before handing to GPU. Default: true.
bool swap_keys_config = true;
// Extra debug dump controls (env-driven)

pthread_t server_thread_id;

class nixlDocaBckndReq : public nixlBackendReqH {
Expand Down
5 changes: 4 additions & 1 deletion src/plugins/gpunetio/gpunetio_backend_aux.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -87,6 +87,7 @@ struct docaXferReqGpu {
uint32_t rkey[DOCA_XFER_REQ_SIZE];
uint16_t num;
uint8_t in_use;
uint8_t keys_are_swapped;
uint32_t conn_idx;
uint32_t has_notif_msg_idx;
uint32_t msg_sz;
Expand Down Expand Up @@ -119,6 +120,8 @@ struct docaXferCompletion {
struct docaNotif {
doca_gpu_dev_verbs_qp *qp_gpu;
uint32_t msg_lkey;
uint8_t keys_are_swapped;
uint8_t _pad_keys[3];
uintptr_t msg_buf;
size_t msg_size;
uint32_t msg_num;
Expand Down
Loading