Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions integration_tests/src/sim_bus.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{Arc, Mutex};

use zencan_common::messages::CanMessage;
use zencan_common::traits::{AsyncCanReceiver, AsyncCanSender};
use zencan_common::traits::{AsyncCanReceiver, AsyncCanSender, CanSendError};
use zencan_node::NodeMbox;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -63,8 +63,25 @@ pub struct SimBusSender<'a> {
external_channels: Arc<Mutex<Vec<UnboundedSender<CanMessage>>>>,
}

/// Create an error type for sim bus sender.
///
/// The sender can't fail, so this type is never instantiated
#[derive(Debug)]
pub struct SimBusSendError(());

impl CanSendError for SimBusSendError {
fn into_can_message(self) -> CanMessage {
panic!("uninstantiable")
}

fn message(&self) -> String {
String::new()
}
}

impl AsyncCanSender for SimBusSender<'_> {
async fn send(&mut self, msg: CanMessage) -> Result<(), CanMessage> {
type Error = SimBusSendError;
async fn send(&mut self, msg: CanMessage) -> Result<(), SimBusSendError> {
// Send to nodes on the bus
for ns in self.node_states.lock().unwrap().iter() {
// It doesn't matter if store message fails; that just means the node did not
Expand Down
17 changes: 17 additions & 0 deletions zencan-cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Changelog

Human-friendly documentation of releases and what's changed in them for the zencan-cli crate.

## [Unreleased]

### Fixed

- Panic during scan when socketcan fails to send messages

## [v0.0.1] - 2025-10-09

The first release!

### Added

- Everything!
15 changes: 10 additions & 5 deletions zencan-cli/src/bin/zencan-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,17 @@ fn convert_read_bytes_to_string(

async fn run_command<S: AsyncCanSender + Sync + Send>(cmd: Commands, manager: &mut BusManager<S>) {
match cmd {
Commands::Scan => {
let nodes = manager.scan_nodes().await;
for n in &nodes {
println!("{n}");
Commands::Scan => match manager.scan_nodes().await {
Ok(nodes) => {
for n in &nodes {
println!("{n}");
}
}
}
Err(e) => {
println!("Error during scan: ");
println!("{e}");
}
},
Commands::Info => {
let nodes = manager.node_list().await;
for n in &nodes {
Expand Down
1 change: 1 addition & 0 deletions zencan-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Human-friendly documentation of releases and what's changed in them for the zenc
- Default SDO client timeout changed from 100ms to 150ms
- NodeConfiguration is moved into `common`
- The `cob` attribute on `node_configuration::PdoConfig` is renamed to `cob_id`.
- Better error handling on CAN send errors and `BusManager::scan`.

### Fixed

Expand Down
29 changes: 14 additions & 15 deletions zencan-client/src/bus_manager/bus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,18 @@ impl NodeInfo {
async fn scan_node<S: AsyncCanSender + Sync + Send>(
node_id: u8,
clients: &SdoClientMutex<S>,
) -> Option<NodeInfo> {
) -> Result<Option<NodeInfo>, SdoClientError> {
let mut sdo_client = clients.lock(node_id);
log::info!("Scanning Node {node_id}");

let identity = match sdo_client.read_identity().await {
Ok(id) => Some(id),
// A no response here is not really an error, it just indicates the node is not present
Err(SdoClientError::NoResponse) => {
log::info!("No response from node {node_id}");
return None;
}
Err(e) => {
// A server responded, but we failed to read identity. An unexpected situation, as all
// nodes should implement the identity object
log::error!("SDO Abort Response scanning node {node_id} identity: {e:?}");
None
return Ok(None);
}
Err(e) => return Err(e),
};
let device_name = match sdo_client.read_device_name().await {
Ok(s) => Some(s),
Expand All @@ -146,15 +143,15 @@ async fn scan_node<S: AsyncCanSender + Sync + Send>(
None
}
};
Some(NodeInfo {
Ok(Some(NodeInfo {
node_id,
identity,
device_name,
software_version,
hardware_version,
nmt_state: None,
last_seen: Instant::now(),
})
}))
}

/// Result struct for reading PDO configuration from a single node
Expand Down Expand Up @@ -401,11 +398,11 @@ impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
/// - Device Name
/// - Software Version
/// - Hardware Version
pub async fn scan_nodes(&mut self) -> Vec<NodeInfo> {
pub async fn scan_nodes(&mut self) -> Result<Vec<NodeInfo>, SdoClientError> {
const N_PARALLEL: usize = 10;

let ids = Vec::from_iter(1..128u8);
let mut nodes = Vec::new();
let mut nodes: Vec<NodeInfo> = Vec::new();

let mut chunks = Vec::new();
for chunk in ids.chunks(128 / N_PARALLEL) {
Expand All @@ -424,9 +421,11 @@ impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
});
}

// Collect the results from batches. If any errors occurred, fail.
let results = join_all(futures).await;
for r in results {
nodes.extend(r.into_iter().flatten());
let r: Result<Vec<Option<NodeInfo>>, SdoClientError> = r.into_iter().collect();
nodes.extend(r?.into_iter().flatten());
}

let mut node_map = self.nodes.lock().await;
Expand All @@ -443,10 +442,10 @@ impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
// 1) We only included nodes which responded just now to the scan, but
// 2) we also display the latest NMT state for that node, which comes from the heartbeat
// rather than the scan
nodes
Ok(nodes
.iter()
.map(|n| node_map.get(&n.node_id).unwrap().clone())
.collect()
.collect())
}

/// Find all unconfigured devices on the bus
Expand Down
5 changes: 3 additions & 2 deletions zencan-client/src/bus_manager/shared_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ impl<S: AsyncCanSender> SharedSender<S> {
Self { inner: sender }
}

async fn send(&mut self, msg: CanMessage) -> Result<(), CanMessage> {
async fn send(&mut self, msg: CanMessage) -> Result<(), S::Error> {
let mut inner = self.inner.lock().await;
inner.send(msg).await
}
}

impl<S: AsyncCanSender> AsyncCanSender for SharedSender<S> {
type Error = S::Error;
fn send(
&mut self,
msg: CanMessage,
) -> impl core::future::Future<Output = Result<(), CanMessage>> {
) -> impl core::future::Future<Output = Result<(), Self::Error>> {
self.send(msg)
}
}
58 changes: 48 additions & 10 deletions zencan-client/src/sdo_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zencan_common::{
node_configuration::PdoConfig,
pdo::PdoMapping,
sdo::{AbortCode, BlockSegment, SdoRequest, SdoResponse},
traits::{AsyncCanReceiver, AsyncCanSender},
traits::{AsyncCanReceiver, AsyncCanSender, CanSendError as _},
};

const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(150);
Expand Down Expand Up @@ -82,8 +82,11 @@ pub enum SdoClientError {
/// An SDO upload response had a size that did not match the expected size
UnexpectedSize,
/// Failed to write a message to the socket
#[snafu(display("Error sending CAN message"))]
SocketSendFailed,
#[snafu(display("Failed to send CAN message: {message}"))]
SocketSendFailed {
/// A string describing the error reason
message: String,
},
/// An SDO server shrunk the block size while requesting retransmission
///
/// Hopefully no node will ever do this, but it's a possible corner case, since servers are
Expand Down Expand Up @@ -176,7 +179,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
// Do an expedited transfer
let msg =
SdoRequest::expedited_download(index, sub, data).to_can_message(self.req_cob_id);
self.sender.send(msg).await.unwrap(); // TODO: Expect errors
self.sender.send(msg).await.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;
match_response!(
Expand All @@ -189,7 +197,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
} else {
let msg = SdoRequest::initiate_download(index, sub, Some(data.len() as u32))
.to_can_message(self.req_cob_id);
self.sender.send(msg).await.unwrap();
self.sender.send(msg).await.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;
match_response!(
Expand Down Expand Up @@ -244,7 +257,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
let mut read_buf = Vec::new();

let msg = SdoRequest::initiate_upload(index, sub).to_can_message(self.req_cob_id);
self.sender.send(msg).await.unwrap();
self.sender.send(msg).await.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;

Expand Down Expand Up @@ -277,7 +295,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
let msg =
SdoRequest::upload_segment_request(toggle).to_can_message(self.req_cob_id);

self.sender.send(msg).await.unwrap();
self.sender.send(msg).await.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;
match_response!(
Expand Down Expand Up @@ -324,7 +347,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
.to_can_message(self.req_cob_id),
)
.await
.map_err(|_| SocketSendFailedSnafu {}.build())?;
.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;

Expand Down Expand Up @@ -371,7 +399,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
self.sender
.send(segment.to_can_message(self.req_cob_id))
.await
.map_err(|_| SocketSendFailedSnafu.build())?;
.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

// Expect a confirmation message after blksize segments are sent, or after sending the
// complete flag
Expand Down Expand Up @@ -424,7 +457,12 @@ impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
self.sender
.send(SdoRequest::EndBlockDownload { n, crc }.to_can_message(self.req_cob_id))
.await
.map_err(|_| SocketSendFailedSnafu.build())?;
.map_err(|e| {
SocketSendFailedSnafu {
message: e.message(),
}
.build()
})?;

let resp = self.wait_for_response(self.timeout).await?;
match_response!(
Expand Down
31 changes: 23 additions & 8 deletions zencan-common/src/socketcan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::{
messages::{CanError, CanId, CanMessage},
traits::{AsyncCanReceiver, AsyncCanSender},
traits::{AsyncCanReceiver, AsyncCanSender, CanSendError},
};
use snafu::{ResultExt, Snafu};
use socketcan::{CanFrame, CanSocket, EmbeddedFrame, Frame, ShouldRetry, Socket};
Expand Down Expand Up @@ -53,6 +53,22 @@ pub enum ReceiveError {
Can { source: CanError },
}

#[derive(Debug, Snafu)]
pub struct SendError {
source: socketcan::IoError,
message: CanMessage,
}

impl CanSendError for SendError {
fn into_can_message(self) -> CanMessage {
self.message
}

fn message(&self) -> String {
self.source.to_string()
}
}

/// Create an Async socket around a socketcan CanSocket. This is just a reimplemenation of the tokio
/// socket in the `socketcan` crate, but with support for `try_read_frame` and `try_write_frame`
/// added.
Expand Down Expand Up @@ -128,15 +144,14 @@ pub struct SocketCanSender {
}

impl AsyncCanSender for SocketCanSender {
async fn send(&mut self, msg: CanMessage) -> Result<(), CanMessage> {
type Error = SendError;
async fn send(&mut self, msg: CanMessage) -> Result<(), Self::Error> {
let socketcan_frame = zencan_message_to_socket_frame(msg);

let result = self.socket.write_frame(&socketcan_frame).await;
if result.is_err() {
Err(msg)
} else {
Ok(())
}
self.socket
.write_frame(&socketcan_frame)
.await
.context(SendSnafu { message: msg })
}
}

Expand Down
18 changes: 17 additions & 1 deletion zencan-common/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,27 @@ pub trait CanReceiver {

/// An async CAN sender trait
pub trait AsyncCanSender: Send {
/// Error type returned by sender
type Error: CanSendError;
/// Send a message to the bus
fn send(
&mut self,
msg: CanMessage,
) -> impl core::future::Future<Output = Result<(), CanMessage>>;
) -> impl core::future::Future<Output = Result<(), Self::Error>>;
}

/// A trait for CAN errors which may come from different types of interfaces
///
/// On no_std, all the error can do is return the unsent frame. With `std`, it can convert any
/// underlying errors into a String.
pub trait CanSendError: core::fmt::Debug {
/// Convert the error into the undelivered message
fn into_can_message(self) -> CanMessage;

/// Get a string describing the error
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
fn message(&self) -> String;
}

/// An async CAN receiver trait
Expand Down