Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
c17a870
feat: use quinn multipath
dignifiedquire Jul 7, 2025
7fe570d
update iroh-quinn
dignifiedquire Jul 7, 2025
2f469ac
start opening paths
dignifiedquire Jul 8, 2025
870716f
add more paths
dignifiedquire Jul 8, 2025
346a7c2
set keep alive and idle timeouts for new paths
dignifiedquire Jul 8, 2025
68b1769
insert relay path
dignifiedquire Jul 9, 2025
0eb3fde
set relay path as backup
dignifiedquire Jul 11, 2025
79ec17f
start removing ping logic from the node_map
dignifiedquire Jul 11, 2025
c4baca8
start tracking path events
dignifiedquire Jul 11, 2025
549adee
start figuring out more details
dignifiedquire Jul 12, 2025
9ef5765
wip
dignifiedquire Jul 14, 2025
75d5525
get some stuff to work again
dignifiedquire Jul 18, 2025
4f78898
remove ip_mapped_addresses
dignifiedquire Jul 21, 2025
130710f
use correct relay addr on recv
dignifiedquire Jul 21, 2025
dba89df
ensure connection registration
dignifiedquire Jul 21, 2025
aab083d
remove rtt_actor, this is now done inside quinn on a per path basis
dignifiedquire Jul 21, 2025
d4484da
open additional paths after the initial connection
dignifiedquire Jul 21, 2025
6cb94e4
ensure path open
dignifiedquire Jul 22, 2025
998e283
some debugging
dignifiedquire Jul 23, 2025
99cee61
update quinn branch
dignifiedquire Jul 23, 2025
4b60a9a
fixups
dignifiedquire Jul 28, 2025
04b714c
update deps
dignifiedquire Aug 1, 2025
59efdcf
bunch of renames and doc updates, no functional changes
flub Aug 28, 2025
f9924cd
switch to main multipath branch
flub Aug 29, 2025
3058a8e
another rename
flub Aug 29, 2025
11dd04d
Set max_idle_time to a good value
flub Aug 29, 2025
6869faa
fix typo
flub Sep 1, 2025
539a514
Start hooking up a new NodeStateActor
flub Sep 16, 2025
a1a7d89
Rename AllPathsMappedAddr to NodeIdMappedAddr
flub Sep 16, 2025
4b00ad7
Move all mapped addrs to one module
flub Sep 16, 2025
31ab033
allow me to send via the NodeStateActor
flub Sep 16, 2025
5ae71ed
Unify NodeIdMappedAddr and RelayMappedAddr a bit more
flub Sep 16, 2025
599f25d
start implementing AddConnection
flub Sep 17, 2025
2417e6b
make add_node_addr async and send it to the NodeStateActor
flub Sep 18, 2025
9820e60
start handling AddNodeAddr message
flub Sep 18, 2025
ef7a6f3
start sending datagrams from the actor
flub Sep 18, 2025
76f3768
Start adding state for holepunching decisions
flub Sep 23, 2025
1366161
refactor to allow scheduling holepunching attempts
flub Sep 23, 2025
3436423
plug in DiscoState to the NodeStateActor
flub Sep 23, 2025
01bf15c
a sane method to send disco messages
flub Sep 23, 2025
001d16d
Implement starting of holepunching
flub Sep 24, 2025
7a9023f
handle receiving pings
flub Sep 24, 2025
92dd37c
handle receiving CallMeMaybe messages
flub Sep 24, 2025
ac2db4d
open a path when we receive a pong
flub Sep 25, 2025
6d69555
Move open path to not block the actor
flub Sep 29, 2025
12d12c0
select the right path
flub Sep 30, 2025
9e7be2d
close paths on all connections
flub Sep 30, 2025
c1bfdf5
send connections to the NodeStateActor
flub Sep 30, 2025
7b4f056
remove PingActions from magicsock Actor Message
flub Sep 30, 2025
d3b81e7
Kill a bunch of dead code
flub Sep 30, 2025
d9ed9be
use a better way to send to this channel
flub Sep 30, 2025
2c521b0
Hook up connecting check for a valid send addr
flub Oct 1, 2025
21894e7
Do not send incoming packets through the NodeMap
flub Oct 1, 2025
fc97009
delete a whole bunch of unused code
flub Oct 1, 2025
0b4214b
delete some more unused code
flub Oct 1, 2025
a0d5ba1
fine tune logging
flub Oct 2, 2025
65c5d30
try_send is removed. do not log poll_send span twice
flub Oct 2, 2025
e7d9268
Do not holepunch on sending the first message
flub Oct 2, 2025
924f41b
postpone solving this, make the test work
flub Oct 2, 2025
8b1fd0f
Add the path from a new connection, call select path
flub Oct 3, 2025
0522c0e
Do not close the last direct path on a connection
flub Oct 3, 2025
50cc92f
Open some more paths when needed
flub Oct 3, 2025
4a1375f
Add a first relay test
flub Oct 3, 2025
ee20734
Merge branch 'main' into feat-multipath
flub Oct 6, 2025
fdeb6f4
tidy up NodeMap creation
flub Oct 6, 2025
d4fc291
Simplify delayed discovery start
flub Oct 6, 2025
8ba3fd9
Remove try_send impls, no longer used
flub Oct 6, 2025
23ef6fe
remove dead code
flub Oct 6, 2025
7fbd5df
Merge branch 'main' into feat-multipath
flub Oct 6, 2025
3f58e95
Remove MagicStack and mesh_stacks
flub Oct 7, 2025
7ac4dec
dead code
flub Oct 7, 2025
f38e5c2
remove test and delete then unused function
flub Oct 7, 2025
1eeb4bb
start removing old nodestate
flub Oct 7, 2025
145cb18
delete more: path_validity mod is gone
flub Oct 7, 2025
ccb3287
remove more!
flub Oct 7, 2025
80157d4
and more gone
flub Oct 7, 2025
b0957cb
rename temporary name, now the name is free again
flub Oct 7, 2025
a3e58f8
slightly better logging
flub Oct 7, 2025
ce564f1
avoid unneeded mut when not testing
flub Oct 7, 2025
4eb0f07
Convert to canonical IP address in IpSender
flub Oct 7, 2025
28bf51c
remove duplicate adding
flub Oct 8, 2025
befde29
clearer bounds writing
flub Oct 8, 2025
2b2f3c2
Merge branch 'main' into feat-multipath
flub Oct 8, 2025
79cd50c
fix AddrMap impl to update both maps
flub Oct 8, 2025
c2a192e
Always use IPv6 addresses
flub Oct 8, 2025
04e1e3d
tweak logging, this is too noisy
flub Oct 8, 2025
660c39a
random lost import
flub Oct 8, 2025
9da6c1c
keep reducing redundant logging
flub Oct 8, 2025
85cedca
Make transports Addrs use the canonical form
flub Oct 9, 2025
7ee963f
insert PathId::ZERO in the path_id_map
flub Oct 9, 2025
9de1055
fix addr selection for holepunching
flub Oct 9, 2025
ae07d68
bunch of logging imporvements
flub Oct 9, 2025
0ce64c9
remove redundant logging
flub Oct 9, 2025
9a592ce
plug through a minimal PathInfo
flub Oct 9, 2025
eb92bb7
We don't pend CallMeMaybe anymore
flub Oct 9, 2025
6e31e4d
clippy, at last some code quality
flub Oct 9, 2025
008648c
care to enable multipath?
flub Oct 9, 2025
cdc0f90
refactor into more methods, just mechanical
flub Oct 10, 2025
b80de1a
Log transports::Addr a bit more compactly
flub Oct 14, 2025
4200059
some minimal docs
flub Oct 14, 2025
5988e94
rework the test a little to have easier spans
flub Oct 14, 2025
77dfc32
log the correct packet lengths
flub Oct 14, 2025
8b8321b
Set the path status for the initial path
flub Oct 14, 2025
50f98b8
next thing to work on
flub Oct 14, 2025
b4114fa
chore: update git deps
dignifiedquire Oct 14, 2025
583cf9e
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Oct 14, 2025
5071e56
Merge remote-tracking branch 'origin/main' into feat-multipath
dignifiedquire Oct 14, 2025
1976993
improve logging and reduce max concurrent paths to 16
flub Oct 15, 2025
e10ac11
Merge branch 'main' into feat-multipath
flub Oct 16, 2025
ce5bdd2
newtype the connection id
flub Oct 16, 2025
7839a4c
make sure that the transport addrs use canonical addresses
flub Oct 17, 2025
fcaea95
No longer need to patch rustls
flub Oct 18, 2025
7d86eff
Only select the path based on open paths
flub Oct 20, 2025
203204c
some fixes to path selection
flub Oct 20, 2025
de2074e
small cleanups, review comments
flub Oct 21, 2025
55d1c29
Merge branch 'main' into feat-multipath
flub Oct 21, 2025
bd680f1
Merge branch 'main' into feat-multipath
flub Oct 21, 2025
9fcdd71
Use TransportAddr to improve PathInfo exposed
flub Oct 21, 2025
de6eefc
compile on wasm again
flub Oct 22, 2025
c2b131f
add relay to a new connection that is direct (#3569)
flub Oct 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 105 additions & 209 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l

[workspace.lints.clippy]
unused-async = "warn"


[patch.crates-io]
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }

