diff --git a/src/bindings/rust/src/agent.rs b/src/bindings/rust/src/agent.rs index 6f3594b06..00ad05303 100644 --- a/src/bindings/rust/src/agent.rs +++ b/src/bindings/rust/src/agent.rs @@ -337,6 +337,52 @@ impl Agent { } } + /// Gets the local partial metadata as a byte array + /// + /// # Arguments + /// * `descs` - Registration descriptor list to get metadata for + /// * `opt_args` - Optional arguments for getting metadata + /// + /// # Returns + /// A byte array containing the local partial metadata + /// + pub fn get_local_partial_md(&self, descs: &RegDescList, opt_args: Option<&OptArgs>) -> Result, NixlError> { + tracing::trace!("Getting local partial metadata"); + let mut data = std::ptr::null_mut(); + let mut len: usize = 0; + let inner_guard = self.inner.write().unwrap(); + + let status = unsafe { + nixl_capi_get_local_partial_md( + inner_guard.handle.as_ptr(), + descs.handle(), + &mut data as *mut *mut _, + &mut len, + opt_args.map_or(std::ptr::null_mut(), |args| args.inner.as_ptr()), + ) + }; + match status { + NIXL_CAPI_SUCCESS => { + let bytes = unsafe { + let slice = std::slice::from_raw_parts(data as *const u8, len); + let vec = slice.to_vec(); + libc::free(data as *mut libc::c_void); + vec + }; + tracing::trace!(metadata.size = len, "Successfully retrieved local partial metadata"); + Ok(bytes) + } + NIXL_CAPI_ERROR_INVALID_PARAM => { + tracing::error!(error = "invalid_param", "Failed to get local partial metadata"); + Err(NixlError::InvalidParam) + } + _ => { + tracing::error!(error = "backend_error", "Failed to get local partial metadata"); + Err(NixlError::BackendError) + } + } + } + /// Loads remote metadata from a byte slice pub fn load_remote_md(&self, metadata: &[u8]) -> Result { tracing::trace!(metadata.size = metadata.len(), "Loading remote metadata"); @@ -550,6 +596,35 @@ impl Agent { } } + /// Send this agent's partial metadata + /// + /// # Arguments + /// * `descs` - Registration descriptor list to send + /// * `opt_args` - Optional arguments for sending metadata + pub fn send_local_partial_md(&self, descs: &RegDescList, opt_args: Option<&OptArgs>) -> Result<(), NixlError> { + tracing::trace!("Sending local partial metadata to etcd"); + let inner_guard = self.inner.write().unwrap(); + let status = unsafe { + nixl_capi_send_local_partial_md( + inner_guard.handle.as_ptr(), + descs.handle(), + opt_args.map_or(std::ptr::null_mut(), |args| args.inner.as_ptr()), + ) + }; + match status { + NIXL_CAPI_SUCCESS => { + tracing::trace!("Successfully sent local partial metadata to etcd"); + Ok(()) + } + NIXL_CAPI_ERROR_INVALID_PARAM => { + tracing::error!(error = "invalid_param", "Failed to send local partial metadata to etcd"); + Err(NixlError::InvalidParam) + } + _ => Err(NixlError::BackendError) + } + } + + /// Fetch a remote agent's metadata from etcd /// /// Once fetched, the metadata will be loaded and cached locally, enabling @@ -850,6 +925,36 @@ impl Agent { } } + /// Queries the backend for a transfer request + /// + /// # Arguments + /// * `req` - Transfer request handle after `post_xfer_req` + /// + /// # Returns + /// A handle to the backend used for the transfer + /// + /// # Errors + /// Returns a NixlError if the operation fails + pub fn query_xfer_backend(&self, req: &XferRequest) -> Result { + let mut backend = std::ptr::null_mut(); + let inner_guard = self.inner.write().unwrap(); + let status = unsafe { + nixl_capi_query_xfer_backend( + inner_guard.handle.as_ptr(), + req.handle(), + &mut backend + ) + }; + match status { + NIXL_CAPI_SUCCESS => { + Ok(Backend{ inner: NonNull::new(backend).ok_or(NixlError::FailedToCreateBackend)? }) + } + NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam), + _ => Err(NixlError::BackendError), + } + } + + /// Gets notifications from other agents /// /// # Arguments diff --git a/src/bindings/rust/src/lib.rs b/src/bindings/rust/src/lib.rs index 66db2fb4e..21c556081 100644 --- a/src/bindings/rust/src/lib.rs +++ b/src/bindings/rust/src/lib.rs @@ -70,7 +70,9 @@ use bindings::{ nixl_capi_query_mem, nixl_capi_create_query_resp_list, nixl_capi_destroy_query_resp_list, nixl_capi_query_resp_list_size, nixl_capi_query_resp_list_has_value, nixl_capi_query_resp_list_get_params, nixl_capi_prep_xfer_dlist, nixl_capi_release_xfer_dlist_handle, - nixl_capi_make_xfer_req + nixl_capi_make_xfer_req, nixl_capi_get_local_partial_md, + nixl_capi_send_local_partial_md, nixl_capi_query_xfer_backend, nixl_capi_opt_args_set_ip_addr, + nixl_capi_opt_args_set_port }; // Re-export status codes @@ -320,6 +322,29 @@ impl OptArgs { _ => Err(NixlError::BackendError), } } + + /// Set the IP address + /// used in sendLocalMD, fetchRemoteMD, invalidateLocalMD, sendLocalPartialMD. + pub fn set_ip_addr(&mut self, ip_addr: &str) -> Result<(), NixlError> { + let c_str = CString::new(ip_addr).expect("Failed to convert string to CString"); + let status = unsafe { nixl_capi_opt_args_set_ip_addr(self.inner.as_ptr(), c_str.as_ptr()) }; + match status { + NIXL_CAPI_SUCCESS => Ok(()), + NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam), + _ => Err(NixlError::BackendError), + } + } + + /// Set the port + /// used in sendLocalMD, fetchRemoteMD, invalidateLocalMD, sendLocalPartialMD. + pub fn set_port(&mut self, port: u16) -> Result<(), NixlError> { + let status = unsafe { nixl_capi_opt_args_set_port(self.inner.as_ptr(), port) }; + match status { + NIXL_CAPI_SUCCESS => Ok(()), + NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam), + _ => Err(NixlError::BackendError), + } + } } impl Drop for OptArgs { diff --git a/src/bindings/rust/stubs.cpp b/src/bindings/rust/stubs.cpp index 3b77916d5..c172c1e54 100644 --- a/src/bindings/rust/stubs.cpp +++ b/src/bindings/rust/stubs.cpp @@ -68,6 +68,15 @@ nixl_capi_get_local_md(nixl_capi_agent_t agent, void** data, size_t* len) return nixl_capi_stub_abort(); } +nixl_capi_status_t +nixl_capi_get_local_partial_md(nixl_capi_agent_t agent, + nixl_capi_reg_dlist_t descs, + void **data, + size_t *len, + nixl_capi_opt_args_t opt_args) { + return nixl_capi_stub_abort(); +} + nixl_capi_status_t nixl_capi_load_remote_md(nixl_capi_agent_t agent, const void* data, size_t len, char** agent_name) { @@ -219,6 +228,16 @@ nixl_capi_opt_args_get_skip_desc_merge(nixl_capi_opt_args_t args, bool* skip_mer return nixl_capi_stub_abort(); } +nixl_capi_status_t +nixl_capi_opt_args_set_ip_addr(nixl_capi_opt_args_t args, const char *ip_addr) { + return nixl_capi_stub_abort(); +} + +nixl_capi_status_t +nixl_capi_opt_args_set_port(nixl_capi_opt_args_t args, uint16_t port) { + return nixl_capi_stub_abort(); +} + nixl_capi_status_t nixl_capi_params_is_empty(nixl_capi_params_t params, bool* is_empty) { @@ -385,6 +404,13 @@ nixl_capi_get_xfer_status(nixl_capi_agent_t agent, nixl_capi_xfer_req_t req_hndl return nixl_capi_stub_abort(); } +nixl_capi_status_t +nixl_capi_query_xfer_backend(nixl_capi_agent_t agent, + nixl_capi_xfer_req_t req_hndl, + nixl_capi_backend_t *backend) { + return nixl_capi_stub_abort(); +} + nixl_capi_status_t nixl_capi_destroy_xfer_req(nixl_capi_xfer_req_t req) { diff --git a/src/bindings/rust/tests/tests.rs b/src/bindings/rust/tests/tests.rs index 18f452f40..a6e60d3de 100644 --- a/src/bindings/rust/tests/tests.rs +++ b/src/bindings/rust/tests/tests.rs @@ -1232,3 +1232,122 @@ fn test_make_xfer_req_invalid_indices() { assert!(result.is_err_and(|e| matches!(e, NixlError::BackendError)), "Expected InvalidParam for out-of-bounds indices"); } } + +// Tests for get_local_partial_md API +#[test] +fn test_get_local_partial_md_success() { + let (agent, opt_args) = create_agent_with_backend("test_agent") + .expect("Failed to setup agent with backend"); + let _storage_list = create_storage_list(&agent, &opt_args, 10); + // Create a registration descriptor list + let mut reg_descs = RegDescList::new(MemType::Dram) + .expect("Failed to create registration descriptor list"); + reg_descs.add_desc(0x1000, 0x100, 0) + .expect("Failed to add descriptor"); + // Get local partial metadata + let result = agent.get_local_partial_md(®_descs, Some(&opt_args)); + // Should succeed and return metadata + match result { + Ok(metadata) => { + assert!(!metadata.is_empty(), "Metadata should not be empty"); + println!("Partial metadata size: {}", metadata.len()); + } + Err(e) => { + // May fail if no partial metadata exists yet, which is acceptable + assert!( + matches!(e, NixlError::BackendError) || matches!(e, NixlError::InvalidParam), + "Expected BackendError or InvalidParam, got: {:?}", e + ); + } + } +} + +#[test] +fn test_get_local_partial_md_empty_descs() { + let (agent, _) = create_agent_with_backend("test_agent") + .expect("Failed to setup agent with backend"); + // Create empty registration descriptor list + let reg_descs = RegDescList::new(MemType::Dram) + .expect("Failed to create registration descriptor list"); + // Try with empty descriptor list should succeed and return all available backends + let result = agent.get_local_partial_md(®_descs, None); + assert!( + result.is_ok(), + "get_local_partial_md should succeed with empty descriptor list" + ); +} + +// Tests for send_local_partial_md API +#[test] +fn test_send_local_partial_md_success() { + let (agent, opt_args) = create_agent_with_backend("test_agent") + .expect("Failed to setup agent with backend"); + let (agent2, opt_args2) = create_agent_with_backend("test_agent2") + .expect("Failed to setup agent with backend"); + let _storage_list = create_storage_list(&agent, &opt_args, 10); + let _remote_storage_list = create_storage_list(&agent2, &opt_args2, 10); + + // Create a registration descriptor list + let mut reg_descs = RegDescList::new(MemType::Dram) + .expect("Failed to create registration descriptor list"); + reg_descs.add_storage_desc(&_storage_list[0]).expect("Failed to add storage descriptor"); + + // Send local partial metadata + let mut opt_args_temp = OptArgs::new().expect("Failed to create opt args"); + opt_args_temp.set_ip_addr("127.0.0.1").expect("Failed to set ip address"); + let result = agent.send_local_partial_md(®_descs, Some(&opt_args_temp)); + + assert!( + result.is_ok(), + "send_local_partial_md should succeed" + ); +} + +// Tests for query_xfer_backend API +#[test] +fn test_query_xfer_backend_success() { + let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent"); + let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent"); + // Create descriptor lists + let mut storage_list = create_storage_list(&agent1, &opt_args, 1); + let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1); + { + let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list"); + let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list"); + exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata"); + // Create transfer request + let xfer_req = agent1.create_xfer_req( + XferOp::Write, + &local_dlist, + &remote_dlist, + "agent2", + None + ).expect("Failed to create transfer request"); + // Query which backend will be used for this transfer + let result: Result = agent1.query_xfer_backend(&xfer_req); + assert!(result.is_ok(), "query_xfer_backend failed with error: {:?}", result.err()); + let backend = result.unwrap(); + println!("Transfer will use backend: {:?}", backend); + } +} +#[test] +fn test_query_xfer_backend_invalid_request() { + let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent"); + let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent"); + // Create descriptor lists + let mut storage_list = create_storage_list(&agent1, &opt_args, 1); + let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1); + { + let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list"); + let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list"); + // Create transfer request with non-existent remote agent (should fail or succeed) + let xfer_req_result = agent1.create_xfer_req( + XferOp::Write, + &local_dlist, + &remote_dlist, + "non_existent_agent", + None + ); + assert!(xfer_req_result.is_err(), "Transfer request creation should fail for non-existent agent"); + } +} diff --git a/src/bindings/rust/wrapper.cpp b/src/bindings/rust/wrapper.cpp index 29110c705..14b7ba4a2 100644 --- a/src/bindings/rust/wrapper.cpp +++ b/src/bindings/rust/wrapper.cpp @@ -159,6 +159,36 @@ nixl_capi_get_local_md(nixl_capi_agent_t agent, void** data, size_t* len) } } +nixl_capi_status_t +nixl_capi_get_local_partial_md(nixl_capi_agent_t agent, + nixl_capi_reg_dlist_t descs, + void **data, + size_t *len, + nixl_capi_opt_args_t opt_args) { + if (!agent || !descs || !data || !len) { + return NIXL_CAPI_ERROR_INVALID_PARAM; + } + try { + nixl_blob_t blob; + nixl_opt_args_t *args = opt_args ? &opt_args->args : nullptr; + nixl_status_t ret = agent->inner->getLocalPartialMD(*descs->dlist, blob, args); + if (ret != NIXL_SUCCESS) { + return NIXL_CAPI_ERROR_BACKEND; + } + // Allocate memory for the blob data + *data = malloc(blob.size()); + if (!*data) { + return NIXL_CAPI_ERROR_BACKEND; + } + // Copy the data + memcpy(*data, blob.data(), blob.size()); + return ret == NIXL_SUCCESS ? NIXL_CAPI_SUCCESS : NIXL_CAPI_ERROR_BACKEND; + } + catch (...) { + return NIXL_CAPI_ERROR_BACKEND; + } +} + nixl_capi_status_t nixl_capi_load_remote_md(nixl_capi_agent_t agent, const void* data, size_t len, char** agent_name) { @@ -225,6 +255,23 @@ nixl_capi_send_local_md(nixl_capi_agent_t agent, nixl_capi_opt_args_t opt_args) } } +nixl_capi_status_t +nixl_capi_send_local_partial_md(nixl_capi_agent_t agent, + nixl_capi_reg_dlist_t descs, + nixl_capi_opt_args_t opt_args) { + if (!agent || !descs) { + return NIXL_CAPI_ERROR_INVALID_PARAM; + } + try { + nixl_opt_args_t *args = opt_args ? &opt_args->args : nullptr; + nixl_status_t ret = agent->inner->sendLocalPartialMD(*descs->dlist, args); + return ret == NIXL_SUCCESS ? NIXL_CAPI_SUCCESS : NIXL_CAPI_ERROR_BACKEND; + } + catch (...) { + return NIXL_CAPI_ERROR_BACKEND; + } +} + nixl_capi_status_t nixl_capi_fetch_remote_md(nixl_capi_agent_t agent, const char* remote_name, nixl_capi_opt_args_t opt_args) { @@ -618,6 +665,31 @@ nixl_capi_opt_args_get_skip_desc_merge(nixl_capi_opt_args_t args, bool* skip_mer } } +nixl_capi_status_t +nixl_capi_opt_args_set_ip_addr(nixl_capi_opt_args_t args, const char *ip_addr) { + if (!args || !ip_addr) { + return NIXL_CAPI_ERROR_INVALID_PARAM; + } + + try { + args->args.ipAddr.assign(ip_addr); + return NIXL_CAPI_SUCCESS; + } + catch (...) { + return NIXL_CAPI_ERROR_BACKEND; + } +} + +nixl_capi_status_t +nixl_capi_opt_args_set_port(nixl_capi_opt_args_t args, uint16_t port) { + if (!args) { + return NIXL_CAPI_ERROR_INVALID_PARAM; + } + + args->args.port = port; + return NIXL_CAPI_SUCCESS; +} + nixl_capi_status_t nixl_capi_params_is_empty(nixl_capi_params_t params, bool* is_empty) { @@ -1413,6 +1485,28 @@ nixl_capi_get_xfer_status(nixl_capi_agent_t agent, nixl_capi_xfer_req_t req_hndl } } +nixl_capi_status_t +nixl_capi_query_xfer_backend(nixl_capi_agent_t agent, + nixl_capi_xfer_req_t req_hndl, + nixl_capi_backend_t *backend) { + if (!agent || !req_hndl || !backend) { + return NIXL_CAPI_ERROR_INVALID_PARAM; + } + try { + auto backend_handle = new nixl_capi_backend_s; + nixl_status_t ret = agent->inner->queryXferBackend(req_hndl->req, backend_handle->backend); + if (ret != NIXL_SUCCESS) { + delete backend_handle; + return NIXL_CAPI_ERROR_BACKEND; + } + *backend = backend_handle; + return ret == NIXL_SUCCESS ? NIXL_CAPI_SUCCESS : NIXL_CAPI_ERROR_BACKEND; + } + catch (...) { + return NIXL_CAPI_ERROR_BACKEND; + } +} + nixl_capi_status_t nixl_capi_destroy_xfer_req(nixl_capi_xfer_req_t req) { diff --git a/src/bindings/rust/wrapper.h b/src/bindings/rust/wrapper.h index 6ae20be79..8bac7169a 100644 --- a/src/bindings/rust/wrapper.h +++ b/src/bindings/rust/wrapper.h @@ -91,6 +91,14 @@ nixl_capi_status_t nixl_capi_destroy_agent(nixl_capi_agent_t agent); // Get local metadata as a byte array nixl_capi_status_t nixl_capi_get_local_md(nixl_capi_agent_t agent, void** data, size_t* len); +// Get local partial metadata as a byte array +nixl_capi_status_t +nixl_capi_get_local_partial_md(nixl_capi_agent_t agent, + nixl_capi_reg_dlist_t descs, + void **data, + size_t *len, + nixl_capi_opt_args_t opt_args); + // Load remote metadata from a byte array nixl_capi_status_t nixl_capi_load_remote_md(nixl_capi_agent_t agent, const void* data, size_t len, char** agent_name); @@ -106,6 +114,12 @@ nixl_capi_status_t nixl_capi_check_remote_md(nixl_capi_agent_t agent, const char // Send local metadata to etcd nixl_capi_status_t nixl_capi_send_local_md(nixl_capi_agent_t agent, nixl_capi_opt_args_t opt_args); +// Send local partial metadata to etcd +nixl_capi_status_t +nixl_capi_send_local_partial_md(nixl_capi_agent_t agent, + nixl_capi_reg_dlist_t descs, + nixl_capi_opt_args_t opt_args); + // Fetch remote metadata from etcd nixl_capi_status_t nixl_capi_fetch_remote_md(nixl_capi_agent_t agent, const char* remote_name, nixl_capi_opt_args_t opt_args); @@ -142,6 +156,10 @@ nixl_capi_status_t nixl_capi_opt_args_set_has_notif(nixl_capi_opt_args_t args, b nixl_capi_status_t nixl_capi_opt_args_get_has_notif(nixl_capi_opt_args_t args, bool* has_notif); nixl_capi_status_t nixl_capi_opt_args_set_skip_desc_merge(nixl_capi_opt_args_t args, bool skip_merge); nixl_capi_status_t nixl_capi_opt_args_get_skip_desc_merge(nixl_capi_opt_args_t args, bool* skip_merge); +nixl_capi_status_t +nixl_capi_opt_args_set_ip_addr(nixl_capi_opt_args_t args, const char *ip_addr); +nixl_capi_status_t +nixl_capi_opt_args_set_port(nixl_capi_opt_args_t args, uint16_t port); // Parameter access functions nixl_capi_status_t nixl_capi_params_is_empty(nixl_capi_params_t params, bool* is_empty); @@ -220,6 +238,11 @@ nixl_capi_status_t nixl_capi_post_xfer_req( nixl_capi_status_t nixl_capi_get_xfer_status(nixl_capi_agent_t agent, nixl_capi_xfer_req_t req_hndl); +nixl_capi_status_t +nixl_capi_query_xfer_backend(nixl_capi_agent_t agent, + nixl_capi_xfer_req_t req_hndl, + nixl_capi_backend_t *backend); + nixl_capi_status_t nixl_capi_release_xfer_req(nixl_capi_agent_t agent, nixl_capi_xfer_req_t req); nixl_capi_status_t nixl_capi_destroy_xfer_req(nixl_capi_xfer_req_t req);