Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/bindings/rust/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,52 @@ impl Agent {
}
}

/// Gets the local partial metadata as a byte array
///
/// # Arguments
/// * `descs` - Registration descriptor list to get metadata for
/// * `opt_args` - Optional arguments for getting metadata
///
/// # Returns
/// A byte array containing the local partial metadata
///
pub fn get_local_partial_md(&self, descs: &RegDescList, opt_args: Option<&OptArgs>) -> Result<Vec<u8>, NixlError> {
tracing::trace!("Getting local partial metadata");
let mut data = std::ptr::null_mut();
let mut len: usize = 0;
let inner_guard = self.inner.write().unwrap();

let status = unsafe {
nixl_capi_get_local_partial_md(
inner_guard.handle.as_ptr(),
descs.handle(),
&mut data as *mut *mut _,
&mut len,
opt_args.map_or(std::ptr::null_mut(), |args| args.inner.as_ptr()),
)
};
match status {
NIXL_CAPI_SUCCESS => {
let bytes = unsafe {
let slice = std::slice::from_raw_parts(data as *const u8, len);
let vec = slice.to_vec();
libc::free(data as *mut libc::c_void);
vec
};
tracing::trace!(metadata.size = len, "Successfully retrieved local partial metadata");
Ok(bytes)
}
NIXL_CAPI_ERROR_INVALID_PARAM => {
tracing::error!(error = "invalid_param", "Failed to get local partial metadata");
Err(NixlError::InvalidParam)
}
_ => {
tracing::error!(error = "backend_error", "Failed to get local partial metadata");
Err(NixlError::BackendError)
}
}
}

/// Loads remote metadata from a byte slice
pub fn load_remote_md(&self, metadata: &[u8]) -> Result<String, NixlError> {
tracing::trace!(metadata.size = metadata.len(), "Loading remote metadata");
Expand Down Expand Up @@ -550,6 +596,35 @@ impl Agent {
}
}

/// Send this agent's partial metadata
///
/// # Arguments
/// * `descs` - Registration descriptor list to send
/// * `opt_args` - Optional arguments for sending metadata
pub fn send_local_partial_md(&self, descs: &RegDescList, opt_args: Option<&OptArgs>) -> Result<(), NixlError> {
tracing::trace!("Sending local partial metadata to etcd");
let inner_guard = self.inner.write().unwrap();
let status = unsafe {
nixl_capi_send_local_partial_md(
inner_guard.handle.as_ptr(),
descs.handle(),
opt_args.map_or(std::ptr::null_mut(), |args| args.inner.as_ptr()),
)
};
match status {
NIXL_CAPI_SUCCESS => {
tracing::trace!("Successfully sent local partial metadata to etcd");
Ok(())
}
NIXL_CAPI_ERROR_INVALID_PARAM => {
tracing::error!(error = "invalid_param", "Failed to send local partial metadata to etcd");
Err(NixlError::InvalidParam)
}
_ => Err(NixlError::BackendError)
}
}


/// Fetch a remote agent's metadata from etcd
///
/// Once fetched, the metadata will be loaded and cached locally, enabling
Expand Down Expand Up @@ -850,6 +925,36 @@ impl Agent {
}
}

/// Queries the backend for a transfer request
///
/// # Arguments
/// * `req` - Transfer request handle after `post_xfer_req`
///
/// # Returns
/// A handle to the backend used for the transfer
///
/// # Errors
/// Returns a NixlError if the operation fails
pub fn query_xfer_backend(&self, req: &XferRequest) -> Result<Backend, NixlError> {
let mut backend = std::ptr::null_mut();
let inner_guard = self.inner.write().unwrap();
let status = unsafe {
nixl_capi_query_xfer_backend(
inner_guard.handle.as_ptr(),
req.handle(),
&mut backend
)
};
match status {
NIXL_CAPI_SUCCESS => {
Ok(Backend{ inner: NonNull::new(backend).ok_or(NixlError::FailedToCreateBackend)? })
}
NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
_ => Err(NixlError::BackendError),
}
}