[patch."https://github.com/n0-computer/quinn"]
# iroh-quinn = { path = "../iroh-quinn/quinn" }
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }
16 changes: 15 additions & 1 deletion iroh-base/src/endpoint_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use crate::{EndpointId, PublicKey, RelayUrl};
/// [discovery]: https://docs.rs/iroh/*/iroh/index.html#endpoint-discovery
/// [home relay]: https://docs.rs/iroh/*/iroh/relay/index.html
/// [Relay server]: https://docs.rs/iroh/*/iroh/index.html#relay-servers
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(
derive_more::Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
pub struct EndpointAddr {
/// The endpoint's identifier.
pub id: EndpointId,
Expand All @@ -54,6 +56,18 @@ pub enum TransportAddr {
Ip(SocketAddr),
}

impl TransportAddr {
/// Whether this is a transport address via a relay server.
pub fn is_relay(&self) -> bool {
matches!(self, Self::Relay(_))
}

/// Whether this is an IP transport address.
pub fn is_ip(&self) -> bool {
matches!(self, Self::Ip(_))
}
}

impl EndpointAddr {
/// Creates a new [`EndpointAddr`] with no network level addresses.
///
Expand Down
4 changes: 2 additions & 2 deletions iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ postcard = { version = "1", default-features = false, features = [
"use-std",
"experimental-derive",
] }
quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", version = "0.13.0" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" }
rand = "0.9.2"
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
Expand Down
4 changes: 2 additions & 2 deletions iroh-relay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl ClientBuilder {
let conn = Conn::new(conn, self.key_cache.clone(), &self.secret_key).await?;

event!(
target: "events.net.relay.connected",
target: "iroh::_events::net::relay::connected",
Level::DEBUG,
url = %self.url,
);
Expand Down Expand Up @@ -337,7 +337,7 @@ impl ClientBuilder {
let conn = Conn::new(ws_stream, self.key_cache.clone(), &self.secret_key).await?;

event!(
target: "events.net.relay.connected",
target: "iroh::_events::net::relay::connected",
Level::DEBUG,
url = %self.url,
);
Expand Down
6 changes: 3 additions & 3 deletions iroh-relay/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iroh_base::SecretKey;
use n0_future::{Sink, Stream};
use nested_enum_utils::common_fields;
use snafu::{Backtrace, Snafu};
use tracing::debug;
use tracing::{debug, trace};

use super::KeyCache;
#[cfg(not(wasm_browser))]
Expand Down Expand Up @@ -99,9 +99,9 @@ impl Conn {
let mut conn = WsBytesFramed { io };

// exchange information with the server
debug!("server_handshake: started");
trace!("server_handshake: started");
handshake::clientside(&mut conn, secret_key).await?;
debug!("server_handshake: done");
trace!("server_handshake: done");

Ok(Self { conn, key_cache })
}
Expand Down
3 changes: 1 addition & 2 deletions iroh-relay/src/client/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl MaybeTlsStreamBuilder {

async fn dial_url_direct(&self) -> Result<tokio::net::TcpStream, DialError> {
use tokio::net::TcpStream;
debug!(%self.url, "dial url");
let dst_ip = self
.dns_resolver
.resolve_host(&self.url, self.prefer_ipv6, DNS_TIMEOUT)
Expand All @@ -147,7 +146,7 @@ impl MaybeTlsStreamBuilder {
let port = url_port(&self.url).context(InvalidTargetPortSnafu)?;
let addr = SocketAddr::new(dst_ip, port);

debug!("connecting to {}", addr);
trace!("connecting to {}", addr);
let tcp_stream = time::timeout(DIAL_ENDPOINT_TIMEOUT, async move {
TcpStream::connect(addr).await
})
Expand Down
9 changes: 5 additions & 4 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ nested_enum_utils = "0.2.1"
netwatch = { version = "0.10" }
pin-project = "1"
pkarr = { version = "5", default-features = false, features = ["relays"] }
quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", version = "0.13.0" }
quinn-udp = { package = "iroh-quinn-udp", version = "0.5.7" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["rustls-ring"] }
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" }
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" }
rand = "0.9.2"
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"stream",
] }
ring = "0.17"
rustc-hash = "2"
rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
serde = { version = "1.0.219", features = ["derive", "rc"] }
smallvec = "1.11.1"
Expand Down Expand Up @@ -97,7 +98,7 @@ hickory-resolver = "0.25.1"
igd-next = { version = "0.16", features = ["aio_tokio"] }
netdev = { version = "0.38.1" }
portmapper = { version = "0.10", default-features = false }
quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
tokio = { version = "1", features = [
"io-util",
"macros",
Expand Down
2 changes: 1 addition & 1 deletion iroh/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ iroh = { path = ".." }
iroh-metrics = "0.36"
n0-future = "0.3.0"
n0-snafu = "0.2.0"
quinn = { package = "iroh-quinn", version = "0.14" }
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" }
rand = "0.9.2"
rcgen = "0.14"
rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
Expand Down
89 changes: 42 additions & 47 deletions iroh/src/disco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
};

use data_encoding::HEXLOWER;
use iroh_base::{PublicKey, RelayUrl};
use iroh_base::{EndpointId, PublicKey, RelayUrl};
use nested_enum_utils::common_fields;
use rand::Rng;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -119,12 +119,50 @@ pub struct Ping {
/// Random client-generated per-ping transaction ID.
pub tx_id: TransactionId,

/// Allegedly the ping sender's wireguard public key.
/// It shouldn't be trusted by itself, but can be combined with
/// netmap data to reduce the discokey:endpointkey relation from 1:N to 1:1.
/// Allegedly the ping sender's public key.
///
/// It shouldn't be trusted by itself.
pub endpoint_key: PublicKey,
}

impl Ping {
/// Creates a ping message to ping `node_id`.
///
/// Uses a randomly generated STUN transaction ID.
pub(crate) fn new(endpoint_id: EndpointId) -> Self {
Self {
tx_id: TransactionId::default(),
endpoint_key: endpoint_id,
}
}

fn from_bytes(p: &[u8]) -> Result<Self, ParseError> {
// Deliberately lax on longer-than-expected messages, for future compatibility.
ensure!(p.len() >= PING_LEN, TooShortSnafu);
let tx_id: [u8; TX_LEN] = p[..TX_LEN].try_into().expect("length checked");
let raw_key = &p[TX_LEN..TX_LEN + iroh_base::PublicKey::LENGTH];
let endpoint_key =
PublicKey::try_from(raw_key).map_err(|_| InvalidEncodingSnafu.build())?;
let tx_id = TransactionId::from(tx_id);

Ok(Ping {
tx_id,
endpoint_key,
})
}

fn as_bytes(&self) -> Vec<u8> {
let header = msg_header(MessageType::Ping, V0);
let mut out = vec![0u8; PING_LEN + HEADER_LEN];

out[..HEADER_LEN].copy_from_slice(&header);
out[HEADER_LEN..HEADER_LEN + TX_LEN].copy_from_slice(&self.tx_id);
out[HEADER_LEN + TX_LEN..].copy_from_slice(self.endpoint_key.as_ref());

out
}
}

/// A response a Ping.
///
/// It includes the sender's source IP + port, so it's effectively a STUN response.
Expand All @@ -146,21 +184,6 @@ pub enum SendAddr {
Relay(RelayUrl),
}

impl SendAddr {
/// Returns if this is a `relay` addr.
pub fn is_relay(&self) -> bool {
matches!(self, Self::Relay(_))
}

/// Returns the `Some(Url)` if it is a relay addr.
pub fn relay_url(&self) -> Option<RelayUrl> {
match self {
Self::Relay(url) => Some(url.clone()),
Self::Udp(_) => None,
}
}
}

impl From<transports::Addr> for SendAddr {
fn from(addr: transports::Addr) -> Self {
match addr {
Expand Down Expand Up @@ -214,34 +237,6 @@ pub struct CallMeMaybe {
pub my_numbers: Vec<SocketAddr>,
}

impl Ping {
fn from_bytes(p: &[u8]) -> Result<Self, ParseError> {
// Deliberately lax on longer-than-expected messages, for future compatibility.
ensure!(p.len() >= PING_LEN, TooShortSnafu);
let tx_id: [u8; TX_LEN] = p[..TX_LEN].try_into().expect("length checked");
let raw_key = &p[TX_LEN..TX_LEN + iroh_base::PublicKey::LENGTH];
let endpoint_key =
PublicKey::try_from(raw_key).map_err(|_| InvalidEncodingSnafu.build())?;
let tx_id = TransactionId::from(tx_id);

Ok(Ping {
tx_id,
endpoint_key,
})
}

fn as_bytes(&self) -> Vec<u8> {
let header = msg_header(MessageType::Ping, V0);
let mut out = vec![0u8; PING_LEN + HEADER_LEN];

out[..HEADER_LEN].copy_from_slice(&header);
out[HEADER_LEN..HEADER_LEN + TX_LEN].copy_from_slice(&self.tx_id);
out[HEADER_LEN + TX_LEN..].copy_from_slice(self.endpoint_key.as_ref());

out
}
}

#[allow(missing_docs)]
#[common_fields({
backtrace: Option<snafu::Backtrace>,
Expand Down
30 changes: 7 additions & 23 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,11 @@ use snafu::{IntoError, Snafu, ensure};
use tokio::sync::oneshot;
use tracing::{Instrument, debug, error_span, warn};

use crate::Endpoint;
pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData};
use crate::{Endpoint, magicsock::endpoint_map::Source};

#[cfg(not(wasm_browser))]
pub mod dns;

#[cfg(feature = "discovery-local-network")]
pub mod mdns;
pub mod pkarr;
Expand Down Expand Up @@ -492,11 +491,7 @@ impl Discovery for ConcurrentDiscovery {
}
}

/// Maximum duration since the last control or data message received from an endpoint to make us
/// start a discovery task.
const MAX_AGE: Duration = Duration::from_secs(10);

/// A wrapper around a tokio task which runs an endpoint discovery.
/// A wrapper around a tokio task which runs a node discovery.
pub(super) struct DiscoveryTask {
on_first_rx: oneshot::Receiver<Result<(), DiscoveryError>>,
_task: AbortOnDropHandle<()>,
Expand Down Expand Up @@ -527,30 +522,19 @@ impl DiscoveryTask {
/// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again
/// if we recently received messages from remote endpoint. If true, the task will abort.
/// Otherwise, or if no `delay` is set, the discovery will be started.
pub(super) fn maybe_start_after_delay(
pub(super) fn start_after_delay(
ep: &Endpoint,
endpoint_id: EndpointId,
delay: Option<Duration>,
delay: Duration,
) -> Result<Option<Self>, DiscoveryError> {
// If discovery is not needed, don't even spawn a task.
if !ep.needs_discovery(endpoint_id, MAX_AGE) {
return Ok(None);
}
ensure!(!ep.discovery().is_empty(), NoServiceConfiguredSnafu);
let (on_first_tx, on_first_rx) = oneshot::channel();
let ep = ep.clone();
let me = ep.id();
let task = task::spawn(
async move {
// If delay is set, wait and recheck if discovery is needed. If not, early-exit.
if let Some(delay) = delay {
time::sleep(delay).await;
if !ep.needs_discovery(endpoint_id, MAX_AGE) {
debug!("no discovery needed, abort");
on_first_tx.send(Ok(())).ok();
return;
}
}
time::sleep(delay).await;
Self::run(ep, endpoint_id, on_first_tx).await
}
.instrument(
Expand Down Expand Up @@ -606,10 +590,10 @@ impl DiscoveryTask {
continue;
}
debug!(%provenance, addr = ?endpoint_addr, "new address found");
let source = crate::magicsock::Source::Discovery {
let source = Source::Discovery {
name: provenance.to_string(),
};
ep.add_endpoint_addr(endpoint_addr, source).ok();
ep.add_endpoint_addr(endpoint_addr, source).await.ok();

if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
Expand Down
Loading