Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,227 changes: 520 additions & 707 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 7 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ derive_more = { version = "2.0.1", features = [
ed25519-dalek = { version = "3.0.0-pre.1", features = ["serde", "rand_core"] }
hex = "0.4.3"
indexmap = "2.0"
iroh-metrics = { version = "0.36", default-features = false }
iroh-base = { version = "0.94", default-features = false, features = ["key"] }
iroh-metrics = { version = "0.37", default-features = false }
iroh-base = { version = "0.95", default-features = false, features = ["key"] }
n0-future = "0.3"
postcard = { version = "1", default-features = false, features = [
"alloc",
Expand All @@ -62,18 +62,16 @@ serde = { version = "1.0.164", features = ["derive"] }
futures-lite = { version = "2.3", optional = true }
futures-concurrency = { version = "7.6.1", optional = true }
futures-util = { version = "0.3.30", optional = true }
iroh = { version = "0.94", default-features = false, optional = true }
iroh = { version = "0.95", default-features = false, optional = true }
tokio = { version = "1", optional = true, features = ["io-util", "sync"] }
tokio-util = { version = "0.7.12", optional = true, features = ["codec"] }
tracing = "0.1"
irpc = { version = "0.10.0", optional = true, default-features = false, features = [
irpc = { version = "0.11.0", optional = true, default-features = false, features = [
"derive",
"stream",
"spans",
] }
n0-snafu = { version = "0.2.2", optional = true }
nested_enum_utils = { version = "0.2.2", optional = true }
snafu = { version = "0.8.5", features = ["rust_1_81"], optional = true }
n0-error = { version = "0.1", features = ["anyhow"] }

# rpc dependencies (optional)
quinn = { package = "iroh-quinn", version = "0.14.0", optional = true }
Expand All @@ -84,7 +82,7 @@ humantime-serde = { version = "1.1.1", optional = true }

# simulator dependencies (optional)
clap = { version = "4", features = ["derive"], optional = true }
toml = { version = "0.8.20", optional = true }
toml = { version = "0.9.8", optional = true }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
], optional = true }
Expand All @@ -104,7 +102,7 @@ tokio = { version = "1", features = [
] }
clap = { version = "4", features = ["derive"] }
humantime-serde = { version = "1.1.1" }
iroh = { version = "0.94", default-features = false, features = [
iroh = { version = "0.95", default-features = false, features = [
"metrics",
"test-utils",
] }
Expand All @@ -125,17 +123,11 @@ net = [
"dep:tokio-util",
"dep:futures-util",
"dep:futures-concurrency",
"dep:nested_enum_utils",
"dep:n0-snafu",
"dep:snafu",
]
rpc = [
"dep:irpc",
"dep:tokio",
"dep:quinn",
"dep:nested_enum_utils",
"dep:n0-snafu",
"dep:snafu",
"irpc/rpc",
"irpc/quinn_endpoint_setup",
]
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ Here is a basic example of how to set up `iroh-gossip` with `iroh`:
```rust,no_run
use iroh::{protocol::Router, Endpoint, EndpointId};
use iroh_gossip::{api::Event, Gossip, TopicId};
use n0_error::{Result, StdResultExt};
use n0_future::StreamExt;
use n0_snafu::ResultExt;

#[tokio::main]
async fn main() -> n0_snafu::Result<()> {
async fn main() -> Result<()> {
// create an iroh endpoint that includes the standard discovery mechanisms
// we've built at number0
let endpoint = Endpoint::bind().await?;
Expand Down Expand Up @@ -65,7 +65,7 @@ async fn main() -> n0_snafu::Result<()> {
}

// clean shutdown makes sure that other peers are notified that you went offline
router.shutdown().await.e()?;
router.shutdown().await.std_context("shutdown router")?;
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ name = "ring"
[[licenses.clarify.license-files]]
hash = 3171872035
path = "LICENSE"

[sources]
33 changes: 17 additions & 16 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ use iroh_gossip::{
net::{Gossip, GOSSIP_ALPN},
proto::TopicId,
};
use n0_error::{bail_any, AnyError, Result, StdResultExt};
use n0_future::task;
use n0_snafu::{Result, ResultExt};
use serde::{Deserialize, Serialize};
use serde_byte_array::ByteArray;
use snafu::whatever;

/// Chat over iroh-gossip
///
Expand Down Expand Up @@ -105,9 +104,7 @@ async fn main() -> Result<()> {
(false, None) => RelayMode::Default,
(false, Some(url)) => RelayMode::Custom(url.into()),
(true, None) => RelayMode::Disabled,
(true, Some(_)) => {
whatever!("You cannot set --no-relay and --relay at the same time")
}
(true, Some(_)) => bail_any!("You cannot set --no-relay and --relay at the same time"),
};
println!("> using relay servers: {}", fmt_relay_mode(&relay_mode));

Expand Down Expand Up @@ -184,7 +181,7 @@ async fn main() -> Result<()> {
}

// shutdown
router.shutdown().await.e()?;
router.shutdown().await.anyerr()?;

Ok(())
}
Expand Down Expand Up @@ -216,8 +213,8 @@ fn input_loop(line_tx: tokio::sync::mpsc::Sender<String>) -> Result<()> {
let mut buffer = String::new();
let stdin = std::io::stdin(); // We get `Stdin` here.
loop {
stdin.read_line(&mut buffer).e()?;
line_tx.blocking_send(buffer.clone()).e()?;
stdin.read_line(&mut buffer).anyerr()?;
line_tx.blocking_send(buffer.clone()).anyerr()?;
buffer.clear();
}
}
Expand All @@ -234,27 +231,31 @@ struct SignedMessage {

impl SignedMessage {
pub fn verify_and_decode(bytes: &[u8]) -> Result<(PublicKey, Message)> {
let signed_message: Self = postcard::from_bytes(bytes).e()?;
let signed_message: Self =
postcard::from_bytes(bytes).std_context("decode signed message")?;
let key: PublicKey = signed_message.from;
key.verify(
&signed_message.data,
&iroh::Signature::from_bytes(&signed_message.signature),
)
.e()?;
let message: Message = postcard::from_bytes(&signed_message.data).e()?;
.std_context("verify signature")?;
let message: Message =
postcard::from_bytes(&signed_message.data).std_context("decode message")?;
Ok((signed_message.from, message))
}

pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> Result<Bytes> {
let data: Bytes = postcard::to_stdvec(&message).e()?.into();
let data: Bytes = postcard::to_stdvec(&message)
.std_context("encode message")?
.into();
let signature = secret_key.sign(&data);
let from: PublicKey = secret_key.public();
let signed_message = Self {
from,
data,
signature: ByteArray::new(signature.to_bytes()),
};
let encoded = postcard::to_stdvec(&signed_message).e()?;
let encoded = postcard::to_stdvec(&signed_message).std_context("encode signed message")?;
Ok(encoded.into())
}
}
Expand All @@ -273,7 +274,7 @@ struct Ticket {
impl Ticket {
/// Deserializes from bytes.
fn from_bytes(bytes: &[u8]) -> Result<Self> {
postcard::from_bytes(bytes).e()
postcard::from_bytes(bytes).std_context("decode ticket")
}
/// Serializes to bytes.
pub fn to_bytes(&self) -> Vec<u8> {
Expand All @@ -292,11 +293,11 @@ impl fmt::Display for Ticket {

/// Deserializes from base32.
impl FromStr for Ticket {
type Err = n0_snafu::Error;
type Err = AnyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = data_encoding::BASE32_NOPAD
.decode(s.to_ascii_uppercase().as_bytes())
.e()?;
.std_context("decode ticket base32")?;
Self::from_bytes(&bytes)
}
}
Expand Down
6 changes: 3 additions & 3 deletions examples/setup.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use iroh::{protocol::Router, Endpoint};
use iroh_gossip::{net::Gossip, ALPN};
use n0_snafu::ResultExt;
use n0_error::{Result, StdResultExt};

#[tokio::main]
async fn main() -> n0_snafu::Result<()> {
async fn main() -> Result<()> {
// create an iroh endpoint that includes the standard discovery mechanisms
// we've built at number0
let endpoint = Endpoint::bind().await?;
Expand All @@ -16,6 +16,6 @@ async fn main() -> n0_snafu::Result<()> {
.accept(ALPN, gossip.clone())
.spawn();
// do fun stuff with the gossip protocol
router.shutdown().await.e()?;
router.shutdown().await.std_context("shutdown router")?;
Ok(())
}
45 changes: 22 additions & 23 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ use std::{
use bytes::Bytes;
use iroh_base::EndpointId;
use irpc::{channel::mpsc, rpc_requests, Client};
use n0_error::{e, stack_error};
use n0_future::{Stream, StreamExt, TryStreamExt};
use nested_enum_utils::common_fields;
use serde::{Deserialize, Serialize};
use snafu::Snafu;

use crate::proto::{DeliveryScope, TopicId};

Expand All @@ -38,19 +37,14 @@ pub(crate) struct JoinRequest {
}

#[allow(missing_docs)]
#[common_fields({
backtrace: Option<snafu::Backtrace>,
#[snafu(implicit)]
span_trace: n0_snafu::SpanTrace,
})]
#[derive(Debug, Snafu)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
pub enum ApiError {
#[snafu(transparent)]
#[error(transparent)]
Rpc { source: irpc::Error },
/// The gossip topic was closed.
#[snafu(display("topic closed"))]
Closed {},
#[error("topic closed")]
Closed,
}

impl From<irpc::channel::SendError> for ApiError {
Expand Down Expand Up @@ -299,7 +293,7 @@ impl GossipReceiver {
/// continue to track `NeighborUp` events on the event stream.
pub async fn joined(&mut self) -> Result<(), ApiError> {
while !self.is_joined() {
let _event = self.next().await.ok_or(ClosedSnafu.build())??;
let _event = self.next().await.ok_or(e!(ApiError::Closed))??;
}
Ok(())
}
Expand Down Expand Up @@ -416,10 +410,10 @@ mod tests {
#[cfg(all(feature = "rpc", feature = "net"))]
#[tokio::test]
#[tracing_test::traced_test]
async fn test_rpc() -> n0_snafu::Result {
async fn test_rpc() -> n0_error::Result<()> {
use iroh::{discovery::static_provider::StaticProvider, protocol::Router, RelayMap};
use n0_error::{AnyError, Result, StackResultExt, StdResultExt};
use n0_future::{time::Duration, StreamExt};
use n0_snafu::{Error, Result, ResultExt};
use rand_chacha::rand_core::SeedableRng;

use crate::{
Expand Down Expand Up @@ -457,7 +451,7 @@ mod tests {
let task = tokio::task::spawn(async move {
let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
topic.broadcast(b"hello".to_vec().into()).await?;
Result::<_, Error>::Ok(router)
Ok::<_, AnyError>(router)
});
(endpoint_id, endpoint_addr, task)
};
Expand All @@ -471,16 +465,18 @@ mod tests {
// expose the gossip endpoint over RPC
let (rpc_server_endpoint, rpc_server_cert) =
irpc::util::make_server_endpoint("127.0.0.1:0".parse().unwrap())
.map_err(Error::anyhow)?;
let rpc_server_addr = rpc_server_endpoint.local_addr().e()?;
.context("make server endpoint")?;
let rpc_server_addr = rpc_server_endpoint
.local_addr()
.std_context("resolve server addr")?;
let rpc_server_task = tokio::task::spawn(async move {
gossip.listen(rpc_server_endpoint).await;
});

// connect to the RPC endpoint with a new client
let rpc_client_endpoint =
irpc::util::make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[&rpc_server_cert])
.map_err(Error::anyhow)?;
.context("make client endpoint")?;
let rpc_client = GossipApi::connect(rpc_client_endpoint, rpc_server_addr);

// join via RPC
Expand All @@ -499,19 +495,22 @@ mod tests {
_ => {}
}
}
Result::<_, Error>::Ok(())
Ok::<_, AnyError>(())
};

// timeout to not hang in case of failure
tokio::time::timeout(Duration::from_secs(10), recv)
.await
.e()??;
.std_context("rpc recv timeout")??;

// shutdown
rpc_server_task.abort();
router.shutdown().await.e()?;
let router2 = endpoint2_task.await.e()??;
router2.shutdown().await.e()?;
router.shutdown().await.std_context("shutdown router")?;
let router2 = endpoint2_task.await.std_context("join endpoint task")??;
router2
.shutdown()
.await
.std_context("shutdown second router")?;
Ok(())
}

Expand Down
Loading
Loading