/// Gets notifications from other agents
///
/// # Arguments
Expand Down
27 changes: 26 additions & 1 deletion src/bindings/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ use bindings::{
nixl_capi_query_mem, nixl_capi_create_query_resp_list, nixl_capi_destroy_query_resp_list,
nixl_capi_query_resp_list_size, nixl_capi_query_resp_list_has_value,
nixl_capi_query_resp_list_get_params, nixl_capi_prep_xfer_dlist, nixl_capi_release_xfer_dlist_handle,
nixl_capi_make_xfer_req
nixl_capi_make_xfer_req, nixl_capi_get_local_partial_md,
nixl_capi_send_local_partial_md, nixl_capi_query_xfer_backend, nixl_capi_opt_args_set_ip_addr,
nixl_capi_opt_args_set_port
};

// Re-export status codes
Expand Down Expand Up @@ -320,6 +322,29 @@ impl OptArgs {
_ => Err(NixlError::BackendError),
}
}

/// Set the IP address
/// used in sendLocalMD, fetchRemoteMD, invalidateLocalMD, sendLocalPartialMD.
pub fn set_ip_addr(&mut self, ip_addr: &str) -> Result<(), NixlError> {
let c_str = CString::new(ip_addr).expect("Failed to convert string to CString");
let status = unsafe { nixl_capi_opt_args_set_ip_addr(self.inner.as_ptr(), c_str.as_ptr()) };
match status {
NIXL_CAPI_SUCCESS => Ok(()),
NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
_ => Err(NixlError::BackendError),
}
}

/// Set the port
/// used in sendLocalMD, fetchRemoteMD, invalidateLocalMD, sendLocalPartialMD.
pub fn set_port(&mut self, port: u16) -> Result<(), NixlError> {
let status = unsafe { nixl_capi_opt_args_set_port(self.inner.as_ptr(), port) };
match status {
NIXL_CAPI_SUCCESS => Ok(()),
NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
_ => Err(NixlError::BackendError),
}
}
}

impl Drop for OptArgs {
Expand Down
26 changes: 26 additions & 0 deletions src/bindings/rust/stubs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ nixl_capi_get_local_md(nixl_capi_agent_t agent, void** data, size_t* len)
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_get_local_partial_md(nixl_capi_agent_t agent,
nixl_capi_reg_dlist_t descs,
void **data,
size_t *len,
nixl_capi_opt_args_t opt_args) {
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_load_remote_md(nixl_capi_agent_t agent, const void* data, size_t len, char** agent_name)
{
Expand Down Expand Up @@ -219,6 +228,16 @@ nixl_capi_opt_args_get_skip_desc_merge(nixl_capi_opt_args_t args, bool* skip_mer
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_opt_args_set_ip_addr(nixl_capi_opt_args_t args, const char *ip_addr) {
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_opt_args_set_port(nixl_capi_opt_args_t args, uint16_t port) {
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_params_is_empty(nixl_capi_params_t params, bool* is_empty)
{
Expand Down Expand Up @@ -387,6 +406,13 @@ nixl_capi_get_xfer_status(nixl_capi_agent_t agent, nixl_capi_xfer_req_t req_hndl
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_query_xfer_backend(nixl_capi_agent_t agent,
nixl_capi_xfer_req_t req_hndl,
nixl_capi_backend_t *backend) {
return nixl_capi_stub_abort();
}

nixl_capi_status_t
nixl_capi_destroy_xfer_req(nixl_capi_xfer_req_t req)
{
Expand Down
121 changes: 120 additions & 1 deletion src/bindings/rust/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ fn test_prep_xfer_dlist_success() {

// 2. Create memory regions and register them
let mut storage_list = create_storage_list(&local_agent, &opt_args, DLIST_SIZE);
let remote_storage_list = create_storage_list(&remote_agent, &opt_args_remote, DLIST_SIZE);
let _remote_storage_list = create_storage_list(&remote_agent, &opt_args_remote, DLIST_SIZE);

{
// 3. Create transfer descriptor list
Expand Down Expand Up @@ -1320,3 +1320,122 @@ fn test_make_xfer_req_invalid_indices() {
assert!(result.is_err_and(|e| matches!(e, NixlError::BackendError)), "Expected InvalidParam for out-of-bounds indices");
}
}

// Tests for get_local_partial_md API
#[test]
fn test_get_local_partial_md_success() {
let (agent, opt_args) = create_agent_with_backend("test_agent")
.expect("Failed to setup agent with backend");
let _storage_list = create_storage_list(&agent, &opt_args, 10);
// Create a registration descriptor list
let mut reg_descs = RegDescList::new(MemType::Dram, false)
.expect("Failed to create registration descriptor list");
reg_descs.add_desc(0x1000, 0x100, 0)
.expect("Failed to add descriptor");
// Get local partial metadata
let result = agent.get_local_partial_md(&reg_descs, Some(&opt_args));
// Should succeed and return metadata
match result {
Ok(metadata) => {
assert!(!metadata.is_empty(), "Metadata should not be empty");
println!("Partial metadata size: {}", metadata.len());
}
Err(e) => {
// May fail if no partial metadata exists yet, which is acceptable
assert!(
matches!(e, NixlError::BackendError) || matches!(e, NixlError::InvalidParam),
"Expected BackendError or InvalidParam, got: {:?}", e
);
}
}
}

#[test]
fn test_get_local_partial_md_empty_descs() {
let (agent, _) = create_agent_with_backend("test_agent")
.expect("Failed to setup agent with backend");
// Create empty registration descriptor list
let reg_descs = RegDescList::new(MemType::Dram, false)
.expect("Failed to create registration descriptor list");
// Try with empty descriptor list should succeed and return all available backends
let result = agent.get_local_partial_md(&reg_descs, None);
assert!(
result.is_ok(),
"get_local_partial_md should succeed with empty descriptor list"
);
}

// Tests for send_local_partial_md API
#[test]
fn test_send_local_partial_md_success() {
let (agent, opt_args) = create_agent_with_backend("test_agent")
.expect("Failed to setup agent with backend");
let (agent2, opt_args2) = create_agent_with_backend("test_agent2")
.expect("Failed to setup agent with backend");
let _storage_list = create_storage_list(&agent, &opt_args, 10);
let _remote_storage_list = create_storage_list(&agent2, &opt_args2, 10);

// Create a registration descriptor list
let mut reg_descs = RegDescList::new(MemType::Dram, false)
.expect("Failed to create registration descriptor list");
reg_descs.add_storage_desc(&_storage_list[0]).expect("Failed to add storage descriptor");

// Send local partial metadata
let mut opt_args_temp = OptArgs::new().expect("Failed to create opt args");
opt_args_temp.set_ip_addr("127.0.0.1").expect("Failed to set ip address");
let result = agent.send_local_partial_md(&reg_descs, Some(&opt_args_temp));

assert!(
result.is_ok(),
"send_local_partial_md should succeed"
);
}

// Tests for query_xfer_backend API
#[test]
fn test_query_xfer_backend_success() {
let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");
// Create descriptor lists
let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);
{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");
exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata");
// Create transfer request
let xfer_req = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"agent2",
None
).expect("Failed to create transfer request");
// Query which backend will be used for this transfer
let result: Result<Backend, NixlError> = agent1.query_xfer_backend(&xfer_req);
assert!(result.is_ok(), "query_xfer_backend failed with error: {:?}", result.err());
let backend = result.unwrap();
println!("Transfer will use backend: {:?}", backend);
}
}
#[test]
fn test_query_xfer_backend_invalid_request() {
let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");
// Create descriptor lists
let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);
{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");
// Create transfer request with non-existent remote agent (should fail or succeed)
let xfer_req_result = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"non_existent_agent",
None
);
assert!(xfer_req_result.is_err(), "Transfer request creation should fail for non-existent agent");
}
}
Loading